Skip to content

Commit 4d103a7

Browse files
authored
chore: upgrade datafusion 33 (#1775)
# Description

I've had to live on the bleeding edge of datafusion since I've discovered multiple bugs while implementing merge enhancements. Creating this PR to contain the changes necessary to use it. Datafusion has made significant changes to how table statistics are represented. I refactored and was able to trim a significant amount of code. There were some bugs with how we presented statistics for tables that do not contain column metadata (delta-0.2.0), where we stated that the number of records for a file was 0.

## Fixes

The nullability status of partition columns is now accurately captured by Datafusion. Previously, if a partition column contained a null value, an error would be returned. This should be resolved now.
1 parent 44a3760 commit 4d103a7

10 files changed

Lines changed: 348 additions & 443 deletions

File tree

Cargo.toml

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,24 @@ debug = "line-tables-only"
1919

2020
[workspace.dependencies]
2121
# arrow
22-
arrow = { version = "47" }
23-
arrow-array = { version = "47" }
24-
arrow-buffer = { version = "47" }
25-
arrow-cast = { version = "47" }
26-
arrow-ord = { version = "47" }
27-
arrow-row = { version = "47" }
28-
arrow-schema = { version = "47" }
29-
arrow-select = { version = "47" }
30-
parquet = { version = "47" }
22+
arrow = { version = "48.0.1" }
23+
arrow-array = { version = "48.0.1" }
24+
arrow-buffer = { version = "48.0.1" }
25+
arrow-cast = { version = "48.0.1" }
26+
arrow-ord = { version = "48.0.1" }
27+
arrow-row = { version = "48.0.1" }
28+
arrow-schema = { version = "48.0.1" }
29+
arrow-select = { version = "48.0.1" }
30+
parquet = { version = "48.0.1" }
3131

3232
# datafusion
33-
datafusion = { version = "32" }
34-
datafusion-expr = { version = "32" }
35-
datafusion-common = { version = "32" }
36-
datafusion-proto = { version = "32" }
37-
datafusion-sql = { version = "32" }
38-
datafusion-physical-expr = { version = "32" }
33+
datafusion = { version = "33.0.0" }
34+
datafusion-expr = { version = "33.0.0" }
35+
datafusion-common = { version = "33.0.0" }
36+
datafusion-proto = { version = "33.0.0" }
37+
datafusion-sql = { version = "33.0.0" }
38+
datafusion-physical-expr = { version = "33.0.0" }
39+
3940

4041
# serde
4142
serde = { version = "1", features = ["derive"] }

crates/deltalake-core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [
111111
# Datafusion
112112
dashmap = { version = "5", optional = true }
113113

114-
sqlparser = { version = "0.38", optional = true }
114+
sqlparser = { version = "0.39", optional = true }
115115

116116
# NOTE dependencies only for integration tests
117117
fs_extra = { version = "1.3.0", optional = true }

crates/deltalake-core/src/delta_datafusion/expr.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> {
7070
fn get_window_meta(&self, name: &str) -> Option<Arc<datafusion_expr::WindowUDF>> {
7171
self.state.window_functions().get(name).cloned()
7272
}
73+
74+
fn get_table_source(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
75+
unimplemented!()
76+
}
7377
}
7478

7579
/// Parse a string predicate into an `Expr`

crates/deltalake-core/src/delta_datafusion/mod.rs

Lines changed: 188 additions & 380 deletions
Large diffs are not rendered by default.

crates/deltalake-core/src/operations/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ mod datafusion_utils {
208208
metrics::{ExecutionPlanMetricsSet, MetricsSet},
209209
ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
210210
};
211-
use datafusion_common::DFSchema;
211+
use datafusion_common::{DFSchema, Statistics};
212212
use datafusion_expr::Expr;
213213
use futures::{Stream, StreamExt};
214214

@@ -334,7 +334,7 @@ mod datafusion_utils {
334334
}))
335335
}
336336

337-
fn statistics(&self) -> datafusion_common::Statistics {
337+
fn statistics(&self) -> DataFusionResult<Statistics> {
338338
self.parent.statistics()
339339
}
340340

crates/deltalake-core/src/operations/optimize.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,10 +1185,10 @@ pub(super) mod zorder {
11851185
.ok_or(DataFusionError::NotImplemented(
11861186
"z-order on zero columns.".to_string(),
11871187
))?;
1188-
let columns = columns
1188+
let columns: Vec<ArrayRef> = columns
11891189
.iter()
11901190
.map(|col| col.clone().into_array(length))
1191-
.collect_vec();
1191+
.try_collect()?;
11921192
let array = zorder_key(&columns)?;
11931193
Ok(ColumnarValue::Array(array))
11941194
}

crates/deltalake-core/src/operations/transaction/state.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,12 @@ impl<'a> AddContainer<'a> {
188188
Some(v) => serde_json::Value::String(v.to_string()),
189189
None => serde_json::Value::Null,
190190
};
191-
to_correct_scalar_value(&value, data_type).unwrap_or(
192-
get_null_of_arrow_type(data_type).expect("Could not determine null type"),
193-
)
191+
to_correct_scalar_value(&value, data_type)
192+
.ok()
193+
.flatten()
194+
.unwrap_or(
195+
get_null_of_arrow_type(data_type).expect("Could not determine null type"),
196+
)
194197
} else if let Ok(Some(statistics)) = add.get_stats() {
195198
let values = if get_max {
196199
statistics.max_values
@@ -200,7 +203,11 @@ impl<'a> AddContainer<'a> {
200203

201204
values
202205
.get(&column.name)
203-
.and_then(|f| to_correct_scalar_value(f.as_value()?, data_type))
206+
.and_then(|f| {
207+
to_correct_scalar_value(f.as_value()?, data_type)
208+
.ok()
209+
.flatten()
210+
})
204211
.unwrap_or(
205212
get_null_of_arrow_type(data_type).expect("Could not determine null type"),
206213
)

crates/deltalake-core/tests/integration_datafusion.rs

Lines changed: 118 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use std::error::Error;
4646
mod common;
4747

4848
mod local {
49+
use datafusion::common::stats::Precision;
4950
use deltalake_core::writer::JsonWriter;
5051

5152
use super::*;
@@ -281,67 +282,146 @@ mod local {
281282

282283
#[tokio::test]
283284
async fn test_datafusion_stats() -> Result<()> {
285+
// Validate a table that contains statistics for all files
284286
let table = open_table("./tests/data/delta-0.8.0").await.unwrap();
285-
let statistics = table.state.datafusion_table_statistics();
287+
let statistics = table.state.datafusion_table_statistics()?;
286288

287-
assert_eq!(statistics.num_rows, Some(4),);
289+
assert_eq!(statistics.num_rows, Precision::Exact(4_usize),);
288290

289-
assert_eq!(statistics.total_byte_size, Some(440 + 440));
291+
assert_eq!(
292+
statistics.total_byte_size,
293+
Precision::Exact((440 + 440) as usize)
294+
);
290295

296+
let column_stats = statistics.column_statistics.get(0).unwrap();
297+
assert_eq!(column_stats.null_count, Precision::Exact(0));
291298
assert_eq!(
292-
statistics
293-
.column_statistics
294-
.clone()
295-
.unwrap()
296-
.iter()
297-
.map(|x| x.null_count)
298-
.collect::<Vec<Option<usize>>>(),
299-
vec![Some(0)],
299+
column_stats.max_value,
300+
Precision::Exact(ScalarValue::from(4_i32))
301+
);
302+
assert_eq!(
303+
column_stats.min_value,
304+
Precision::Exact(ScalarValue::from(0_i32))
300305
);
301306

302307
let ctx = SessionContext::new();
303308
ctx.register_table("test_table", Arc::new(table))?;
304-
305-
let batches = ctx
309+
let actual = ctx
306310
.sql("SELECT max(value), min(value) FROM test_table")
307311
.await?
308312
.collect()
309313
.await?;
310314

311-
assert_eq!(batches.len(), 1);
312-
let batch = &batches[0];
315+
let expected = vec![
316+
"+-----------------------+-----------------------+",
317+
"| MAX(test_table.value) | MIN(test_table.value) |",
318+
"+-----------------------+-----------------------+",
319+
"| 4 | 0 |",
320+
"+-----------------------+-----------------------+",
321+
];
322+
assert_batches_sorted_eq!(&expected, &actual);
323+
324+
// Validate a table that does not contain column statistics
325+
let table = open_table("./tests/data/delta-0.2.0").await.unwrap();
326+
let statistics = table.state.datafusion_table_statistics()?;
327+
328+
assert_eq!(statistics.num_rows, Precision::Absent);
329+
313330
assert_eq!(
314-
batch.column(0).as_ref(),
315-
Arc::new(Int32Array::from(vec![4])).as_ref(),
331+
statistics.total_byte_size,
332+
Precision::Exact((400 + 404 + 396) as usize)
316333
);
334+
let column_stats = statistics.column_statistics.get(0).unwrap();
335+
assert_eq!(column_stats.null_count, Precision::Absent);
336+
assert_eq!(column_stats.max_value, Precision::Absent);
337+
assert_eq!(column_stats.min_value, Precision::Absent);
338+
339+
ctx.register_table("test_table2", Arc::new(table))?;
340+
let actual = ctx
341+
.sql("SELECT max(value), min(value) FROM test_table2")
342+
.await?
343+
.collect()
344+
.await?;
317345

346+
let expected = vec![
347+
"+------------------------+------------------------+",
348+
"| MAX(test_table2.value) | MIN(test_table2.value) |",
349+
"+------------------------+------------------------+",
350+
"| 3 | 1 |",
351+
"+------------------------+------------------------+",
352+
];
353+
assert_batches_sorted_eq!(&expected, &actual);
354+
355+
// Validate a table that contains nested structures.
356+
357+
// This table is interesting since it goes through schema evolution.
358+
// In particular 'new_column' contains statistics for when it
359+
// is introduced (10) but the commit following (11) does not contain
360+
// statistics for this column.
361+
let table = open_table("./tests/data/delta-1.2.1-only-struct-stats")
362+
.await
363+
.unwrap();
364+
let schema = table.get_schema().unwrap();
365+
let statistics = table.state.datafusion_table_statistics()?;
366+
assert_eq!(statistics.num_rows, Precision::Exact(12));
367+
368+
// `new_column` statistics
369+
let stats = statistics
370+
.column_statistics
371+
.get(schema.index_of("new_column").unwrap())
372+
.unwrap();
373+
assert_eq!(stats.null_count, Precision::Absent);
374+
assert_eq!(stats.min_value, Precision::Absent);
375+
assert_eq!(stats.max_value, Precision::Absent);
376+
377+
// `date` statistics
378+
let stats = statistics
379+
.column_statistics
380+
.get(schema.index_of("date").unwrap())
381+
.unwrap();
382+
assert_eq!(stats.null_count, Precision::Exact(0));
383+
// 2022-10-24
318384
assert_eq!(
319-
batch.column(1).as_ref(),
320-
Arc::new(Int32Array::from(vec![0])).as_ref(),
385+
stats.min_value,
386+
Precision::Exact(ScalarValue::Date32(Some(19289)))
321387
);
322-
323388
assert_eq!(
324-
statistics
325-
.column_statistics
326-
.clone()
327-
.unwrap()
328-
.iter()
329-
.map(|x| x.max_value.as_ref())
330-
.collect::<Vec<Option<&ScalarValue>>>(),
331-
vec![Some(&ScalarValue::from(4_i32))],
389+
stats.max_value,
390+
Precision::Exact(ScalarValue::Date32(Some(19289)))
332391
);
333392

393+
// `timestamp` statistics
394+
let stats = statistics
395+
.column_statistics
396+
.get(schema.index_of("timestamp").unwrap())
397+
.unwrap();
398+
assert_eq!(stats.null_count, Precision::Exact(0));
399+
// 2022-10-24T22:59:32.846Z
334400
assert_eq!(
335-
statistics
336-
.column_statistics
337-
.clone()
338-
.unwrap()
339-
.iter()
340-
.map(|x| x.min_value.as_ref())
341-
.collect::<Vec<Option<&ScalarValue>>>(),
342-
vec![Some(&ScalarValue::from(0_i32))],
401+
stats.min_value,
402+
Precision::Exact(ScalarValue::TimestampMicrosecond(
403+
Some(1666652372846000),
404+
None
405+
))
406+
);
407+
// 2022-10-24T22:59:46.083Z
408+
assert_eq!(
409+
stats.max_value,
410+
Precision::Exact(ScalarValue::TimestampMicrosecond(
411+
Some(1666652386083000),
412+
None
413+
))
343414
);
344415

416+
// `struct_element` statistics
417+
let stats = statistics
418+
.column_statistics
419+
.get(schema.index_of("nested_struct").unwrap())
420+
.unwrap();
421+
assert_eq!(stats.null_count, Precision::Absent);
422+
assert_eq!(stats.min_value, Precision::Absent);
423+
assert_eq!(stats.max_value, Precision::Absent);
424+
345425
Ok(())
346426
}
347427

@@ -782,14 +862,14 @@ mod local {
782862

783863
let expected_schema = ArrowSchema::new(vec![
784864
ArrowField::new("c3", ArrowDataType::Int32, true),
785-
ArrowField::new("c1", ArrowDataType::Int32, false),
865+
ArrowField::new("c1", ArrowDataType::Int32, true),
786866
ArrowField::new(
787867
"c2",
788868
ArrowDataType::Dictionary(
789869
Box::new(ArrowDataType::UInt16),
790870
Box::new(ArrowDataType::Utf8),
791871
),
792-
false,
872+
true,
793873
),
794874
]);
795875

crates/deltalake-sql/src/planner.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ mod tests {
9292
use arrow_schema::{DataType, Field, Schema};
9393
use datafusion_common::config::ConfigOptions;
9494
use datafusion_common::DataFusionError;
95+
use datafusion_common::Result as DataFusionResult;
9596
use datafusion_expr::logical_plan::builder::LogicalTableSource;
9697
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource};
9798
use datafusion_sql::TableReference;
@@ -124,6 +125,10 @@ mod tests {
124125

125126
impl ContextProvider for TestSchemaProvider {
126127
fn get_table_provider(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
128+
self.get_table_source(name)
129+
}
130+
131+
fn get_table_source(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
127132
match self.tables.get(name.table()) {
128133
Some(table) => Ok(table.clone()),
129134
_ => Err(DataFusionError::Plan(format!(

python/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ tokio = { workspace = true, features = ["rt-multi-thread"] }
3939
reqwest = { version = "*", features = ["native-tls-vendored"] }
4040

4141
[dependencies.pyo3]
42-
version = "0.19"
42+
version = "0.20"
4343
features = ["extension-module", "abi3", "abi3-py38"]
4444

4545
[dependencies.deltalake]

0 commit comments

Comments
 (0)