Skip to content

Commit 90633dc

Browse files
authored
deps: upgrade to DataFusion 53.0, Arrow to 58.1 (#3629)
1 parent 10513e5 commit 90633dc

22 files changed

Lines changed: 1137 additions & 600 deletions

File tree

native/Cargo.lock

Lines changed: 539 additions & 323 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/Cargo.toml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@ edition = "2021"
3434
rust-version = "1.88"
3535

3636
[workspace.dependencies]
37-
arrow = { version = "57.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
37+
arrow = { version = "58.1.0", features = ["prettyprint", "ffi", "chrono-tz"] }
3838
async-trait = { version = "0.1" }
3939
bytes = { version = "1.11.1" }
40-
parquet = { version = "57.3.0", default-features = false, features = ["experimental"] }
41-
datafusion = { version = "52.4.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
42-
datafusion-datasource = { version = "52.4.0" }
43-
datafusion-physical-expr-adapter = { version = "52.4.0" }
44-
datafusion-spark = { version = "52.4.0" }
40+
parquet = { version = "58.1.0", default-features = false, features = ["experimental"] }
41+
datafusion = { version = "53.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
42+
datafusion-datasource = { version = "53.0.0" }
43+
datafusion-physical-expr-adapter = { version = "53.0.0" }
44+
datafusion-spark = { version = "53.0.0", features = ["core"] }
4545
datafusion-comet-spark-expr = { path = "spark-expr" }
4646
datafusion-comet-common = { path = "common" }
4747
datafusion-comet-jni-bridge = { path = "jni-bridge" }
@@ -54,12 +54,12 @@ num = "0.4"
5454
rand = "0.10"
5555
regex = "1.12.3"
5656
thiserror = "2"
57-
object_store = { version = "0.12.3", features = ["gcp", "azure", "aws", "http"] }
57+
object_store = { version = "0.13.1", features = ["gcp", "azure", "aws", "http"] }
5858
url = "2.2"
5959
aws-config = "1.8.14"
6060
aws-credential-types = "1.2.13"
61-
iceberg = { git = "https://github.com/apache/iceberg-rust", tag = "v0.9.0-rc.1" }
62-
iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", tag = "v0.9.0-rc.1", features = ["opendal-all"] }
61+
iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "477a1e5" }
62+
iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "477a1e5", features = ["opendal-all"] }
6363

6464
[profile.release]
6565
debug = true

native/core/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ aws-credential-types = { workspace = true }
7070
parking_lot = "0.12.5"
7171
datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] }
7272
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"] }
73-
object_store_opendal = {version = "0.55.0", optional = true}
73+
object_store_opendal = { git = "https://github.com/apache/opendal", rev = "173feb6", package = "object_store_opendal", optional = true}
7474
hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]}
75-
opendal = { version ="0.55.0", optional = true, features = ["services-hdfs"] }
75+
opendal = { git = "https://github.com/apache/opendal", rev = "173feb6", optional = true, features = ["services-hdfs"] }
7676
iceberg = { workspace = true }
7777
iceberg-storage-opendal = { workspace = true }
7878
serde_json = "1.0"
@@ -91,7 +91,7 @@ jni = { version = "0.22.4", features = ["invocation"] }
9191
lazy_static = "1.4"
9292
assertables = "9"
9393
hex = "0.4.3"
94-
datafusion-functions-nested = { version = "52.4.0" }
94+
datafusion-functions-nested = { version = "53.0.0" }
9595

9696
[features]
9797
backtrace = ["datafusion/backtrace"]

native/core/src/execution/jni_api.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,11 @@ fn prepare_datafusion_session_context(
393393

394394
// register UDFs from datafusion-spark crate
395395
fn register_datafusion_spark_function(session_ctx: &SessionContext) {
396+
// Don't register SparkArrayRepeat — it returns NULL when the element is NULL
397+
// (e.g. array_repeat(null, 3) returns NULL instead of [null, null, null]).
398+
// Comet's Scala serde wraps the call in a CaseWhen for null count handling,
399+
// so DataFusion's built-in ArrayRepeat is sufficient.
400+
// TODO: file upstream issue against datafusion-spark
396401
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkExpm1::default()));
397402
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha2::default()));
398403
session_ctx.register_udf(ScalarUDF::new_from_impl(CharFunc::default()));

native/core/src/execution/memory_pools/fair_pool.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,21 @@ impl MemoryPool for CometFairMemoryPool {
103103
.expect("unexpected amount of unregister happened");
104104
}
105105

106-
fn grow(&self, reservation: &MemoryReservation, additional: usize) {
107-
self.try_grow(reservation, additional).unwrap();
106+
fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
107+
self.try_grow(_reservation, additional).unwrap();
108108
}
109109

110-
fn shrink(&self, reservation: &MemoryReservation, subtractive: usize) {
110+
fn shrink(&self, _reservation: &MemoryReservation, subtractive: usize) {
111111
if subtractive > 0 {
112112
let mut state = self.state.lock();
113-
let size = reservation.size();
114-
if size < subtractive {
115-
panic!("Failed to release {subtractive} bytes where only {size} bytes reserved")
113+
// We don't use reservation.size() here because DataFusion 53+ decrements
114+
// the reservation's atomic size before calling pool.shrink(), so it would
115+
// reflect the post-shrink value rather than the pre-shrink value.
116+
if state.used < subtractive {
117+
panic!(
118+
"Failed to release {subtractive} bytes where only {} bytes tracked by pool",
119+
state.used
120+
)
116121
}
117122
self.release(subtractive)
118123
.unwrap_or_else(|_| panic!("Failed to release {subtractive} bytes"));
@@ -122,7 +127,7 @@ impl MemoryPool for CometFairMemoryPool {
122127

123128
fn try_grow(
124129
&self,
125-
reservation: &MemoryReservation,
130+
_reservation: &MemoryReservation,
126131
additional: usize,
127132
) -> Result<(), DataFusionError> {
128133
if additional > 0 {
@@ -132,10 +137,13 @@ impl MemoryPool for CometFairMemoryPool {
132137
.pool_size
133138
.checked_div(num)
134139
.expect("overflow in checked_div");
135-
let size = reservation.size();
136-
if limit < size + additional {
140+
// We use state.used instead of reservation.size() because DataFusion 53+
141+
// calls pool.try_grow() before incrementing the reservation's atomic size,
142+
// so reservation.size() would not include prior grows.
143+
let used = state.used;
144+
if limit < used + additional {
137145
return resources_err!(
138-
"Failed to acquire {additional} bytes where {size} bytes already reserved and the fair limit is {limit} bytes, {num} registered"
146+
"Failed to acquire {additional} bytes where {used} bytes already reserved and the fair limit is {limit} bytes, {num} registered"
139147
);
140148
}
141149

native/core/src/execution/operators/expand.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pub struct ExpandExec {
4242
projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
4343
child: Arc<dyn ExecutionPlan>,
4444
schema: SchemaRef,
45-
cache: PlanProperties,
45+
cache: Arc<PlanProperties>,
4646
}
4747

4848
impl ExpandExec {
@@ -52,12 +52,12 @@ impl ExpandExec {
5252
child: Arc<dyn ExecutionPlan>,
5353
schema: SchemaRef,
5454
) -> Self {
55-
let cache = PlanProperties::new(
55+
let cache = Arc::new(PlanProperties::new(
5656
EquivalenceProperties::new(Arc::clone(&schema)),
5757
Partitioning::UnknownPartitioning(1),
5858
EmissionType::Final,
5959
Boundedness::Bounded,
60-
);
60+
));
6161

6262
Self {
6363
projections,
@@ -129,7 +129,7 @@ impl ExecutionPlan for ExpandExec {
129129
Ok(Box::pin(expand_stream))
130130
}
131131

132-
fn properties(&self) -> &PlanProperties {
132+
fn properties(&self) -> &Arc<PlanProperties> {
133133
&self.cache
134134
}
135135

native/core/src/execution/operators/iceberg_scan.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub struct IcebergScanExec {
5858
/// Output schema after projection
5959
output_schema: SchemaRef,
6060
/// Cached execution plan properties
61-
plan_properties: PlanProperties,
61+
plan_properties: Arc<PlanProperties>,
6262
/// Catalog-specific configuration for FileIO
6363
catalog_properties: HashMap<String, String>,
6464
/// Pre-planned file scan tasks
@@ -93,13 +93,13 @@ impl IcebergScanExec {
9393
})
9494
}
9595

96-
fn compute_properties(schema: SchemaRef, num_partitions: usize) -> PlanProperties {
97-
PlanProperties::new(
96+
fn compute_properties(schema: SchemaRef, num_partitions: usize) -> Arc<PlanProperties> {
97+
Arc::new(PlanProperties::new(
9898
EquivalenceProperties::new(schema),
9999
Partitioning::UnknownPartitioning(num_partitions),
100100
EmissionType::Incremental,
101101
Boundedness::Bounded,
102-
)
102+
))
103103
}
104104
}
105105

@@ -116,7 +116,7 @@ impl ExecutionPlan for IcebergScanExec {
116116
Arc::clone(&self.output_schema)
117117
}
118118

119-
fn properties(&self) -> &PlanProperties {
119+
fn properties(&self) -> &Arc<PlanProperties> {
120120
&self.plan_properties
121121
}
122122

@@ -288,7 +288,7 @@ where
288288
_ => {
289289
let adapter = self
290290
.adapter_factory
291-
.create(Arc::clone(&self.schema), Arc::clone(&file_schema));
291+
.create(Arc::clone(&self.schema), Arc::clone(&file_schema))?;
292292
let exprs =
293293
build_projection_expressions(&self.schema, &adapter).map_err(|e| {
294294
DataFusionError::Execution(format!(

native/core/src/execution/operators/parquet_writer.rs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,18 @@ use std::{
2323
fmt,
2424
fmt::{Debug, Formatter},
2525
fs::File,
26-
io::Cursor,
2726
sync::Arc,
2827
};
2928

29+
#[cfg(feature = "hdfs-opendal")]
3030
use opendal::Operator;
31+
#[cfg(feature = "hdfs-opendal")]
32+
use std::io::Cursor;
3133

3234
use crate::execution::shuffle::CompressionCodec;
33-
use crate::parquet::parquet_support::{
34-
create_hdfs_operator, is_hdfs_scheme, prepare_object_store_with_configs,
35-
};
35+
use crate::parquet::parquet_support::is_hdfs_scheme;
36+
#[cfg(feature = "hdfs-opendal")]
37+
use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs};
3638
use arrow::datatypes::{Schema, SchemaRef};
3739
use arrow::record_batch::RecordBatch;
3840
use async_trait::async_trait;
@@ -45,7 +47,7 @@ use datafusion::{
4547
metrics::{ExecutionPlanMetricsSet, MetricsSet},
4648
stream::RecordBatchStreamAdapter,
4749
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
48-
SendableRecordBatchStream, Statistics,
50+
SendableRecordBatchStream,
4951
},
5052
};
5153
use futures::TryStreamExt;
@@ -64,6 +66,7 @@ enum ParquetWriter {
6466
/// Contains the arrow writer, HDFS operator, and destination path
6567
/// an Arrow writer writes to in-memory buffer the data converted to Parquet format
6668
/// The opendal::Writer is created lazily on first write
69+
#[cfg(feature = "hdfs-opendal")]
6770
Remote(
6871
ArrowWriter<Cursor<Vec<u8>>>,
6972
Option<opendal::Writer>,
@@ -80,6 +83,7 @@ impl ParquetWriter {
8083
) -> std::result::Result<(), parquet::errors::ParquetError> {
8184
match self {
8285
ParquetWriter::LocalFile(writer) => writer.write(batch),
86+
#[cfg(feature = "hdfs-opendal")]
8387
ParquetWriter::Remote(
8488
arrow_parquet_buffer_writer,
8589
hdfs_writer_opt,
@@ -134,6 +138,7 @@ impl ParquetWriter {
134138
writer.close()?;
135139
Ok(())
136140
}
141+
#[cfg(feature = "hdfs-opendal")]
137142
ParquetWriter::Remote(
138143
arrow_parquet_buffer_writer,
139144
mut hdfs_writer_opt,
@@ -208,7 +213,7 @@ pub struct ParquetWriterExec {
208213
/// Metrics
209214
metrics: ExecutionPlanMetricsSet,
210215
/// Cache for plan properties
211-
cache: PlanProperties,
216+
cache: Arc<PlanProperties>,
212217
}
213218

214219
impl ParquetWriterExec {
@@ -228,12 +233,12 @@ impl ParquetWriterExec {
228233
// Preserve the input's partitioning so each partition writes its own file
229234
let input_partitioning = input.output_partitioning().clone();
230235

231-
let cache = PlanProperties::new(
236+
let cache = Arc::new(PlanProperties::new(
232237
EquivalenceProperties::new(Arc::clone(&input.schema())),
233238
input_partitioning,
234239
EmissionType::Final,
235240
Boundedness::Bounded,
236-
);
241+
));
237242

238243
Ok(ParquetWriterExec {
239244
input,
@@ -275,7 +280,7 @@ impl ParquetWriterExec {
275280
output_file_path: &str,
276281
schema: SchemaRef,
277282
props: WriterProperties,
278-
runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
283+
_runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
279284
object_store_options: &HashMap<String, String>,
280285
) -> Result<ParquetWriter> {
281286
// Parse URL and match on storage scheme directly
@@ -284,11 +289,11 @@ impl ParquetWriterExec {
284289
})?;
285290

286291
if is_hdfs_scheme(&url, object_store_options) {
287-
// HDFS storage
292+
#[cfg(feature = "hdfs-opendal")]
288293
{
289294
// Use prepare_object_store_with_configs to create and register the object store
290295
let (_object_store_url, object_store_path) = prepare_object_store_with_configs(
291-
runtime_env,
296+
_runtime_env,
292297
output_file_path.to_string(),
293298
object_store_options,
294299
)
@@ -324,6 +329,12 @@ impl ParquetWriterExec {
324329
object_store_path.to_string(),
325330
))
326331
}
332+
#[cfg(not(feature = "hdfs-opendal"))]
333+
{
334+
Err(DataFusionError::Execution(
335+
"HDFS support is not enabled. Rebuild with the 'hdfs-opendal' feature.".into(),
336+
))
337+
}
327338
} else if output_file_path.starts_with("file://")
328339
|| output_file_path.starts_with("file:")
329340
|| !output_file_path.contains("://")
@@ -405,11 +416,7 @@ impl ExecutionPlan for ParquetWriterExec {
405416
Some(self.metrics.clone_inner())
406417
}
407418

408-
fn statistics(&self) -> Result<Statistics> {
409-
self.input.partition_statistics(None)
410-
}
411-
412-
fn properties(&self) -> &PlanProperties {
419+
fn properties(&self) -> &Arc<PlanProperties> {
413420
&self.cache
414421
}
415422

@@ -576,6 +583,7 @@ mod tests {
576583

577584
/// Helper function to create a test RecordBatch with 1000 rows of (int, string) data
578585
/// Example batch_id 1 -> 0..1000, 2 -> 1001..2000
586+
#[allow(dead_code)]
579587
fn create_test_record_batch(batch_id: i32) -> Result<RecordBatch> {
580588
assert!(batch_id > 0, "batch_id must be greater than 0");
581589
let num_rows = batch_id * 1000;

native/core/src/execution/operators/scan.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub struct ScanExec {
7070
/// It is also used in unit test to mock the input data from JVM.
7171
pub batch: Arc<Mutex<Option<InputBatch>>>,
7272
/// Cache of expensive-to-compute plan properties
73-
cache: PlanProperties,
73+
cache: Arc<PlanProperties>,
7474
/// Metrics collector
7575
metrics: ExecutionPlanMetricsSet,
7676
/// Baseline metrics
@@ -93,14 +93,14 @@ impl ScanExec {
9393
// Build schema directly from data types since get_next now always unpacks dictionaries
9494
let schema = schema_from_data_types(&data_types);
9595

96-
let cache = PlanProperties::new(
96+
let cache = Arc::new(PlanProperties::new(
9797
EquivalenceProperties::new(Arc::clone(&schema)),
9898
// The partitioning is not important because we are not using DataFusion's
9999
// query planner or optimizer
100100
Partitioning::UnknownPartitioning(1),
101101
EmissionType::Final,
102102
Boundedness::Bounded,
103-
);
103+
));
104104

105105
Ok(Self {
106106
exec_context_id,
@@ -415,7 +415,7 @@ impl ExecutionPlan for ScanExec {
415415
)))
416416
}
417417

418-
fn properties(&self) -> &PlanProperties {
418+
fn properties(&self) -> &Arc<PlanProperties> {
419419
&self.cache
420420
}
421421

0 commit comments

Comments
 (0)