Skip to content

Commit f157cdc

Browse files
authored
fix: preserve generated column metadata during schema merge (delta-io#4191)
# Description

Fixes a regression where appends with `schema_mode="merge"` strip `delta.generationExpression` from the table schema. Once that metadata is lost, subsequent writes compute NULL instead of the generated values.

# Related Issue(s)

- closes delta-io#4186

<!--- For example: - closes #106 --->

# Documentation

<!--- Share links to useful documentation --->

---------

Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
1 parent 47ef3b5 commit f157cdc

5 files changed

Lines changed: 161 additions & 10 deletions

File tree

crates/core/src/kernel/schema/cast/merge_schema.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -329,12 +329,21 @@ fn merge_arrow_vec_fields(
329329
Err(e)
330330
}
331331
Ok(mut f) => {
332-
// UNDO the implicit schema merging of batch fields into table fields that is done by
333-
// field.try_merge
334-
f.set_metadata(right_field.metadata().clone());
332+
// Preserve existing (table) column metadata (e.g. generated column
333+
// expressions) as the base, then merge in compatible metadata from the
334+
// batch. This prevents batch-side schemas (which often lack table-defined
335+
// metadata) from overwriting table metadata that `Field::try_merge` may
336+
// have merged in.
337+
f.set_metadata(field.metadata().clone());
335338

336339
let mut field_metadata = f.metadata().clone();
337-
try_merge_metadata(&mut field_metadata, right_field.metadata())?;
340+
// Column generation expressions are table-defined metadata and should not
341+
// be inferred or overridden by incoming batch schemas. Ignore them when
342+
// merging Arrow field metadata to avoid spurious schema errors when the
343+
// input includes conflicting `delta.generationExpression` metadata.
344+
let mut right_metadata = right_field.metadata().clone();
345+
right_metadata.remove(ColumnMetadataKey::GenerationExpression.as_ref());
346+
try_merge_metadata(&mut field_metadata, &right_metadata)?;
338347
f.set_metadata(field_metadata);
339348
Ok(f)
340349
}

crates/core/src/kernel/schema/cast/mod.rs

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ mod tests {
230230
use arrow::buffer::{Buffer, NullBuffer};
231231
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
232232
use delta_kernel::engine::arrow_conversion::TryIntoKernel as _;
233-
use delta_kernel::schema::MetadataValue;
233+
use delta_kernel::schema::{ColumnMetadataKey, MetadataValue};
234234
use itertools::Itertools;
235235

236236
use super::merge_schema::{merge_arrow_schema, merge_delta_struct};
@@ -286,6 +286,71 @@ mod tests {
286286
assert_eq!(fields[0].metadata(), &expected_meta);
287287
}
288288

289+
#[test]
290+
fn test_merge_arrow_schema_preserves_table_field_metadata_when_batch_missing() {
291+
let mut left_meta = HashMap::new();
292+
left_meta.insert(
293+
ColumnMetadataKey::GenerationExpression.as_ref().to_string(),
294+
"id + value".to_string(),
295+
);
296+
297+
let left_schema = Arc::new(Schema::new(vec![
298+
Field::new("computed", DataType::Int32, false).with_metadata(left_meta),
299+
]));
300+
301+
// Incoming batch/schema omits field metadata; table metadata must remain intact.
302+
let right_schema = Arc::new(Schema::new(vec![Field::new(
303+
"computed",
304+
DataType::Int32,
305+
false,
306+
)]));
307+
308+
let merged = merge_arrow_schema(left_schema, right_schema, true).unwrap();
309+
let computed = merged.field_with_name("computed").unwrap();
310+
assert_eq!(
311+
computed
312+
.metadata()
313+
.get(ColumnMetadataKey::GenerationExpression.as_ref())
314+
.map(|v| v.as_str()),
315+
Some("id + value")
316+
);
317+
}
318+
319+
#[test]
320+
fn test_merge_arrow_schema_ignores_batch_generation_expression_conflicts() {
321+
let mut left_meta = HashMap::new();
322+
left_meta.insert(
323+
ColumnMetadataKey::GenerationExpression.as_ref().to_string(),
324+
"id + value".to_string(),
325+
);
326+
327+
let mut right_meta = HashMap::new();
328+
right_meta.insert(
329+
ColumnMetadataKey::GenerationExpression.as_ref().to_string(),
330+
"id * 10".to_string(),
331+
);
332+
333+
let left_schema = Arc::new(Schema::new(vec![
334+
Field::new("computed", DataType::Int32, false).with_metadata(left_meta),
335+
]));
336+
337+
// Batch metadata may include `delta.generationExpression`, but the table's
338+
// generation expression is authoritative and should not be overridden.
339+
let right_schema = Arc::new(Schema::new(vec![
340+
Field::new("computed", DataType::Int32, false).with_metadata(right_meta),
341+
]));
342+
343+
let merged = merge_arrow_schema(left_schema, right_schema, true).unwrap();
344+
let computed = merged.field_with_name("computed").unwrap();
345+
assert_eq!(
346+
computed
347+
.metadata()
348+
.get(ColumnMetadataKey::GenerationExpression.as_ref())
349+
.map(|v| v.as_str()),
350+
Some("id + value")
351+
);
352+
}
353+
289354
#[test]
290355
fn test_merge_arrow_schema_with_nested() {
291356
let left_schema = Arc::new(Schema::new(vec![Field::new(

crates/core/src/operations/write/mod.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ impl std::future::IntoFuture for WriteBuilder {
542542
}
543543
}
544544

545-
if let Some(new_schema) = new_schema {
545+
if let Some(new_schema) = new_schema.as_ref() {
546546
let mut schema_evolution_projection =
547547
Vec::with_capacity(new_schema.fields().len());
548548
for field in new_schema.fields() {
@@ -594,8 +594,17 @@ impl std::future::IntoFuture for WriteBuilder {
594594
};
595595

596596
if should_update_schema {
597+
// Use the merged Arrow schema (not the DataFusion plan schema) when
598+
// performing schema evolution. DataFusion expressions do not reliably
599+
// preserve Arrow field metadata, which would otherwise strip column
600+
// metadata such as generated column expressions.
597601
let schema_struct: StructType =
598-
source.schema().as_arrow().try_into_kernel()?;
602+
match (this.schema_mode, schema_drift, new_schema.as_deref()) {
603+
(Some(SchemaMode::Merge), true, Some(schema)) => {
604+
schema.try_into_kernel()?
605+
}
606+
_ => source.schema().as_arrow().try_into_kernel()?,
607+
};
599608
// Verify if delta schema changed
600609
if &schema_struct != snapshot.schema().as_ref() {
601610
let current_protocol = snapshot.protocol();

crates/core/tests/integration_datafusion.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1478,8 +1478,12 @@ fn schema_with_generated_column_and_user(user_nullable: bool) -> StructType {
14781478
}
14791479

14801480
fn id_value_record_batch() -> RecordBatch {
1481-
let id_arr = Int32Array::from(vec![1, 2]);
1482-
let value_arr = Int32Array::from(vec![10, 20]);
1481+
id_value_record_batch_with(vec![1, 2], vec![10, 20])
1482+
}
1483+
1484+
fn id_value_record_batch_with(ids: Vec<i32>, values: Vec<i32>) -> RecordBatch {
1485+
let id_arr = Int32Array::from(ids);
1486+
let value_arr = Int32Array::from(values);
14831487
RecordBatch::try_from_iter_with_nullable(vec![
14841488
("id", Arc::new(id_arr) as ArrayRef, false),
14851489
("value", Arc::new(value_arr) as ArrayRef, false),
@@ -1513,6 +1517,35 @@ async fn test_schema_merge_append_missing_nullable_column_with_generated_columns
15131517
.await
15141518
.unwrap();
15151519

1520+
// Ensure schema merge didn't strip generated column metadata.
1521+
let schema = table.snapshot().unwrap().snapshot().arrow_schema();
1522+
let computed = schema.field_with_name("computed").unwrap();
1523+
assert_eq!(
1524+
computed
1525+
.metadata()
1526+
.get(ColumnMetadataKey::GenerationExpression.as_ref())
1527+
.map(|v| v.as_str()),
1528+
Some("id + value")
1529+
);
1530+
1531+
// Subsequent appends should continue to generate values for missing generated columns.
1532+
let table = table
1533+
.write(vec![id_value_record_batch_with(vec![3, 4], vec![30, 40])])
1534+
.with_schema_mode(SchemaMode::Merge)
1535+
.await
1536+
.unwrap();
1537+
1538+
// Ensure subsequent schema merges also preserve generated column metadata.
1539+
let schema = table.snapshot().unwrap().snapshot().arrow_schema();
1540+
let computed = schema.field_with_name("computed").unwrap();
1541+
assert_eq!(
1542+
computed
1543+
.metadata()
1544+
.get(ColumnMetadataKey::GenerationExpression.as_ref())
1545+
.map(|v| v.as_str()),
1546+
Some("id + value")
1547+
);
1548+
15161549
let batches = ctx
15171550
.read_table(table.table_provider().await.unwrap())
15181551
.unwrap()
@@ -1530,6 +1563,8 @@ async fn test_schema_merge_append_missing_nullable_column_with_generated_columns
15301563
"+----+-------+----------+------+",
15311564
"| 1 | 10 | 11 | |",
15321565
"| 2 | 20 | 22 | |",
1566+
"| 3 | 30 | 33 | |",
1567+
"| 4 | 40 | 44 | |",
15331568
"+----+-------+----------+------+",
15341569
],
15351570
&batches

python/tests/test_generated_columns.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,9 @@ def test_merge_with_g_during_schema_evolution(
251251
)
252252

253253
id_col = ArrowField("id", DataType.int32(), nullable=True)
254-
gc = ArrowField("gc", DataType.int32(), nullable=True)
254+
gc = ArrowField("gc", DataType.int32(), nullable=True).with_metadata(
255+
{"delta.generationExpression": "5"}
256+
)
255257
expected_data = Table.from_pydict(
256258
{"id": Array([1, 2], type=id_col), "gc": Array([5, 5], type=gc)},
257259
)
@@ -293,6 +295,37 @@ def test_raise_when_gc_passed_merge_statement_during_schema_evolution(
293295
)
294296

295297

298+
def test_schema_evolution_does_not_override_existing_gc_expression(tmp_path):
299+
table_schema = DeltaSchema(
300+
[
301+
Field(name="id", type=PrimitiveType("integer"), nullable=True),
302+
Field(
303+
name="gc",
304+
type=PrimitiveType("integer"),
305+
nullable=True,
306+
metadata={"delta.generationExpression": "5"},
307+
),
308+
Field(name="user", type=PrimitiveType("string"), nullable=True),
309+
]
310+
)
311+
dt = DeltaTable.create(tmp_path, schema=table_schema)
312+
313+
id_col = ArrowField("id", DataType.int32(), nullable=True)
314+
altered_gc_col = ArrowField("gc", DataType.int32(), nullable=True).with_metadata(
315+
{"delta.generationExpression": "id * 10"}
316+
)
317+
altered_gc_data = Table.from_pydict(
318+
{"id": Array([1, 2], type=id_col), "gc": Array([5, 5], type=altered_gc_col)},
319+
)
320+
321+
# Force schema evolution by omitting nullable `user`, and ensure batch-side
322+
# generationExpression metadata does not override table metadata for `gc`.
323+
write_deltalake(dt, mode="append", data=altered_gc_data, schema_mode="merge")
324+
dt = DeltaTable(tmp_path)
325+
fields_by_name = {field.name: field for field in dt.schema().fields}
326+
assert fields_by_name["gc"].metadata == {"delta.generationExpression": "5"}
327+
328+
296329
def test_merge_with_gc_invalid(table_with_gc: DeltaTable, invalid_gc_data):
297330
import re
298331

0 commit comments

Comments
 (0)