API table.scan does not conform to Iceberg spec for identity partition columns #1401

Open · 1 of 3 tasks
rkuhlercadent opened this issue Dec 4, 2024 · 9 comments · May be fixed by #1443

@rkuhlercadent

Apache Iceberg version

0.8.0 (latest release)

Please describe the bug 🐞

Per the Iceberg spec, partition columns with identity transforms should get their values from the metadata if not present in the data file. However, table.scan returns null values instead.

https://iceberg.apache.org/spec/#column-projection

"Values for field ids which are not present in a data file must be resolved according the following rules:

Return the value from partition metadata if an Identity Transform exists for the field and the partition value is present in the partition struct on data_file object in the manifest. This allows for metadata only migrations of Hive tables."

Willingness to contribute

  • [ ] I can contribute a fix for this bug independently
  • [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • [x] I cannot contribute a fix for this bug at this time
@kevinjqliu
Contributor

Hi @rkuhlercadent, thanks for reporting this issue. Can you provide some pseudocode showing where you're seeing it?

@rkuhlercadent
Author

rkuhlercadent commented Dec 6, 2024

Here is a Python script that demonstrates the issue.

import os
import datetime

from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.io.pyarrow import data_file_statistics_from_parquet_metadata, compute_statistics_plan
from pyiceberg.io.pyarrow import parquet_path_to_id_mapping
from pyiceberg.schema import Schema
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.table import TableProperties
from pyiceberg.typedef import Record
from pyiceberg.types import StringType, IntegerType, NestedField
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
from pyiceberg.table.name_mapping import create_mapping_from_schema
import pyarrow as pa
import pyarrow.parquet as pq


def demonstrate_identity_partition_scan_issue():
    # we have petabytes of parquet data in hive format on s3 already that we are cataloging in iceberg format.
    # note that these parquet files do NOT have the partition columns in them which is standard for hive format.
    # the partition values must be taken from the iceberg metadata for the identity partition columns as
    # specified in the iceberg spec: https://iceberg.apache.org/spec/#column-projection
    # "Values for field ids which are not present in a data file must be resolved according the following rules:
    # Return the value from partition metadata if an Identity Transform exists for the field and the partition
    # value is present in the partition struct on data_file object in the manifest. This allows for metadata
    # only migrations of Hive tables."
    warehouse_path = os.path.dirname(os.path.realpath(__file__))
    namespace_name = "IDENTITY_PARTITION_SCAN_ISSUE_NAMESPACE"
    table_name = "IDENTITY_PARTITION_SCAN_ISSUE"
    catalog = get_iceberg_catalog(warehouse_path)
    drop_catalog_entities_for_test(catalog, namespace_name)
    # create sample hive files
    sample_hive_parquet_file = create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, 202412)
    # catalog existing hive data in iceberg
    catalog.create_namespace(namespace_name)
    table = create_iceberg_table(catalog, namespace_name, table_name)
    add_data_file(table, sample_hive_parquet_file, table.metadata.default_spec_id)
    # the partition_id columns should have values from the metadata not null in this output
    # this same iceberg metadata correctly returns the partition_id column values in spark, athena, and snowflake
    print(table.scan().to_arrow())


def get_iceberg_catalog(warehouse_path):
    # using sqlite catalog on local filesystem for demo
    catalog = SqlCatalog(
        "default",
        **{
            "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
            "warehouse": f"file://{warehouse_path}",
        })
    return catalog


def drop_catalog_entities_for_test(catalog, namespace_name):
    if namespace_name in [n[0] for n in catalog.list_namespaces()]:
        for _, table_name in catalog.list_tables(namespace_name):
            catalog.drop_table(f"{namespace_name}.{table_name}")
        catalog.drop_namespace(namespace_name)


def create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, partition_id):
    location = f"{warehouse_path}/{namespace_name}.db/{table_name}/data/partition_id={partition_id}/data.parquet"
    os.makedirs(os.path.dirname(location), exist_ok=True)
    name = datetime.datetime.strptime(str(partition_id), "%Y%m").strftime("%B %Y")
    names = pa.array([name], type=pa.string())
    pq.write_table(pa.table([names], names=["name"]), location)
    return {
        "location": location,
        "file_size": os.path.getsize(location),
        "partition_id": partition_id
    }


def create_iceberg_table(catalog, namespace_name, table_name):
    print("creating iceberg table")
    schema = Schema(
        NestedField(field_id=1, name="partition_id", field_type=IntegerType(), required=False),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False))
    partition_spec = PartitionSpec(
        PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_id"))
    table = catalog.create_table(
        f"{namespace_name}.{table_name}",
        schema=schema,
        partition_spec=partition_spec,
        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()})
    return table


def add_data_file(table, hive_data_file, spec_id):
    print("adding data file")
    parquet_metadata = pq.read_metadata(hive_data_file.get("location"))
    stats_columns = compute_statistics_plan(table.schema(), table.metadata.properties)
    statistics = data_file_statistics_from_parquet_metadata(
        parquet_metadata=parquet_metadata,
        stats_columns=stats_columns,
        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()))
    data_file = DataFile(
        content=DataFileContent.DATA,
        file_path=hive_data_file.get("location"),
        file_format=FileFormat.PARQUET,
        partition=Record(partition_id=hive_data_file.get("partition_id")),
        file_size_in_bytes=hive_data_file.get("file_size"),
        sort_order_id=None,
        spec_id=spec_id,
        equality_ids=None,
        key_metadata=None,
        **statistics.to_serialized_dict())
    with table.transaction() as tx:
        with tx.update_snapshot().overwrite() as update_snapshot:
            update_snapshot.append_data_file(data_file)


if __name__ == "__main__":
    demonstrate_identity_partition_scan_issue()

Current output from the test script. Note the null partition_id values.

creating iceberg table
adding data file
pyarrow.Table
partition_id: int32
name: large_string
----
partition_id: [[null]]
name: [["December 2024"]]

@kevinjqliu
Contributor

kevinjqliu commented Dec 6, 2024

Thanks for providing the test! I added a few print statements:

import os
import datetime

from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.io.pyarrow import data_file_statistics_from_parquet_metadata, compute_statistics_plan
from pyiceberg.io.pyarrow import parquet_path_to_id_mapping
from pyiceberg.schema import Schema
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.table import TableProperties
from pyiceberg.typedef import Record
from pyiceberg.types import StringType, IntegerType, NestedField
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
from pyiceberg.table.name_mapping import create_mapping_from_schema
import pyarrow as pa
import pyarrow.parquet as pq


def demonstrate_identity_partition_scan_issue():
    # we have petabytes of parquet data in hive format on s3 already that we are cataloging in iceberg format.
    # note that these parquet files do NOT have the partition columns in them which is standard for hive format.
    # the partition values must be taken from the iceberg metadata for the identity partition columns as
    # specified in the iceberg spec: https://iceberg.apache.org/spec/#column-projection
    # "Values for field ids which are not present in a data file must be resolved according the following rules:
    # Return the value from partition metadata if an Identity Transform exists for the field and the partition
    # value is present in the partition struct on data_file object in the manifest. This allows for metadata
    # only migrations of Hive tables."
    warehouse_path = os.path.dirname(os.path.realpath(__file__))
    namespace_name = "IDENTITY_PARTITION_SCAN_ISSUE_NAMESPACE"
    table_name = "IDENTITY_PARTITION_SCAN_ISSUE"
    catalog = get_iceberg_catalog(warehouse_path)
    drop_catalog_entities_for_test(catalog, namespace_name)
    # create sample hive files
    sample_hive_parquet_file = create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, 202412)
    # catalog existing hive data in iceberg
    catalog.create_namespace(namespace_name)
    table = create_iceberg_table(catalog, namespace_name, table_name)

    print("Hive parquet data:\n", pq.read_table(sample_hive_parquet_file.get("location")))
    print()
    add_data_file(table, sample_hive_parquet_file, table.metadata.default_spec_id)
    # the partition_id columns should have values from the metadata not null in this output
    # this same iceberg metadata correctly returns the partition_id column values in spark, athena, and snowflake
    print("Table partitions:\n", table.inspect.partitions().to_pandas())
    print()

    print("Table scan:\n", table.scan().to_arrow())
    print()


def get_iceberg_catalog(warehouse_path):
    # using sqlite catalog on local filesystem for demo
    catalog = SqlCatalog(
        "default",
        **{
            "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
            "warehouse": f"file://{warehouse_path}",
        })
    return catalog


def drop_catalog_entities_for_test(catalog, namespace_name):
    if namespace_name in [n[0] for n in catalog.list_namespaces()]:
        for _, table_name in catalog.list_tables(namespace_name):
            catalog.drop_table(f"{namespace_name}.{table_name}")
        catalog.drop_namespace(namespace_name)


def create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, partition_id):
    location = f"{warehouse_path}/{namespace_name}.db/{table_name}/data/partition_id={partition_id}/data.parquet"
    os.makedirs(os.path.dirname(location), exist_ok=True)
    name = datetime.datetime.strptime(str(partition_id), "%Y%m").strftime("%B %Y")
    names = pa.array([name], type=pa.string())
    pq.write_table(pa.table([names], names=["name"]), location)
    return {
        "location": location,
        "file_size": os.path.getsize(location),
        "partition_id": partition_id
    }


def create_iceberg_table(catalog, namespace_name, table_name):
    print("creating iceberg table")
    schema = Schema(
        NestedField(field_id=1, name="partition_id", field_type=IntegerType(), required=False),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False))
    partition_spec = PartitionSpec(
        PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_id"))
    table = catalog.create_table(
        f"{namespace_name}.{table_name}",
        schema=schema,
        partition_spec=partition_spec,
        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()})
    return table


def add_data_file(table, hive_data_file, spec_id):
    print("adding data file")
    parquet_metadata = pq.read_metadata(hive_data_file.get("location"))
    stats_columns = compute_statistics_plan(table.schema(), table.metadata.properties)
    statistics = data_file_statistics_from_parquet_metadata(
        parquet_metadata=parquet_metadata,
        stats_columns=stats_columns,
        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()))
    data_file = DataFile(
        content=DataFileContent.DATA,
        file_path=hive_data_file.get("location"),
        file_format=FileFormat.PARQUET,
        partition=Record(partition_id=hive_data_file.get("partition_id")),
        file_size_in_bytes=hive_data_file.get("file_size"),
        sort_order_id=None,
        spec_id=spec_id,
        equality_ids=None,
        key_metadata=None,
        **statistics.to_serialized_dict())
    with table.transaction() as tx:
        with tx.update_snapshot().overwrite() as update_snapshot:
            update_snapshot.append_data_file(data_file)


if __name__ == "__main__":
    demonstrate_identity_partition_scan_issue()

And here's the output:

creating iceberg table
Hive parquet data:
 pyarrow.Table
name: string
----
name: [["December 2024"]]

adding data file
Table partitions:
                   partition  spec_id  record_count  ...  equality_delete_file_count         last_updated_at  last_updated_snapshot_id
0  {'partition_id': 202412}        0             1  ...                           0 2024-12-06 18:37:00.672       4125530822203270775

[1 rows x 11 columns]

Table scan:
 pyarrow.Table
partition_id: int32
name: large_string
----
partition_id: [[null]]
name: [["December 2024"]]

The issue is that the IdentityTransform partition column partition_id is present in the metadata but not in the table scan.
This deviates from the table spec for column projection: https://iceberg.apache.org/spec/#column-projection

This issue also occurs for the Table.add_files API.
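
For illustration only, a minimal sketch (reusing the table and Hive-style parquet file from the script above) of how the same gap would show up when the file is registered via Table.add_files instead of a hand-built DataFile:

# Sketch: register the same Hive-style parquet file (which has no partition_id
# column) through the add_files API instead of constructing a DataFile by hand.
table.add_files(file_paths=[sample_hive_parquet_file["location"]])

# As reported above, the scan output still shows partition_id: [[null]]
# instead of 202412 taken from the partition metadata.
print(table.scan().to_arrow())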

@kevinjqliu
Contributor

Here's the code path for the Arrow table scan:

DataScan.to_arrow:

return ArrowScan(
    self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_table(self.plan_files())

ArrowScan.to_table:

def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
    """Scan the Iceberg table and return a pa.Table.

    Returns a pa.Table with data from the Iceberg table by resolving the
    right columns that match the current table schema. Only data that
    matches the provided row_filter expression is returned.

    Args:
        tasks: FileScanTasks representing the data files and delete files to read from.

    Returns:
        A PyArrow table. Total number of rows will be capped if specified.

    Raises:
        ResolveError: When a required field cannot be found in the file
        ValueError: When a field type in the file cannot be projected to the schema type
    """
    deletes_per_file = _read_all_delete_files(self._fs, tasks)
    executor = ExecutorFactory.get_or_create()

    def _table_from_scan_task(task: FileScanTask) -> pa.Table:
        batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
        if len(batches) > 0:
            return pa.Table.from_batches(batches)
        else:
            return None

    futures = [
        executor.submit(
            _table_from_scan_task,
            task,
        )
        for task in tasks
    ]
    total_row_count = 0
    # for consistent ordering, we need to maintain future order
    futures_index = {f: i for i, f in enumerate(futures)}
    completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
    for future in concurrent.futures.as_completed(futures):
        completed_futures.add(future)
        if table_result := future.result():
            total_row_count += len(table_result)
            # stop early if limit is satisfied
            if self._limit is not None and total_row_count >= self._limit:
                break

    # by now, we've either completed all tasks or satisfied the limit
    if self._limit is not None:
        _ = [f.cancel() for f in futures if not f.done()]

    tables = [f.result() for f in completed_futures if f.result()]

    if len(tables) < 1:
        return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))

    result = pa.concat_tables(tables, promote_options="permissive")

    if self._limit is not None:
        return result.slice(0, self._limit)

    return result

We likely need to post-process the result according to the column projection rules.
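
As a rough illustration of that idea (not the actual fix; the inputs here are assumed), post-processing could fill in identity-partition columns that are missing from the file with the constant value recorded in the manifest:

import pyarrow as pa

from pyiceberg.transforms import IdentityTransform

# Sketch only. Assumed inputs: `file_field_ids` is the set of field ids physically
# present in the data file, and `partition_values` maps partition field names to
# the values stored in the manifest's partition struct for this data file.
def fill_identity_partitions(result: pa.Table, projected_schema, spec,
                             file_field_ids: set, partition_values: dict) -> pa.Table:
    for partition_field in spec.fields:
        if not isinstance(partition_field.transform, IdentityTransform):
            continue
        if partition_field.source_id in file_field_ids:
            continue  # the column exists in the file; nothing to fill
        source_field = projected_schema.find_field(partition_field.source_id)
        value = partition_values[partition_field.name]
        # append a constant column carrying the metadata value for every row
        # (type inference via pa.array is simplified for this sketch)
        result = result.append_column(source_field.name, pa.array([value] * len(result)))
    return result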

@kevinjqliu
Contributor

@rkuhlercadent thanks a bunch for reporting this issue! Would you like to help contribute this fix?

@gabeiglio

If it's available, I'd like to give it a go!

@kevinjqliu
Contributor

kevinjqliu commented Dec 6, 2024

@gabeiglio assigned to you, since I realized the OP checked "I cannot contribute a fix for this bug at this time".

@gabeiglio

gabeiglio commented Dec 13, 2024

I'm open to feedback, but having investigated this issue I'm inclined to think the fix needs to be in _task_to_record_batches.

By comparing the projected schema against the file projection schema, we could:

  1. Map each field id missing from the file to the partition spec and check whether its transform is an IdentityTransform
  2. Check whether the data file's partition struct contains that partition field (checking by name)
  3. Try to inject the new column into the resulting RecordBatch

I'm still figuring out how to do step three (and whether it's possible); apologies for the pace, I've been a bit short on time. @kevinjqliu, does that make sense?

This is my first contribution to the project, so I might be missing some context.
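
For illustration only, a rough sketch of steps 1 and 2 above, assuming `projected_schema` and `file_schema` are pyiceberg Schema objects and `spec` is the table's PartitionSpec (hypothetical helper, not existing PyIceberg code):

from pyiceberg.transforms import IdentityTransform

# Sketch: find projected top-level fields that are absent from the file schema
# and whose values are carried by an identity transform in the partition spec.
def missing_identity_partition_fields(projected_schema, file_schema, spec):
    file_field_ids = {field.field_id for field in file_schema.fields}
    missing = []
    for partition_field in spec.fields:
        if not isinstance(partition_field.transform, IdentityTransform):
            continue
        if partition_field.source_id in file_field_ids:
            continue  # present in the file, no projection needed
        missing.append(projected_schema.find_field(partition_field.source_id))
    return missing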

@kevinjqliu
Contributor

That makes sense to me. I think we generally need a place to replicate the column projection logic according to the spec.
Currently, on the read path, the only projection done is pruning columns:

file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)

By comparing the projected schema vs the file projection schema

Yeah, the issue occurs when the table schema has fields that are not present in the file schema.
From the spec:

Values for field ids which are not present in a data file must be resolved according the following rules 

Check if the data file partition struct contains that partition field (check by name)

We don't need this extra check, since the table/file schema mismatch will tell us which columns are missing. Also, we'd always want to check by field id.
From the spec

Columns in Iceberg data files are selected by field id.

Try to inject this new column in the resultant RecordBatch

Yeah, we'd want to append whatever the value is to the data file records. Luckily, Arrow is columnar, so there won't be much of a penalty.
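
As a small sketch of that append step (assuming `value` has already been resolved from the data file's partition struct):

import pyarrow as pa

# Sketch: append a constant column (the metadata partition value) to a batch.
# `batch` is the RecordBatch read from the file; `name` is the missing field's name.
def append_constant_column(batch: pa.RecordBatch, name: str, value) -> pa.RecordBatch:
    arrays = list(batch.columns) + [pa.array([value] * batch.num_rows)]
    names = batch.schema.names + [name]
    return pa.RecordBatch.from_arrays(arrays, names=names)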

@gabeiglio linked a pull request on Dec 18, 2024 that will close this issue.