Feature description
Extend arrow_concat_promote_options to handle type mismatches across flush batch boundaries, not just within a single batch.
Currently, ArrowToParquetWriter uses pa.concat_tables(promote_options=...) to unify types within a flush batch. But pyarrow.ParquetWriter locks its schema on the first write_table() call and rejects any subsequent batch with a different schema - even for safe promotions like float32 -> float64:
ArrowInvalid: Table schema does not match schema used to create file:
table: val: float vs. file: val: double
This is a write-time hard failure. The Parquet format requires a single schema per file, and PyArrow enforces this by locking the schema on the first write. The data never makes it to disk.
I am willing to work on this issue.
Are you a dlt user?
Yes, I'm already a dlt user.
Use case
Any pipeline where a resource yields Arrow tables (or DataFrames converted to Arrow) with varying numeric types across batches, and the total row count exceeds buffer_max_items (default 5000). Common examples:
- Multiple source files with independent type inference (read_csv, read_excel, etc.)
- Numeric downcasting optimizations applied per-file
- Columns that are all-NaN in some files (stay float64) but have data in others (narrowed to float32) - a sketch of this scenario follows the list
- API responses where optional numeric fields are absent in some pages
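A minimal sketch of the all-NaN scenario (the file contents are hypothetical, and pandas is assumed as the DataFrame source):

import pandas as pd
import pyarrow as pa

# File A: values present, a per-file downcast optimization narrows to float32
df_a = pd.DataFrame({"val": [1.5, 2.5]}).astype({"val": "float32"})
# File B: the column is all-NaN, so it stays at pandas' default float64
df_b = pd.DataFrame({"val": [float("nan"), float("nan")]})

print(pa.Table.from_pandas(df_a).schema)  # val: float   (float32)
print(pa.Table.from_pandas(df_b).schema)  # val: double  (float64)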
The failure depends on data volume rather than data content: a pipeline that succeeds with 2000 rows per table crashes with 6000 rows per table, because the larger run spans multiple flush batches:
import os
import dlt
import pyarrow as pa

os.environ["DATA_WRITER__ARROW_CONCAT_PROMOTE_OPTIONS"] = "permissive"

@dlt.resource(write_disposition="replace")
def mixed_types(rows_per_table: int):
    yield pa.table({"val": pa.array([1.0] * rows_per_table, type=pa.float32())})
    yield pa.table({"val": pa.array([2.0] * rows_per_table, type=pa.float64())})

pipeline = dlt.pipeline("cross_batch_repro", destination="duckdb")
pipeline.run(mixed_types(rows_per_table=2000), loader_file_format="parquet")  # works
pipeline.run(mixed_types(rows_per_table=6000), loader_file_format="parquet")  # crashes
The schema lock reproduced with PyArrow alone, outside dlt:

import io
import pyarrow as pa
import pyarrow.parquet as pq

buf = io.BytesIO()
t1 = pa.table({"val": pa.array([1.0], type=pa.float32())})
writer = pq.ParquetWriter(buf, t1.schema)
writer.write_table(t1)  # locks schema to float32

t2 = pa.table({"val": pa.array([2.0], type=pa.float64())})
writer.write_table(t2)  # ArrowInvalid - even though float32 -> float64 is lossless
Proposed solution
Extend ArrowToParquetWriter.write_data() to reconcile schemas across flush batches when arrow_concat_promote_options != "none". When the incoming table's schema differs from the writer's locked schema, use pa.unify_schemas() with the same promote_options value that's already used for within-batch concat. This is a single code path - PyArrow natively handles the "default" vs "permissive" distinction, so no separate logic is needed:
unified = pa.unify_schemas(
    [writer.schema, table.schema],
    promote_options=promote_options,  # same value used for within-batch concat
)
if unified == writer.schema:
    table = table.cast(unified)  # narrower incoming - widen to match writer
else:
    raise SchemaChanged(unified, table)  # wider incoming - signal file rotation
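As a standalone illustration of that claim, the two promote_options values already diverge inside pa.unify_schemas itself (plain PyArrow, no dlt involved; requires a PyArrow version where unify_schemas accepts promote_options):

import pyarrow as pa

f32 = pa.schema([("val", pa.float32())])
f64 = pa.schema([("val", pa.float64())])

# "permissive" widens float32 to float64
print(pa.unify_schemas([f64, f32], promote_options="permissive"))  # val: double

# "default" refuses numeric widening (only null types unify with other types)
try:
    pa.unify_schemas([f64, f32], promote_options="default")
except (pa.ArrowTypeError, pa.ArrowInvalid):  # exact error class may vary by version
    print("default: incompatible types")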
Based on the unified schema (both branches are sketched after this list):
- Incoming narrower than writer (e.g., float32 into a float64 writer): cast the table up to match the writer schema. Lossless, keeps data in the same file.
- Incoming wider than writer (e.g., float64 into a float32 writer): rotate to a new parquet file with the unified schema. Destinations already handle multiple files per table.
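A minimal standalone sketch of the branch logic (writer_schema stands in for the writer's locked schema; names are illustrative, not dlt internals):

import pyarrow as pa

writer_schema = pa.schema([("val", pa.float64())])  # schema locked on first write
incoming = pa.table({"val": pa.array([1.0, 2.0], type=pa.float32())})

unified = pa.unify_schemas([writer_schema, incoming.schema], promote_options="permissive")
if unified == writer_schema:
    incoming = incoming.cast(unified)  # lossless float32 -> float64 up-cast
else:
    pass  # wider incoming: rotate to a new file with the unified schema

print(incoming.schema)  # val: double - now matches the open writer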
This mirrors how dlt already handles cross-batch column additions via file rotation. Backwards compatible: promote_options="none" (default) is completely unchanged. No new config options needed.
Related issues
Relates to #3697