Feature description
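The merge predicate for a Delta Lake table is built from the primary key columns only. If the table's partition columns differ from its primary key columns, the predicate contains no partition columns, so partition pruning is not active during the merge and the whole table is scanned.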
Are you a dlt user?
Yes, I'm already a dlt user.
Use case
Reduce the memory usage of a Delta table merge by letting it skip partitions that the incoming data does not touch.
Proposed solution
Change the merge_delta_table function in deltalake.py so that the partition columns are included in the merge predicate:
def merge_delta_table(
    table: DeltaTable,
    data: Union[pa.Table, pa.RecordBatchReader],
    schema: TTableSchema,
    load_table_name: str,
    streamed_exec: bool,
) -> None:
    """Merges in-memory Arrow data into on-disk Delta table."""
    strategy = schema["x-merge-strategy"]  # type: ignore[typeddict-item]
    if strategy in ("upsert", "insert-only"):
        evolve_delta_table_schema(table, data.schema)
        # START of change
        partition_by = get_columns_names_with_prop(schema, "partition")
        if "parent" in schema:
            # nested tables are matched on their unique column
            unique_column = get_first_column_name_with_prop(schema, "unique")
            predicate_sel_cols = [unique_column] if unique_column else []
        else:
            predicate_sel_cols = get_columns_names_with_prop(schema, "primary_key")
        # partition columns come first; dict.fromkeys drops duplicates and keeps order
        predicate_columns = list(dict.fromkeys(partition_by + predicate_sel_cols))
        predicate = " AND ".join([f"target.{c} = source.{c}" for c in predicate_columns])
        # END of change
        qry = table.merge(
            source=ensure_delta_compatible_arrow_data(data, partition_by),
            predicate=predicate,
            source_alias="source",
            target_alias="target",
            streamed_exec=streamed_exec,
        )
        if strategy == "upsert":
            qry = qry.when_matched_update_all()
        qry = qry.when_not_matched_insert_all()
        qry.execute()
    else:
        raise ValueError(
            f'Merge strategy "{strategy}" is not supported for Delta tables. '
            f'Table: "{load_table_name}".'
        )
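To show what predicate the change produces, here is a minimal standalone sketch of just the predicate-building step. The helper name build_merge_predicate and the column names event_date and id are hypothetical and only used for illustration; they are not part of dlt.

```python
from typing import Sequence


def build_merge_predicate(
    partition_by: Sequence[str],
    key_columns: Sequence[str],
    source_alias: str = "source",
    target_alias: str = "target",
) -> str:
    """Joins partition and key columns into one AND-ed equality predicate.

    Hypothetical helper for illustration only; not part of dlt.
    """
    # dict.fromkeys drops duplicates while keeping first-seen order,
    # so partition columns end up first in the predicate
    columns = list(dict.fromkeys([*partition_by, *key_columns]))
    if not columns:
        raise ValueError("At least one partition or key column is required.")
    return " AND ".join(
        f"{target_alias}.{c} = {source_alias}.{c}" for c in columns
    )


# Table partitioned by event_date with primary key (id, event_date)
predicate = build_merge_predicate(["event_date"], ["id", "event_date"])
print(predicate)
# target.event_date = source.event_date AND target.id = source.id
```

Note that matching on partition columns assumes a row's partition values do not change between loads. If they do, a row with the same key but a different partition value no longer matches and would be inserted rather than updated.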
Related issues
No response