Skip to content

Commit 376a5ad

Browse files
authored
feat: support nested struct columns in Z-Order optimization (delta-io#2007) (delta-io#4358)
Z-Order now accepts dot-notation paths for nested struct fields (e.g., "meta.field_a"). Previously, the validation layer only checked top-level field names, so "meta.field_a" was rejected as an unknown column. Even if validation had been bypassed, the execution layer treated the dot as a table qualifier instead of a struct field accessor, so it would have failed there too. # Description - Fixes delta-io#2007: Z-Order now accepts dot-notation paths for nested struct fields (e.g., "meta.field_a") - Adds validate_zorder_column() that walks the schema to verify each path segment exists and intermediate segments are struct types - Fixes read_zorder() to build proper DataFusion nested field expressions instead of treating dots as table qualifiers # Related Issue(s) - closes delta-io#2007 --------- Signed-off-by: Byeori Kim <bk.byeori.kim@gmail.com>
1 parent b1cb138 commit 376a5ad

2 files changed

Lines changed: 162 additions & 20 deletions

File tree

crates/core/src/operations/optimize.rs

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ use crate::delta_datafusion::{
5555
};
5656
use crate::errors::{DeltaResult, DeltaTableError};
5757
use crate::kernel::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIES, PROTOCOL};
58-
use crate::kernel::{Action, Add, PartitionsExt, Remove, Version, scalars::ScalarExt};
58+
use crate::kernel::{
59+
Action, Add, DataType, PartitionsExt, Remove, StructType, Version, scalars::ScalarExt,
60+
};
5961
use crate::kernel::{EagerSnapshot, resolve_snapshot};
6062
use crate::logstore::{LogStore, LogStoreRef, ObjectStoreRef};
6163
use crate::parquet_utils::default_writer_properties;
@@ -726,17 +728,25 @@ impl MergePlan {
726728
context: Arc<zorder::ZOrderExecContext>,
727729
table_provider: DeltaTableProvider,
728730
) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
729-
use datafusion::common::Column;
731+
use datafusion::functions::core::expr_ext::FieldAccessor;
730732
use datafusion::logical_expr::expr::ScalarFunction;
731-
use datafusion::logical_expr::{Expr, ScalarUDF};
733+
use datafusion::logical_expr::{Expr, ScalarUDF, ident};
732734

733735
let provider = table_provider.with_files(files.files);
734736
let df = context.ctx.read_table(Arc::new(provider))?;
735737

736738
let cols = context
737739
.columns
738740
.iter()
739-
.map(|col| Expr::Column(Column::from_qualified_name_ignore_case(col)))
741+
.map(|col_name| {
742+
let mut segments = col_name.split('.');
743+
let first = segments.next().expect("column name cannot be empty");
744+
let mut expr = ident(first);
745+
for segment in segments {
746+
expr = expr.field(segment);
747+
}
748+
expr
749+
})
740750
.collect_vec();
741751
let expr = Expr::ScalarFunction(ScalarFunction::new_udf(
742752
Arc::new(ScalarUDF::from(zorder::datafusion::ZOrderUDF)),
@@ -1235,6 +1245,31 @@ async fn build_compaction_plan(
12351245
))
12361246
}
12371247

1248+
/// Validates that a z-order column path exists in the schema, supporting nested
1249+
/// struct fields via dot notation (e.g., "meta.field_a").
1250+
fn validate_zorder_column(schema: &StructType, column: &str) -> Result<(), DeltaTableError> {
1251+
let mut segments = column.split('.').peekable();
1252+
let mut current_struct = schema;
1253+
while let Some(segment) = segments.next() {
1254+
let field = current_struct.field(segment).ok_or_else(|| {
1255+
DeltaTableError::Generic(format!(
1256+
"Z-order column \"{column}\": field \"{segment}\" not found in schema"
1257+
))
1258+
})?;
1259+
if segments.peek().is_some() {
1260+
match field.data_type() {
1261+
DataType::Struct(inner) => current_struct = inner,
1262+
_ => {
1263+
return Err(DeltaTableError::Generic(format!(
1264+
"Z-order column \"{column}\": \"{segment}\" is not a struct type"
1265+
)));
1266+
}
1267+
}
1268+
}
1269+
}
1270+
Ok(())
1271+
}
1272+
12381273
async fn build_zorder_plan(
12391274
log_store: &dyn LogStore,
12401275
zorder_columns: Vec<String>,
@@ -1257,19 +1292,8 @@ async fn build_zorder_plan(
12571292
"Z-order columns cannot be partition columns. Found: {zorder_partition_cols:?}"
12581293
)));
12591294
}
1260-
let field_names = snapshot
1261-
.schema()
1262-
.fields()
1263-
.map(|field| field.name().to_string())
1264-
.collect_vec();
1265-
let unknown_columns = zorder_columns
1266-
.iter()
1267-
.filter(|col| !field_names.contains(col))
1268-
.collect_vec();
1269-
if !unknown_columns.is_empty() {
1270-
return Err(DeltaTableError::Generic(format!(
1271-
"Z-order columns must be present in the table schema. Unknown columns: {unknown_columns:?}"
1272-
)));
1295+
for col in &zorder_columns {
1296+
validate_zorder_column(snapshot.schema().as_ref(), col)?;
12731297
}
12741298

12751299
// For now, just be naive and optimize all files in each selected partition.

crates/core/tests/command_optimize.rs

Lines changed: 121 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1299,9 +1299,12 @@ async fn test_zorder_rejects_nonexistent_columns() -> Result<(), Box<dyn Error>>
12991299
.with_type(OptimizeType::ZOrder(vec!["non-existent".to_string()]))
13001300
.await;
13011301
assert!(result.is_err());
1302-
assert!(result.unwrap_err().to_string().contains(
1303-
"Z-order columns must be present in the table schema. Unknown columns: [\"non-existent\"]"
1304-
));
1302+
assert!(
1303+
result
1304+
.unwrap_err()
1305+
.to_string()
1306+
.contains("field \"non-existent\" not found in schema")
1307+
);
13051308
Ok(())
13061309
}
13071310

@@ -1499,6 +1502,121 @@ async fn test_zorder_respects_target_size() -> Result<(), Box<dyn Error>> {
14991502
Ok(())
15001503
}
15011504

1505+
#[tokio::test]
1506+
async fn test_zorder_nested_columns() -> Result<(), Box<dyn Error>> {
1507+
let schema = Arc::new(ArrowSchema::new(vec![
1508+
Field::new(
1509+
"meta",
1510+
ArrowDataType::Struct(vec![Field::new("field_a", ArrowDataType::Int32, false)].into()),
1511+
false,
1512+
),
1513+
Field::new("value", ArrowDataType::Int32, false),
1514+
]));
1515+
1516+
let batch1 = RecordBatch::try_new(
1517+
schema.clone(),
1518+
vec![
1519+
Arc::new(arrow_array::StructArray::from(vec![(
1520+
Arc::new(Field::new("field_a", ArrowDataType::Int32, false)),
1521+
Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn arrow_array::Array>,
1522+
)])),
1523+
Arc::new(Int32Array::from(vec![10, 20, 30])),
1524+
],
1525+
)?;
1526+
1527+
let batch2 = RecordBatch::try_new(
1528+
schema.clone(),
1529+
vec![
1530+
Arc::new(arrow_array::StructArray::from(vec![(
1531+
Arc::new(Field::new("field_a", ArrowDataType::Int32, false)),
1532+
Arc::new(Int32Array::from(vec![4, 5, 6])) as Arc<dyn arrow_array::Array>,
1533+
)])),
1534+
Arc::new(Int32Array::from(vec![40, 50, 60])),
1535+
],
1536+
)?;
1537+
1538+
let table = DeltaTable::new_in_memory()
1539+
.write(vec![batch1])
1540+
.with_save_mode(deltalake_core::protocol::SaveMode::Append)
1541+
.await?;
1542+
1543+
let table = table
1544+
.write(vec![batch2])
1545+
.with_save_mode(deltalake_core::protocol::SaveMode::Append)
1546+
.await?;
1547+
1548+
let (_, metrics) = table
1549+
.optimize()
1550+
.with_type(OptimizeType::ZOrder(vec!["meta.field_a".to_string()]))
1551+
.await?;
1552+
1553+
assert_eq!(metrics.num_files_added, 1);
1554+
assert_eq!(metrics.num_files_removed, 2);
1555+
1556+
Ok(())
1557+
}
1558+
1559+
#[tokio::test]
1560+
async fn test_zorder_rejects_invalid_nested_path() -> Result<(), Box<dyn Error>> {
1561+
let schema = Arc::new(ArrowSchema::new(vec![
1562+
Field::new(
1563+
"meta",
1564+
ArrowDataType::Struct(vec![Field::new("field_a", ArrowDataType::Int32, false)].into()),
1565+
false,
1566+
),
1567+
Field::new("value", ArrowDataType::Int32, false),
1568+
]));
1569+
1570+
let batch = RecordBatch::try_new(
1571+
schema.clone(),
1572+
vec![
1573+
Arc::new(arrow_array::StructArray::from(vec![(
1574+
Arc::new(Field::new("field_a", ArrowDataType::Int32, false)),
1575+
Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn arrow_array::Array>,
1576+
)])),
1577+
Arc::new(Int32Array::from(vec![10, 20, 30])),
1578+
],
1579+
)?;
1580+
1581+
// Non-existent nested field
1582+
let table = DeltaTable::new_in_memory()
1583+
.write(vec![batch.clone()])
1584+
.with_save_mode(deltalake_core::protocol::SaveMode::Append)
1585+
.await?;
1586+
1587+
let result = table
1588+
.optimize()
1589+
.with_type(OptimizeType::ZOrder(vec!["meta.nonexistent".to_string()]))
1590+
.await;
1591+
assert!(result.is_err());
1592+
assert!(
1593+
result
1594+
.unwrap_err()
1595+
.to_string()
1596+
.contains("field \"nonexistent\" not found in schema")
1597+
);
1598+
1599+
// Non-struct intermediate field
1600+
let table = DeltaTable::new_in_memory()
1601+
.write(vec![batch])
1602+
.with_save_mode(deltalake_core::protocol::SaveMode::Append)
1603+
.await?;
1604+
1605+
let result = table
1606+
.optimize()
1607+
.with_type(OptimizeType::ZOrder(vec!["value.sub".to_string()]))
1608+
.await;
1609+
assert!(result.is_err());
1610+
assert!(
1611+
result
1612+
.unwrap_err()
1613+
.to_string()
1614+
.contains("\"value\" is not a struct type")
1615+
);
1616+
1617+
Ok(())
1618+
}
1619+
15021620
async fn read_parquet_file(
15031621
path: &Path,
15041622
object_store: ObjectStoreRef,

0 commit comments

Comments
 (0)