diff --git a/Cargo.toml b/Cargo.toml index edcaa4ff5..2a97f824d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,11 +38,12 @@ arrow-select = { version = "57" } object_store = { version = "0.12.1" } parquet = { version = "57" } -# datafusion -datafusion = "51.0" -datafusion-datasource = "51.0" -datafusion-ffi = "51.0" -datafusion-proto = "51.0" +# datafusion 52.1 +datafusion = { version = "52.1.0" } +datafusion-datasource = { version = "52.1.0" } +datafusion-ffi = { version = "52.1.0" } +datafusion-proto = { version = "52.1.0" } +datafusion-physical-expr-adapter = { version = "52.1.0" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index dc8359af1..eb53e45ef 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -38,6 +38,7 @@ object_store = { workspace = true } datafusion = { workspace = true, optional = true } datafusion-datasource = { workspace = true, optional = true } datafusion-proto = { workspace = true, optional = true } +datafusion-physical-expr-adapter = { workspace = true, optional = true } # serde serde = { workspace = true, features = ["derive"] } @@ -105,7 +106,12 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } [features] default = ["rustls"] -datafusion = ["dep:datafusion", "datafusion-datasource", "datafusion-proto"] +datafusion = [ + "dep:datafusion", + "datafusion-datasource", + "datafusion-proto", + "datafusion-physical-expr-adapter", +] datafusion-ext = ["datafusion"] json = ["parquet/json"] python = ["arrow/pyarrow"] diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index b86112643..239aac913 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -92,7 +92,6 @@ mod find_files; pub mod logical; pub mod physical; pub mod planner; -mod schema_adapter; mod session; mod table_provider; pub(crate) mod utils; diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs deleted file mode 100644 index fb39418b1..000000000 --- a/crates/core/src/delta_datafusion/schema_adapter.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::fmt::Debug; -use std::sync::Arc; - -use crate::kernel::schema::cast_record_batch; -use arrow_array::RecordBatch; -use arrow_schema::{Schema, SchemaRef}; -use datafusion::common::{ColumnStatistics, Result, not_impl_err}; -use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; - -/// A Schema Adapter Factory which provides casting record batches from parquet to meet -/// delta lake conventions. -#[derive(Debug)] -pub(crate) struct DeltaSchemaAdapterFactory {} - -impl SchemaAdapterFactory for DeltaSchemaAdapterFactory { - fn create( - &self, - projected_table_schema: SchemaRef, - table_schema: SchemaRef, - ) -> Box { - Box::new(DeltaSchemaAdapter { - projected_table_schema, - table_schema, - }) - } -} - -pub(crate) struct DeltaSchemaAdapter { - /// The schema for the table, projected to include only the fields being output (projected) by - /// the mapping. - projected_table_schema: SchemaRef, - /// Schema for the table - table_schema: SchemaRef, -} - -impl SchemaAdapter for DeltaSchemaAdapter { - fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { - let field = self.table_schema.field(index); - Some(file_schema.fields.find(field.name())?.0) - } - - fn map_schema(&self, file_schema: &Schema) -> Result<(Arc, Vec)> { - let mut projection = Vec::with_capacity(file_schema.fields().len()); - - for (file_idx, file_field) in file_schema.fields.iter().enumerate() { - if self - .projected_table_schema - .fields() - .find(file_field.name()) - .is_some() - { - projection.push(file_idx); - } - } - - Ok(( - Arc::new(SchemaMapping { - projected_schema: self.projected_table_schema.clone(), - }), - projection, - )) - } -} - -#[derive(Debug)] -pub(crate) struct SchemaMapping { - projected_schema: SchemaRef, -} - -impl SchemaMapper for SchemaMapping { - fn map_batch(&self, batch: RecordBatch) -> Result { - let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?; - Ok(record_batch) - } - - fn map_column_statistics( - &self, - _file_col_statistics: &[ColumnStatistics], - ) -> Result> { - not_impl_err!("Mapping column statistics is not implemented for DeltaSchemaAdapter") - } -} diff --git a/crates/core/src/delta_datafusion/session.rs b/crates/core/src/delta_datafusion/session.rs index 8ad1e185f..8c83f59c6 100644 --- a/crates/core/src/delta_datafusion/session.rs +++ b/crates/core/src/delta_datafusion/session.rs @@ -48,7 +48,16 @@ impl Default for DeltaSessionConfig { fn default() -> Self { DeltaSessionConfig { inner: SessionConfig::default() - .set_bool("datafusion.sql_parser.enable_ident_normalization", false), + .set_bool("datafusion.sql_parser.enable_ident_normalization", false) + .set_bool("datafusion.execution.parquet.schema_force_view_types", true) + // Workaround: hash-join dynamic filtering (IN-list pushdown) can panic when join + // keys include dictionary arrays (still reproducible with DF 52.1.x crates). + // Disable IN-list pushdown and fall back to hash lookups. + .set_usize("datafusion.optimizer.hash_join_inlist_pushdown_max_size", 0) + .set_usize( + "datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values", + 0, + ), } } } diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs index 5115fb149..17882b8b9 100644 --- a/crates/core/src/delta_datafusion/table_provider.rs +++ b/crates/core/src/delta_datafusion/table_provider.rs @@ -12,14 +12,15 @@ use datafusion::catalog::TableProvider; use datafusion::catalog::memory::DataSourceExec; use datafusion::common::pruning::PruningStatistics; use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion::common::{Column, DFSchemaRef, Result, Statistics, ToDFSchema}; +use datafusion::common::{Column, ColumnStatistics, DFSchemaRef, Result, Statistics, ToDFSchema}; use datafusion::config::{ConfigOptions, TableParquetOptions}; use datafusion::datasource::TableType; use datafusion::datasource::physical_plan::{ - FileGroup, FileSource, wrap_partition_type_in_dict, wrap_partition_value_in_dict, + FileGroup, wrap_partition_type_in_dict, wrap_partition_value_in_dict, }; use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource}; use datafusion::datasource::sink::DataSinkExec; +use datafusion::datasource::table_schema::TableSchema; use datafusion::error::DataFusionError; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::logical_expr::dml::InsertOp; @@ -50,7 +51,7 @@ use url::Url; use uuid::Uuid; use crate::delta_datafusion::engine::AsObjectStoreUrl; -use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory; + use crate::delta_datafusion::table_provider::next::SnapshotWrapper; use crate::delta_datafusion::{ DataFusionMixins as _, FindFilesExprProperties, LogDataHandler, get_null_of_arrow_type, @@ -151,7 +152,7 @@ impl DeltaScanConfigBuilder { Some(name) => { if column_names.contains(name) { return Err(DeltaTableError::Generic(format!( - "Unable to add file path column since column with name {name} exits" + "Unable to add file path column since column with name {name} exists" ))); } @@ -179,7 +180,7 @@ impl DeltaScanConfigBuilder { wrap_partition_values: self.wrap_partition_values.unwrap_or(true), enable_parquet_pushdown: self.enable_parquet_pushdown, schema: self.schema.clone(), - schema_force_view_types: false, + schema_force_view_types: true, }) } } @@ -538,12 +539,82 @@ impl<'a> DeltaScanBuilder<'a> { let stats = stats.unwrap_or(Statistics::new_unknown(&schema)); + // DF52's TableSchema outputs columns as: file_schema + partition_columns + // Source stats are indexed by TableConfiguration.schema() field order, which may differ + // from the scan schema order. We need name-based remapping, not index-based. + let partition_col_names = self.snapshot.metadata().partition_columns(); + + // Build name -> ColumnStatistics map from source stats (keyed by TableConfiguration schema order) + let source_schema = self.snapshot.schema(); + let stats_by_name: HashMap = source_schema + .fields() + .enumerate() + .filter_map(|(idx, field)| { + stats + .column_statistics + .get(idx) + .map(|s| (field.name().to_string(), s.clone())) + }) + .collect(); + + // Build stats in DF52 order: file_schema columns first, then partition_columns + // file_schema columns are in file_schema field order (non-partition from logical_schema) + let file_col_stats: Vec = file_schema + .fields() + .iter() + .map(|f| { + stats_by_name + .get(f.name()) + .cloned() + .unwrap_or_else(ColumnStatistics::new_unknown) + }) + .collect(); + + // Partition columns must be in metadata.partition_columns() order (not schema encounter order) + let partition_col_stats: Vec = partition_col_names + .iter() + .map(|name| { + stats_by_name + .get(name) + .cloned() + .unwrap_or_else(ColumnStatistics::new_unknown) + }) + .collect(); + + // Combine: file columns first, then partition columns + let mut reordered_stats = file_col_stats; + reordered_stats.extend(partition_col_stats); + + let stats = Statistics { + num_rows: stats.num_rows, + total_byte_size: stats.total_byte_size, + column_statistics: reordered_stats, + }; + + // Add unknown stats for file_column if present (it's added as partition field but not in original schema) + let stats = if config.file_column_name.is_some() { + let mut col_stats = stats.column_statistics; + col_stats.push(ColumnStatistics::new_unknown()); + Statistics { + num_rows: stats.num_rows, + total_byte_size: stats.total_byte_size, + column_statistics: col_stats, + } + } else { + stats + }; + let parquet_options = TableParquetOptions { global: self.session.config().options().execution.parquet.clone(), ..Default::default() }; - let mut file_source = ParquetSource::new(parquet_options); + let partition_fields: Vec> = + table_partition_cols.into_iter().map(Arc::new).collect(); + let table_schema = TableSchema::new(file_schema, partition_fields); + + let mut file_source = + ParquetSource::new(table_schema).with_table_parquet_options(parquet_options); // Sometimes (i.e Merge) we want to prune files that don't make the // filter and read the entire contents for files that do match the @@ -553,11 +624,9 @@ impl<'a> DeltaScanBuilder<'a> { { file_source = file_source.with_predicate(predicate); }; - let file_source = - file_source.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))?; let file_scan_config = - FileScanConfigBuilder::new(self.log_store.object_store_url(), file_schema, file_source) + FileScanConfigBuilder::new(self.log_store.object_store_url(), Arc::new(file_source)) .with_file_groups( // If all files were filtered out, we still need to emit at least one partition to // pass datafusion sanity checks. @@ -570,9 +639,8 @@ impl<'a> DeltaScanBuilder<'a> { }, ) .with_statistics(stats) - .with_projection_indices(self.projection.cloned()) + .with_projection_indices(self.projection.cloned())? .with_limit(self.limit) - .with_table_partition_cols(table_partition_cols) .build(); let metrics = ExecutionPlanMetricsSet::new(); diff --git a/crates/core/src/delta_datafusion/table_provider/next/scan/exec.rs b/crates/core/src/delta_datafusion/table_provider/next/scan/exec.rs index 275dc053a..60aae057e 100644 --- a/crates/core/src/delta_datafusion/table_provider/next/scan/exec.rs +++ b/crates/core/src/delta_datafusion/table_provider/next/scan/exec.rs @@ -9,9 +9,10 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use arrow::array::{ArrayAccessor, RecordBatch, StringArray}; +use arrow::array::{RecordBatch, StringArray}; use arrow::compute::filter_record_batch; use arrow::datatypes::{SchemaRef, UInt16Type}; +use arrow_array::StringViewArray; use arrow_array::{Array, BooleanArray}; use dashmap::DashMap; use datafusion::common::config::ConfigOptions; @@ -151,9 +152,8 @@ impl DeltaScanExec { } pub(crate) fn delta_plan(&self) -> &KernelScanPlan { - &self.scan_plan + self.scan_plan.as_ref() } - /// Transform the statistics from the inner physical parquet read plan to the logical /// schema we expose via the table provider. We do not attempt to provide meaningful /// statistics for metadata columns as we do not expect these to be useful in planning. @@ -519,25 +519,55 @@ fn split_by_file_id_runs( .downcast_ref::>() .ok_or_else(|| { internal_datafusion_err!( - "Expected file id column '{}' to be Dictionary, got {:?}", + "Expected file id column '{}' to be Dictionary, got {:?}", batch.schema_ref().field(file_id_idx).name(), batch.column(file_id_idx).data_type() ) })?; - let typed = dict.downcast_dict::().ok_or_else(|| { - internal_datafusion_err!( - "Expected file id column '{}' to be Dictionary, got {:?}", + // Parquet reads may yield Utf8 or Utf8View depending on DataFusion settings. + // Accept either for the synthetic file id column. + let keys = dict.keys(); + + enum FileIdValues<'a> { + Utf8(&'a StringArray), + Utf8View(&'a StringViewArray), + } + + let values = if let Some(values) = dict + .values() + .as_ref() + .as_any() + .downcast_ref::() + { + FileIdValues::Utf8(values) + } else if let Some(values) = dict + .values() + .as_ref() + .as_any() + .downcast_ref::() + { + FileIdValues::Utf8View(values) + } else { + return Err(internal_datafusion_err!( + "Expected file id column '{}' to be Dictionary, got {:?}", batch.schema_ref().field(file_id_idx).name(), batch.column(file_id_idx).data_type() - ) - })?; + )); + }; + + let file_id_for_row = |row: usize| -> String { + let key = keys.value(row) as usize; + match values { + FileIdValues::Utf8(arr) => arr.value(key).to_string(), + FileIdValues::Utf8View(arr) => arr.value(key).to_string(), + } + }; if dict.is_null(0) { return Err(internal_datafusion_err!("file id value must not be null")); } - let keys = typed.keys(); let mut prev_key = keys.value(0); let mut start = 0usize; let mut runs = Vec::new(); @@ -548,14 +578,14 @@ fn split_by_file_id_runs( } let key = keys.value(i); if key != prev_key { - let file_id = typed.value(start).to_string(); + let file_id = file_id_for_row(start); runs.push((file_id, batch.slice(start, i - start))); start = i; prev_key = key; } } - let file_id = typed.value(start).to_string(); + let file_id = file_id_for_row(start); runs.push((file_id, batch.slice(start, batch.num_rows() - start))); Ok(runs) @@ -568,6 +598,7 @@ mod tests { use arrow::array::AsArray; use arrow::datatypes::DataType; use arrow_array::Array; + use arrow_array::ArrayAccessor; use datafusion::{ common::stats::Precision, physical_plan::{collect, collect_partitioned}, @@ -633,10 +664,15 @@ mod tests { // Verify file_id column has the correct type let schema = data[0].schema(); let file_id_field = schema.column_with_name("file_id").unwrap().1; - assert_eq!( - file_id_field.data_type(), - &DataType::Dictionary(DataType::UInt16.into(), DataType::Utf8.into()) - ); + match file_id_field.data_type() { + DataType::Dictionary(_, value_type) + if value_type.as_ref() == &DataType::Utf8 + || value_type.as_ref() == &DataType::Utf8View => + { + // ok + } + other => panic!("unexpected file_id dtype: {other:?}"), + } Ok(()) } @@ -896,7 +932,7 @@ mod tests { let scan = provider.scan(&session.state(), None, &[], None).await?; let statistics = scan.partition_statistics(None)?; assert_eq!(statistics.num_rows, Precision::Exact(5)); - assert_eq!(statistics.total_byte_size, Precision::Exact(3240)); + assert_eq!(statistics.total_byte_size, Precision::Inexact(3240)); for col_stat in statistics.column_statistics.iter() { assert_eq!(col_stat.null_count, Precision::Absent); assert_eq!(col_stat.min_value, Precision::Absent); @@ -1279,7 +1315,7 @@ mod tests { let file_id_idx = file_id_column_idx(&batch, FILE_ID_COLUMN_DEFAULT)?; let err = split_by_file_id_runs(&batch, file_id_idx).unwrap_err(); let message = err.to_string(); - assert!(message.contains("Dictionary")); + assert!(message.contains("Dictionary + { + if row_count == 0 { + Arc::new(StringViewArray::from_iter_values(std::iter::empty::<&str>())) + } else { + Arc::new(StringViewArray::from_iter_values([file_id.as_str()])) + } + } + _ => { + if row_count == 0 { + Arc::new(StringArray::from(Vec::>::new())) + } else { + Arc::new(StringArray::from(vec![Some(file_id.as_str())])) + } + } + }; + let file_id_array: DictionaryArray = - repeat_n(file_id.as_str(), result.num_rows()).collect(); + DictionaryArray::try_new(keys, values)?; super::finalize_transformed_batch( result, &self.scan_plan, diff --git a/crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs b/crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs index e0fb9de55..461f580cc 100644 --- a/crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs +++ b/crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs @@ -24,12 +24,14 @@ use chrono::{TimeZone as _, Utc}; use dashmap::DashMap; use datafusion::{ catalog::Session, + common::tree_node::{Transformed, TreeNode}, common::{ColumnStatistics, HashMap, Result, Statistics, plan_err, stats::Precision}, config::TableParquetOptions, datasource::physical_plan::{ParquetSource, parquet::CachedParquetFileReaderFactory}, error::DataFusionError, execution::object_store::ObjectStoreUrl, physical_expr::planner::logical2physical, + physical_expr::{PhysicalExpr, expressions::Literal as PhysicalLiteral}, physical_plan::{ ExecutionPlan, empty::EmptyExec, @@ -40,11 +42,15 @@ use datafusion::{ scalar::ScalarValue, }; use datafusion_datasource::{ - PartitionedFile, compute_all_files_statistics, + PartitionedFile, TableSchema, compute_all_files_statistics, file_groups::FileGroup, file_scan_config::{FileScanConfigBuilder, wrap_partition_value_in_dict}, source::DataSourceExec, }; +use datafusion_physical_expr_adapter::{ + DefaultPhysicalExprAdapterFactory, + schema_rewriter::{PhysicalExprAdapter, PhysicalExprAdapterFactory}, +}; use delta_kernel::{ Engine, Expression, expressions::StructData, scan::ScanMetadata, table_features::TableFeature, }; @@ -182,7 +188,7 @@ async fn get_data_scan_plan( // this is used to create a DataSourceExec plan for each store // To correlate the data with the original file, we add the file url as a partition value // This is required to apply the correct transform to the data in downstream processing. - let to_partitioned_file = |mut f: ScanFileContext| { + let to_partitioned_file = |f: ScanFileContext| { if let Some(part_stata) = &f.partitions { update_partition_stats(part_stata, &f.stats, &mut partition_stats)?; } @@ -199,15 +205,10 @@ async fn get_data_scan_plan( .into(); let file_value = wrap_partition_value_in_dict(ScalarValue::Utf8(Some(f.file_url.to_string()))); - f.stats.column_statistics.push(ColumnStatistics { - null_count: Precision::Exact(0), - max_value: Precision::Exact(file_value.clone()), - min_value: Precision::Exact(file_value.clone()), - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - }); + // NOTE: `PartitionedFile::with_statistics` appends exact stats for partition columns based + // on `partition_values`, so partition values must be set first. + partitioned_file.partition_values = vec![file_value.clone()]; partitioned_file = partitioned_file.with_statistics(Arc::new(f.stats)); - partitioned_file.partition_values = vec![file_value]; Ok::<_, DataFusionError>(( f.file_url.as_object_store_url(), (partitioned_file, None::>), @@ -283,6 +284,7 @@ fn update_partition_stats( max_value: value, distinct_count: Precision::Absent, sum_value: Precision::Absent, + byte_size: Precision::Absent, }, ); } @@ -295,7 +297,11 @@ type FilesByStore = (ObjectStoreUrl, Vec<(PartitionedFile, Option>)>); async fn get_read_plan( state: &dyn Session, files_by_store: impl IntoIterator, - physical_schema: &SchemaRef, + // Schema used for Parquet reads / predicate evaluation. + // Note: this is not necessarily the final output schema. + // View types (e.g. Utf8View/BinaryView) and other output typing adjustments are applied + // later, at the DeltaScan boundary, not during Parquet read planning. + parquet_read_schema: &SchemaRef, limit: Option, file_id_field: &FieldRef, predicate: Option<&Expr>, @@ -307,7 +313,7 @@ async fn get_read_plan( ..Default::default() }; - let mut full_read_schema = SchemaBuilder::from(physical_schema.as_ref().clone()); + let mut full_read_schema = SchemaBuilder::from(parquet_read_schema.as_ref().clone()); full_read_schema.push(file_id_field.as_ref().clone().with_nullable(true)); let full_read_schema = Arc::new(full_read_schema.finish()); @@ -316,14 +322,24 @@ async fn get_read_plan( state.runtime_env().object_store(&store_url)?, state.runtime_env().cache_manager.get_file_metadata_cache(), )); - let mut file_source = - ParquetSource::new(pq_options.clone()).with_parquet_file_reader_factory(reader_factory); + + // NOTE: In the "next" provider, DataFusion's Parquet scan partition fields are file-id + // only. Delta partition columns/values are injected via kernel transforms and handled + // above Parquet, so they are not part of the Parquet partition schema here. + let table_schema = + TableSchema::new(parquet_read_schema.clone(), vec![file_id_field.clone()]); + let full_table_schema = table_schema.table_schema().clone(); + let mut file_source = ParquetSource::new(table_schema) + .with_table_parquet_options(pq_options.clone()) + .with_parquet_file_reader_factory(reader_factory); // TODO(roeap); we might be able to also push selection vectors into the read plan // by creating parquet access plans. However we need to make sure this does not // interfere with other delta features like row ids. let has_selection_vectors = files.iter().any(|(_, sv)| sv.is_some()); if !has_selection_vectors && let Some(pred) = predicate { + // Predicate pushdown can reference the synthetic file-id partition column. + // Use the full read schema (data columns + file-id) when planning. let physical = logical2physical(pred, full_read_schema.as_ref()); file_source = file_source .with_predicate(physical) @@ -332,15 +348,14 @@ async fn get_read_plan( let file_group: FileGroup = files.into_iter().map(|file| file.0).collect(); let (file_groups, statistics) = - compute_all_files_statistics(vec![file_group], full_read_schema.clone(), true, false)?; + compute_all_files_statistics(vec![file_group], full_table_schema, true, false)?; - let config = - FileScanConfigBuilder::new(store_url, physical_schema.clone(), Arc::new(file_source)) - .with_file_groups(file_groups) - .with_statistics(statistics) - .with_table_partition_cols(vec![file_id_field.as_ref().clone()]) - .with_limit(limit) - .build(); + let config = FileScanConfigBuilder::new(store_url, Arc::new(file_source)) + .with_file_groups(file_groups) + .with_statistics(statistics) + .with_limit(limit) + .with_expr_adapter(Some(Arc::new(DeltaPhysicalExprAdapterFactory))) + .build(); plans.push(DataSourceExec::from_data_source(config) as Arc); } @@ -364,9 +379,18 @@ fn finalize_transformed_batch( batch }; // NOTE: most data is read properly typed already, however columns added via - // literals in the transdormations may need to be cast to the physical expected type. + // literals in the transformations may need to be cast to the physical expected type. let result = cast_record_batch(result, &scan_plan.result_schema)?; if let Some((arr, field)) = file_id_col { + let arr = if arr.data_type() != field.data_type() { + let options = CastOptions { + safe: true, + ..Default::default() + }; + cast_with_options(arr.as_ref(), field.data_type(), &options)? + } else { + arr + }; let mut columns = result.columns().to_vec(); columns.push(arr); let mut fields = result.schema().fields().to_vec(); @@ -403,9 +427,104 @@ fn cast_record_batch(batch: RecordBatch, target_schema: &SchemaRef) -> Result, + physical_has_view_types: bool, +} + +impl PhysicalExprAdapterFactory for DeltaPhysicalExprAdapterFactory { + fn create( + &self, + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, + ) -> Arc { + let physical_has_view_types = schema_has_view_types(physical_file_schema.as_ref()); + let inner = + DefaultPhysicalExprAdapterFactory.create(logical_file_schema, physical_file_schema); + Arc::new(DeltaPhysicalExprAdapter { + inner, + physical_has_view_types, + }) + } +} + +impl PhysicalExprAdapter for DeltaPhysicalExprAdapter { + fn rewrite(&self, expr: Arc) -> Result> { + let expr = self.inner.rewrite(expr)?; + if self.physical_has_view_types { + Ok(expr) + } else { + // DataFusion's DefaultPhysicalExprAdapter inserts casts on columns, but does not + // currently rewrite literals to match the physical schema. Parquet predicate + // evaluation/pushdown happens against the physical file schema prior to those casts. + // + // See upstream TODO: + // datafusion-physical-expr-adapter/src/schema_rewriter.rs + // "TODO: ... move the cast from the column to literal expressions ..." + convert_physical_view_literals_to_base(expr) + } + } +} + +fn convert_physical_view_literals_to_base( + expr: Arc, +) -> Result> { + expr.transform_up(|e| { + if let Some(lit) = e.as_any().downcast_ref::() + && let Some(converted) = convert_scalar_view_to_base(lit.value()) + { + return Ok(Transformed::yes( + Arc::new(PhysicalLiteral::new(converted)) as _ + )); + } + Ok(Transformed::no(e)) + }) + .map(|t| t.data) +} + +fn convert_scalar_view_to_base(scalar: &ScalarValue) -> Option { + match scalar { + ScalarValue::Utf8View(v) => Some(ScalarValue::Utf8(v.clone())), + ScalarValue::BinaryView(v) => Some(ScalarValue::Binary(v.clone())), + ScalarValue::Dictionary(key_type, inner) => { + convert_scalar_view_to_base(inner).map(|converted_inner| { + ScalarValue::Dictionary(key_type.clone(), Box::new(converted_inner)) + }) + } + _ => None, + } +} + +fn schema_has_view_types(schema: &Schema) -> bool { + schema + .fields() + .iter() + .any(|f| data_type_has_view_types(f.data_type())) +} + +fn data_type_has_view_types(dt: &DataType) -> bool { + match dt { + DataType::Utf8View | DataType::BinaryView => true, + DataType::Dictionary(_, value) => data_type_has_view_types(value.as_ref()), + DataType::Map(entry, _) => data_type_has_view_types(entry.data_type()), + DataType::Struct(fields) => fields + .iter() + .any(|f| data_type_has_view_types(f.data_type())), + DataType::List(inner) + | DataType::LargeList(inner) + | DataType::ListView(inner) + | DataType::FixedSizeList(inner, _) => data_type_has_view_types(inner.data_type()), + _ => false, + } +} + #[cfg(test)] mod tests { - use arrow_array::{Int32Array, RecordBatch, StringArray, StructArray}; + use arrow_array::{BinaryArray, Int32Array, RecordBatch, StringArray, StructArray}; use arrow_schema::{DataType, Field, Fields, Schema}; use datafusion::{ physical_plan::collect, @@ -452,9 +571,10 @@ mod tests { let path = Path::from("test_data.parquet"); store.put(&path, buffer.into()).await?; let mut file: PartitionedFile = store.head(&path).await?.into(); - file.partition_values.push(ScalarValue::Utf8(Some( - "memory:///test_data.parquet".to_string(), - ))); + file.partition_values + .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some( + "memory:///test_data.parquet".to_string(), + )))); let files_by_store = vec![( store_url.as_object_store_url(), @@ -533,39 +653,6 @@ mod tests { ]; assert_batches_sorted_eq!(&expected, &batches); - // extended schema with missing column and different data types - let arrow_schema_extended = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("value", DataType::Utf8View, true), - Field::new("value2", DataType::Utf8View, true), - ])); - let plan = get_read_plan( - &session.state(), - files_by_store, - &arrow_schema_extended, - Some(1), - &file_id_field, - None, - ) - .await?; - let batches = collect(plan, session.task_ctx()).await?; - let expected = vec![ - "+----+-------+--------+-----------------------------+", - "| id | value | value2 | __delta_rs_file_id__ |", - "+----+-------+--------+-----------------------------+", - "| 1 | a | | memory:///test_data.parquet |", - "+----+-------+--------+-----------------------------+", - ]; - assert_batches_sorted_eq!(&expected, &batches); - assert!(matches!( - batches[0].column(1).data_type(), - DataType::Utf8View - )); - assert!(matches!( - batches[0].column(2).data_type(), - DataType::Utf8View - )); - Ok(()) } @@ -610,9 +697,10 @@ mod tests { let path = Path::from("test_data.parquet"); store.put(&path, buffer.into()).await?; let mut file: PartitionedFile = store.head(&path).await?.into(); - file.partition_values.push(ScalarValue::Utf8(Some( - "memory:///test_data.parquet".to_string(), - ))); + file.partition_values + .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some( + "memory:///test_data.parquet".to_string(), + )))); let files_by_store = vec![( store_url.as_object_store_url(), @@ -726,9 +814,11 @@ mod tests { let path = Path::from("test_data.parquet"); store_1.put(&path, buffer.into()).await?; let mut file_1: PartitionedFile = store_1.head(&path).await?.into(); - file_1.partition_values.push(ScalarValue::Utf8(Some( - "first:///test_data.parquet".to_string(), - ))); + file_1 + .partition_values + .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some( + "first:///test_data.parquet".to_string(), + )))); let mut buffer = Vec::new(); let mut arrow_writer = ArrowWriter::try_new(&mut buffer, arrow_schema.clone(), None)?; @@ -737,9 +827,11 @@ mod tests { let path = Path::from("test_data.parquet"); store_2.put(&path, buffer.into()).await?; let mut file_2: PartitionedFile = store_2.head(&path).await?.into(); - file_2.partition_values.push(ScalarValue::Utf8(Some( - "second:///test_data.parquet".to_string(), - ))); + file_2 + .partition_values + .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some( + "second:///test_data.parquet".to_string(), + )))); let files_by_store = vec![ ( @@ -810,9 +902,10 @@ mod tests { let path = Path::from("test_data.parquet"); store.put(&path, buffer.into()).await?; let mut file: PartitionedFile = store.head(&path).await?.into(); - file.partition_values.push(ScalarValue::Utf8(Some( - "memory:///test_data.parquet".to_string(), - ))); + file.partition_values + .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some( + "memory:///test_data.parquet".to_string(), + )))); let files_by_store = vec![( store_url.as_object_store_url(), @@ -847,4 +940,167 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_predicate_pushdown_allows_view_literal_against_base_parquet_schema() -> TestResult + { + use datafusion::scalar::ScalarValue; + + let store = Arc::new(InMemory::new()); + let store_url = Url::parse("memory:///")?; + let session = Arc::new(create_session().into_inner()); + session + .runtime_env() + .register_object_store(&store_url, store.clone()); + + // Parquet read schema uses base types; view types are applied at the DeltaScan boundary. + let arrow_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + let data = RecordBatch::try_new( + arrow_schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec![ + Some("alice"), + Some("bob"), + Some("charlie"), + ])), + ], + )?; + + let mut buffer = Vec::new(); + let mut arrow_writer = ArrowWriter::try_new(&mut buffer, arrow_schema.clone(), None)?; + arrow_writer.write(&data)?; + arrow_writer.close()?; + + let path = Path::from("test_view_literal.parquet"); + store.put(&path, buffer.into()).await?; + let mut file: PartitionedFile = store.head(&path).await?.into(); + file.partition_values + .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some( + "memory:///test_view_literal.parquet".to_string(), + )))); + + let files_by_store = vec![( + store_url.as_object_store_url(), + vec![(file, None::>)], + )]; + + let file_id_field = Arc::new(Field::new( + FILE_ID_COLUMN_DEFAULT, + DataType::Dictionary(DataType::UInt16.into(), DataType::Utf8.into()), + false, + )); + + let predicate = col("name").eq(lit(ScalarValue::Utf8View(Some("bob".to_string())))); + let plan = get_read_plan( + &session.state(), + files_by_store, + &arrow_schema, + None, + &file_id_field, + Some(&predicate), + ) + .await?; + let batches = collect(plan, session.task_ctx()).await?; + + let expected = vec![ + "+----+------+-------------------------------------+", + "| id | name | __delta_rs_file_id__ |", + "+----+------+-------------------------------------+", + "| 2 | bob | memory:///test_view_literal.parquet |", + "+----+------+-------------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &batches); + + Ok(()) + } + + #[tokio::test] + async fn test_predicate_pushdown_allows_binaryview_literal_against_base_parquet_schema() + -> TestResult { + use datafusion::scalar::ScalarValue; + + let store = Arc::new(InMemory::new()); + let store_url = Url::parse("memory:///")?; + let session = Arc::new(create_session().into_inner()); + session + .runtime_env() + .register_object_store(&store_url, store.clone()); + + let arrow_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("data", DataType::Binary, true), + ])); + let data = RecordBatch::try_new( + arrow_schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(BinaryArray::from_opt_vec(vec![ + Some(b"aaa".as_slice()), + Some(b"bbb".as_slice()), + Some(b"ccc".as_slice()), + ])), + ], + )?; + + let mut buffer = Vec::new(); + let mut arrow_writer = ArrowWriter::try_new(&mut buffer, arrow_schema.clone(), None)?; + arrow_writer.write(&data)?; + arrow_writer.close()?; + + let path = Path::from("test_binary_view.parquet"); + store.put(&path, buffer.into()).await?; + let mut file: PartitionedFile = store.head(&path).await?.into(); + file.partition_values + .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some( + "memory:///test_binary_view.parquet".to_string(), + )))); + + let files_by_store = vec![( + store_url.as_object_store_url(), + vec![(file, None::>)], + )]; + + let file_id_field = Arc::new(Field::new( + FILE_ID_COLUMN_DEFAULT, + DataType::Dictionary(DataType::UInt16.into(), DataType::Utf8.into()), + false, + )); + + let predicate = col("data").eq(lit(ScalarValue::BinaryView(Some(b"bbb".to_vec())))); + let plan = get_read_plan( + &session.state(), + files_by_store, + &arrow_schema, + None, + &file_id_field, + Some(&predicate), + ) + .await?; + let batches = collect(plan, session.task_ctx()).await?; + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 1); + let id_col = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_col.value(0), 2); + + let data_col = batches[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(data_col.value(0), b"bbb"); + + assert_eq!(batches[0].num_columns(), 3); + assert_eq!(batches[0].schema().field(2).name(), FILE_ID_COLUMN_DEFAULT); + + Ok(()) + } } diff --git a/crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs b/crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs index 71a741677..73743431e 100644 --- a/crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs +++ b/crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs @@ -49,7 +49,7 @@ use crate::kernel::{Scan, Snapshot}; /// Manages three schemas: /// - **result_schema**: Logical schema exposed to query after all transformations /// - **output_schema**: Final schema including metadata columns (e.g., file_id) -/// - **parquet_read_schema**: Physical schema for reading Parquet files +/// - **parquet_read_schema**: Physical schema for Parquet reads + predicate evaluation /// /// # Predicate Pushdown /// @@ -67,7 +67,7 @@ pub(crate) struct KernelScanPlan { /// If set, indicates a projection to apply to the /// scan output to obtain the result schema pub(crate) result_projection: Option>, - /// The schema the inner Parquet scan should read from data files. + /// Physical schema used for Parquet reads and predicate evaluation. pub(crate) parquet_read_schema: SchemaRef, /// If set, indicates a predicate to apply at the Parquet scan level pub(crate) parquet_predicate: Option, @@ -203,10 +203,8 @@ impl KernelScanPlan { } else { result_schema.clone() }; - let parquet_read_schema = config.physical_arrow_schema( - scan.snapshot().table_configuration(), - &scan.physical_schema().as_ref().try_into_arrow()?, - )?; + let parquet_read_schema = + config.parquet_file_schema(&scan.physical_schema().as_ref().try_into_arrow()?)?; Ok(Self { scan, result_schema, @@ -233,6 +231,8 @@ impl KernelScanPlan { impl DeltaScanConfig { pub(crate) fn file_id_field(&self) -> FieldRef { + // NOTE: keep the synthetic file-id column as Dictionary. + // Arrow's dictionary packing does not support Utf8View, and this column is internal. Arc::new(Field::new( self.file_column_name .as_deref() @@ -275,6 +275,61 @@ impl DeltaScanConfig { Ok(table_schema) } + fn parquet_file_schema(&self, base: &Schema) -> Result { + // IMPORTANT: This schema is used for Parquet reading and predicate evaluation. + // + // DataFusion can materialize `Utf8View/BinaryView` when requested, but predicate + // pushdown is evaluated against the physical file schema before schema-rewriter casts + // are applied, and those coercions are not consistently propagated through nested field + // accesses nor to literals. This can produce mismatches during evaluation such as: + // `Invalid comparison operation: Utf8 == Utf8View`. + // + // To keep pushdown stable, we request base `Utf8/Binary` here and only expose view + // types at the DeltaScan boundary (see `map_field`). + // + // NOTE: This Parquet read schema is intentionally not the same as the table provider + // output schema. The output schema may force view types (Utf8View/BinaryView) and other + // presentation-level physical types, but Parquet read + predicate pushdown must operate + // on the file's base types. + let table_schema = Arc::new(Schema::new( + base.fields() + .iter() + .map(|f| self.map_field_for_parquet(f.clone())) + .collect_vec(), + )); + Ok(table_schema) + } + + fn map_field_for_parquet(&self, field: FieldRef) -> FieldRef { + let dt = match field.data_type() { + DataType::Struct(fields) => DataType::Struct( + fields + .iter() + .map(|f| self.map_field_for_parquet(f.clone())) + .collect(), + ), + DataType::List(inner) => DataType::List(self.map_field_for_parquet(inner.clone())), + DataType::LargeList(inner) => { + DataType::LargeList(self.map_field_for_parquet(inner.clone())) + } + DataType::ListView(inner) => { + DataType::ListView(self.map_field_for_parquet(inner.clone())) + } + // Always use base types for the Parquet read schema. + DataType::Utf8View => DataType::Utf8, + DataType::BinaryView => DataType::Binary, + _ => field.data_type().clone(), + }; + + let field = if &dt != field.data_type() { + Arc::new(field.as_ref().clone().with_data_type(dt)) + } else { + field + }; + + field + } + fn map_field(&self, field: FieldRef, partition_cols: &[String]) -> FieldRef { if partition_cols.contains(field.name()) && self.wrap_partition_values { return match field.data_type() { @@ -350,9 +405,16 @@ pub(crate) fn supports_filters_pushdown( .file_column_name .as_deref() .unwrap_or(FILE_ID_COLUMN_DEFAULT); + + // Parquet predicate pushdown is enabled only when we can safely apply it at read time. + // Deletion vectors require preserving row order for selection masks, and row tracking + // disables predicate pushdown in the read plan. + let parquet_pushdown_enabled = scan_config.enable_parquet_pushdown + && !config.is_feature_enabled(&TableFeature::RowTracking) + && !config.is_feature_enabled(&TableFeature::DeletionVectors); filter .iter() - .map(|f| process_predicate(f, config, file_id_field).pushdown) + .map(|f| process_predicate(f, config, file_id_field, parquet_pushdown_enabled).pushdown) .collect() } @@ -374,9 +436,13 @@ fn process_filters( .file_column_name .as_deref() .unwrap_or(FILE_ID_COLUMN_DEFAULT); + + let parquet_pushdown_enabled = scan_config.enable_parquet_pushdown + && !config.is_feature_enabled(&TableFeature::RowTracking) + && !config.is_feature_enabled(&TableFeature::DeletionVectors); let (parquet, kernel): (Vec<_>, Vec<_>) = filters .iter() - .map(|f| process_predicate(f, config, file_id_field)) + .map(|f| process_predicate(f, config, file_id_field, parquet_pushdown_enabled)) .map(|p| (p.parquet_predicate, p.kernel_predicate)) .unzip(); let parquet = if config.is_feature_enabled(&TableFeature::ColumnMapping) { @@ -403,6 +469,7 @@ fn process_predicate<'a>( expr: &'a Expr, config: &TableConfiguration, file_id_column: &str, + parquet_pushdown_enabled: bool, ) -> ProcessedPredicate<'a> { let cols = config.metadata().partition_columns(); let only_partition_refs = expr.column_refs().iter().all(|c| cols.contains(&c.name)); @@ -431,7 +498,13 @@ fn process_predicate<'a>( // push down any predicate to parquet (TableProviderFilterPushDown::Inexact, None) } else { - (TableProviderFilterPushDown::Inexact, Some(expr)) + // For non-partition predicates we can *attempt* Parquet pushdown, but it is not a + // correctness boundary (it may be partially applied or skipped). Keep this Inexact so + // DataFusion retains a post-scan Filter. + ( + TableProviderFilterPushDown::Inexact, + parquet_pushdown_enabled.then_some(expr), + ) }; return ProcessedPredicate { pushdown, @@ -453,7 +526,7 @@ fn process_predicate<'a>( ProcessedPredicate { pushdown: TableProviderFilterPushDown::Inexact, kernel_predicate: None, - parquet_predicate: Some(expr), + parquet_predicate: parquet_pushdown_enabled.then_some(expr), } } @@ -507,6 +580,29 @@ mod tests { use super::*; + fn schema_has_view_types(schema: &Schema) -> bool { + schema + .fields() + .iter() + .any(|f| data_type_has_view_types(f.data_type())) + } + + fn data_type_has_view_types(dt: &DataType) -> bool { + match dt { + DataType::Utf8View | DataType::BinaryView => true, + DataType::Dictionary(_, value) => data_type_has_view_types(value.as_ref()), + DataType::Map(entry, _) => data_type_has_view_types(entry.data_type()), + DataType::Struct(fields) => fields + .iter() + .any(|f| data_type_has_view_types(f.data_type())), + DataType::List(inner) + | DataType::LargeList(inner) + | DataType::ListView(inner) + | DataType::FixedSizeList(inner, _) => data_type_has_view_types(inner.data_type()), + _ => false, + } + } + #[tokio::test] async fn test_rewrite_expression() -> TestResult { let mut table = open_fs_path("../test/tests/data/table_with_column_mapping"); @@ -627,4 +723,30 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_view_types_only_exposed_in_result_schema() -> TestResult { + let mut table = open_fs_path("../test/tests/data/table_with_column_mapping"); + table.load().await?; + + let snapshot = table.snapshot()?.snapshot().snapshot(); + + let mut config = DeltaScanConfig::default(); + config.schema_force_view_types = true; + let scan_plan = KernelScanPlan::try_new(snapshot, None, &[], &config, None)?; + assert!(schema_has_view_types(scan_plan.result_schema.as_ref())); + assert!(!schema_has_view_types( + scan_plan.parquet_read_schema.as_ref() + )); + + let mut config = DeltaScanConfig::default(); + config.schema_force_view_types = false; + let scan_plan = KernelScanPlan::try_new(snapshot, None, &[], &config, None)?; + assert!(!schema_has_view_types(scan_plan.result_schema.as_ref())); + assert!(!schema_has_view_types( + scan_plan.parquet_read_schema.as_ref() + )); + + Ok(()) + } } diff --git a/crates/core/src/delta_datafusion/table_provider/next/scan/replay.rs b/crates/core/src/delta_datafusion/table_provider/next/scan/replay.rs index b7a5790a5..956228a6d 100644 --- a/crates/core/src/delta_datafusion/table_provider/next/scan/replay.rs +++ b/crates/core/src/delta_datafusion/table_provider/next/scan/replay.rs @@ -302,6 +302,7 @@ fn extract_file_statistics( min_value: Precision::Absent, sum_value: Precision::Absent, distinct_count: Precision::Absent, + byte_size: Precision::Absent, }; } @@ -330,6 +331,7 @@ fn extract_file_statistics( min_value, sum_value: Precision::Absent, distinct_count: Precision::Absent, + byte_size: Precision::Absent, } }) .collect_vec(); diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index a8529e1f1..aec02ba48 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -293,6 +293,7 @@ mod datafusion { min_value, sum_value: Precision::Absent, distinct_count: Precision::Absent, + byte_size: Precision::Absent, }) } } @@ -309,6 +310,7 @@ mod datafusion { min_value: self.min_value.min(&other.min_value), sum_value: Precision::Absent, distinct_count: self.distinct_count.add(&other.distinct_count), + byte_size: Precision::Absent, } } } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 70440dbeb..3ecf33686 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -108,6 +108,7 @@ pub use self::table::builder::{ }; pub use self::table::config::TableProperty; pub use object_store::{Error as ObjectStoreError, ObjectMeta, ObjectStore, path::Path}; +#[allow(deprecated)] pub use operations::DeltaOps; pub use protocol::checkpoints; diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index c88d65cb6..ba46b80cb 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -151,7 +151,6 @@ impl DeleteBuilder { self } - /// The Datafusion session state to use pub fn with_session_state(mut self, session: Arc) -> Self { self.session = Some(session); self diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index ce9b15e03..7d38df6a3 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -17,11 +17,10 @@ use arrow_schema::{ArrowError, Field, Schema}; use chrono::{DateTime, Utc}; use datafusion::catalog::Session; use datafusion::common::ScalarValue; -use datafusion::common::config::TableParquetOptions; +use datafusion::config::TableParquetOptions; use datafusion::datasource::memory::DataSourceExec; -use datafusion::datasource::physical_plan::{ - FileGroup, FileScanConfigBuilder, FileSource, ParquetSource, -}; +use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource}; +use datafusion::datasource::table_schema::TableSchema; use datafusion::physical_expr::{PhysicalExpr, expressions}; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::projection::ProjectionExec; @@ -396,48 +395,62 @@ impl CdfLoadBuilder { Self::get_remove_action_type(), )?; - // Create the parquet scans for each associated type of file. - let mut parquet_source = ParquetSource::new(TableParquetOptions::new()); + let cdc_partition_fields: Vec> = + cdc_partition_cols.into_iter().map(Arc::new).collect(); + let add_remove_partition_fields: Vec> = add_remove_partition_cols + .into_iter() + .map(Arc::new) + .collect(); + + let cdc_table_schema = TableSchema::new(Arc::clone(&cdc_file_schema), cdc_partition_fields); + let add_table_schema = TableSchema::new( + Arc::clone(&add_remove_file_schema), + add_remove_partition_fields.clone(), + ); + let remove_table_schema = TableSchema::new( + Arc::clone(&add_remove_file_schema), + add_remove_partition_fields, + ); + + let parquet_options = TableParquetOptions { + global: session.config().options().execution.parquet.clone(), + ..Default::default() + }; + + let mut cdc_source = ParquetSource::new(cdc_table_schema) + .with_table_parquet_options(parquet_options.clone()); + let mut add_source = ParquetSource::new(add_table_schema) + .with_table_parquet_options(parquet_options.clone()); + let mut remove_source = + ParquetSource::new(remove_table_schema).with_table_parquet_options(parquet_options); + if let Some(filters) = filters { - parquet_source = parquet_source.with_predicate(Arc::clone(filters)); + cdc_source = cdc_source.with_predicate(Arc::clone(filters)); + add_source = add_source.with_predicate(Arc::clone(filters)); + remove_source = remove_source.with_predicate(Arc::clone(filters)); } - let parquet_source: Arc = Arc::new(parquet_source); + let cdc_scan: Arc = DataSourceExec::from_data_source( - FileScanConfigBuilder::new( - self.log_store.object_store_url(), - Arc::clone(&cdc_file_schema), - Arc::clone(&parquet_source), - ) - .with_file_groups(cdc_file_groups.into_values().map(FileGroup::from).collect()) - .with_table_partition_cols(cdc_partition_cols) - .build(), + FileScanConfigBuilder::new(self.log_store.object_store_url(), Arc::new(cdc_source)) + .with_file_groups(cdc_file_groups.into_values().map(FileGroup::from).collect()) + .build(), ); let add_scan: Arc = DataSourceExec::from_data_source( - FileScanConfigBuilder::new( - self.log_store.object_store_url(), - Arc::clone(&add_remove_file_schema), - Arc::clone(&parquet_source), - ) - .with_file_groups(add_file_groups.into_values().map(FileGroup::from).collect()) - .with_table_partition_cols(add_remove_partition_cols.clone()) - .build(), + FileScanConfigBuilder::new(self.log_store.object_store_url(), Arc::new(add_source)) + .with_file_groups(add_file_groups.into_values().map(FileGroup::from).collect()) + .build(), ); let remove_scan: Arc = DataSourceExec::from_data_source( - FileScanConfigBuilder::new( - self.log_store.object_store_url(), - Arc::clone(&add_remove_file_schema), - parquet_source, - ) - .with_file_groups( - remove_file_groups - .into_values() - .map(FileGroup::from) - .collect(), - ) - .with_table_partition_cols(add_remove_partition_cols) - .build(), + FileScanConfigBuilder::new(self.log_store.object_store_url(), Arc::new(remove_source)) + .with_file_groups( + remove_file_groups + .into_values() + .map(FileGroup::from) + .collect(), + ) + .build(), ); // The output batches are then unioned to create a single output. Coalesce partitions is only here for the time diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index db778475d..48cfc2f59 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -383,7 +383,16 @@ impl MergeBuilder { self } - /// The Datafusion session state to use + /// Set the DataFusion session used for planning and execution. + /// + /// The provided `state` must wrap a concrete `datafusion::execution::context::SessionState`. + /// Other `datafusion::catalog::Session` implementations will cause the operation to return an + /// error at execution time. + /// + /// This strictness avoids subtle bugs where Delta object stores could be registered on one + /// runtime environment while execution uses a different `task_ctx()` / runtime environment. + /// + /// Example: `Arc::new(create_session().state())`. pub fn with_session_state(mut self, state: Arc) -> Self { self.state = Some(state); self @@ -1495,13 +1504,14 @@ fn modify_schema( Ok(target_field) => { // This case is when there is an added column in an nested datatype let new_field = merge_arrow_field(target_field, source_field, true)?; - if &new_field != target_field { + if new_field != **target_field { ending_schema.try_merge(&Arc::new(new_field))?; } } Err(_) => { // This function is called multiple time with different operations so this handle any collisions - ending_schema.try_merge(&Arc::new(source_field.to_owned().with_nullable(true)))?; + ending_schema + .try_merge(&Arc::new(source_field.as_ref().clone().with_nullable(true)))?; } } } diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index ccd3f4b61..46d65877b 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -58,7 +58,7 @@ use crate::protocol::DeltaOperation; use crate::table::config::TablePropertiesExt as _; use crate::table::state::DeltaTableState; use crate::writer::utils::arrow_schema_without_partitions; -use crate::{DeltaTable, ObjectMeta, PartitionFilter, crate_version}; +use crate::{DeltaTable, ObjectMeta, PartitionFilter, crate_version, to_kernel_predicate}; /// Metrics from Optimize #[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)] @@ -329,7 +329,6 @@ impl<'a> OptimizeBuilder<'a> { self } - /// The Datafusion session state to use pub fn with_session_state(mut self, session: Arc) -> Self { self.session = Some(session); self @@ -944,7 +943,17 @@ async fn build_compaction_plan( let mut metrics = Metrics::default(); let mut partition_files: HashMap, Vec)> = HashMap::new(); - let mut file_stream = snapshot.file_views_by_partitions(log_store, filters); + + let predicate = if filters.is_empty() { + None + } else { + Some(Arc::new(to_kernel_predicate( + filters, + snapshot.schema().as_ref(), + )?)) + }; + + let mut file_stream = snapshot.file_views(log_store, predicate); while let Some(file) = file_stream.next().await { let file = file?; metrics.total_considered_files += 1; @@ -1056,7 +1065,17 @@ async fn build_zorder_plan( let mut metrics = Metrics::default(); let mut partition_files: HashMap, MergeBin)> = HashMap::new(); - let mut file_stream = snapshot.file_views_by_partitions(log_store, filters); + + let predicate = if filters.is_empty() { + None + } else { + Some(Arc::new(to_kernel_predicate( + filters, + snapshot.schema().as_ref(), + )?)) + }; + + let mut file_stream = snapshot.file_views(log_store, predicate); while let Some(file) = file_stream.next().await { let file = file?; let partition_values = file diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index 4c619d047..b5a5e3d6b 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -178,7 +178,8 @@ mod local { .collect() .await?; - let batch = &batches[0]; + let schema = batches[0].schema(); + let batch = arrow::compute::concat_batches(&schema, &batches)?; assert_eq!( batch.column(0).as_ref(), @@ -200,7 +201,8 @@ mod local { .collect() .await?; - let batch = &batches[0]; + let schema = batches[0].schema(); + let batch = arrow::compute::concat_batches(&schema, &batches)?; assert_eq!( batch.column(0).as_ref(), @@ -327,7 +329,8 @@ mod local { let batches = df.collect().await?; - let batch = &batches[0]; + let schema = batches[0].schema(); + let batch = arrow::compute::concat_batches(&schema, &batches)?; assert_eq!( batch.column(0).as_ref(), @@ -497,9 +500,11 @@ mod local { assert_eq!(statistics.num_rows, Precision::Absent); - assert_eq!( - statistics.total_byte_size, - Precision::Exact((400 + 404 + 396) as usize) + let total_byte_size = statistics.total_byte_size.clone(); + let expected_total_byte_size = (400 + 404 + 396) as usize; + assert!( + total_byte_size == Precision::Exact(expected_total_byte_size) + || total_byte_size == Precision::Inexact(expected_total_byte_size) ); let column_stats = statistics.column_statistics.first().unwrap(); assert_eq!(column_stats.null_count, Precision::Absent); @@ -1364,7 +1369,7 @@ async fn simple_query(context: &IntegrationContext) -> TestResult { } #[tokio::test] -async fn test_schema_adapter_empty_batch() { +async fn test_schema_evolution_missing_column_returns_nulls() { let ctx = SessionContext::new(); let tmp_dir = tempfile::tempdir().unwrap(); let table_uri = tmp_dir.path().to_str().to_owned().unwrap(); diff --git a/python/Cargo.toml b/python/Cargo.toml index 03fc07df2..ab770acc3 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -25,9 +25,11 @@ pyo3-arrow = { version = "0.14.0", default-features = false } # arrow arrow-schema = { workspace = true, features = ["serde"] } +arrow-cast = { workspace = true } # datafusion datafusion-ffi = { workspace = true } +datafusion-execution = { version = "52.1.0" } # serde serde = { workspace = true } diff --git a/python/src/datafusion.rs b/python/src/datafusion.rs index ced94a7f0..23dda4075 100644 --- a/python/src/datafusion.rs +++ b/python/src/datafusion.rs @@ -266,6 +266,11 @@ mod tests { Ok(None) } } + + fn reset_state(&self) -> Arc> { + // DataFusion may need to restart a stream from the beginning (e.g. recursive CTEs). + Arc::new(RwLock::new(TestBatchGenerator::new(self.data.clone()))) + } } #[tokio::test] diff --git a/python/src/lib.rs b/python/src/lib.rs index 6ab412109..b5a6f3554 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1857,7 +1857,16 @@ impl RawDeltaTable { TokioDeltaScan::new(scan, handle.clone()) .with_object_store(object_store_url, object_store), ) as Arc; - let provider = FFI_TableProvider::new(tokio_scan, false, Some(handle.clone())); + let ctx = + Arc::new(SessionContext::new()) as Arc; + let task_ctx_provider = datafusion_ffi::execution::FFI_TaskContextProvider::from(&ctx); + let provider = FFI_TableProvider::new( + tokio_scan, + false, + Some(handle.clone()), + task_ctx_provider, + None, + ); PyCapsule::new(py, provider, Some(name.clone())) } diff --git a/python/src/reader.rs b/python/src/reader.rs index 6305ab5c3..ec0b06a1f 100644 --- a/python/src/reader.rs +++ b/python/src/reader.rs @@ -1,4 +1,7 @@ -use arrow_schema::{ArrowError, SchemaRef}; +use std::sync::Arc; + +use arrow_cast::cast; +use arrow_schema::{ArrowError, DataType, Schema, SchemaRef}; use deltalake::arrow::array::RecordBatchReader; use deltalake::arrow::record_batch::RecordBatch; use deltalake::datafusion::execution::SendableRecordBatchStream; @@ -9,15 +12,77 @@ use crate::utils::rt; /// A lazy adapter to convert an async RecordBatchStream into a sync RecordBatchReader struct StreamToReaderAdapter { schema: SchemaRef, + cast_targets: Vec>, + needs_cast: bool, stream: SendableRecordBatchStream, } +fn view_type_contract(schema: &SchemaRef) -> (SchemaRef, Vec>, bool) { + let mut targets = Vec::with_capacity(schema.fields().len()); + let mut needs_cast = false; + + let fields = schema + .fields() + .iter() + .map(|f| { + let dt = match f.data_type() { + DataType::Utf8 | DataType::LargeUtf8 => { + needs_cast = true; + targets.push(Some(DataType::Utf8View)); + DataType::Utf8View + } + DataType::Binary | DataType::LargeBinary => { + needs_cast = true; + targets.push(Some(DataType::BinaryView)); + DataType::BinaryView + } + _ => { + targets.push(None); + f.data_type().clone() + } + }; + + if &dt != f.data_type() { + Arc::new(f.as_ref().clone().with_data_type(dt)) + } else { + f.clone() + } + }) + .collect::>(); + + let out = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone())); + (out, targets, needs_cast) +} + impl Iterator for StreamToReaderAdapter { type Item = Result; fn next(&mut self) -> Option { - rt().block_on(self.stream.next()) - .map(|b| b.map_err(|e| ArrowError::ExternalError(Box::new(e)))) + let next = rt() + .block_on(self.stream.next()) + .map(|b| b.map_err(|e| ArrowError::ExternalError(Box::new(e)))); + + match next { + Some(Ok(batch)) if self.needs_cast => Some(self.normalize_batch(batch)), + other => other, + } + } +} + +impl StreamToReaderAdapter { + fn normalize_batch(&self, batch: RecordBatch) -> Result { + let mut cols = Vec::with_capacity(batch.num_columns()); + + for i in 0..batch.num_columns() { + match &self.cast_targets[i] { + Some(dt) if batch.schema().field(i).data_type() != dt => { + cols.push(cast(batch.column(i).as_ref(), dt)?); + } + _ => cols.push(batch.column(i).clone()), + } + } + + RecordBatch::try_new(self.schema.clone(), cols) } } @@ -31,8 +96,11 @@ impl RecordBatchReader for StreamToReaderAdapter { pub(crate) fn convert_stream_to_reader( stream: SendableRecordBatchStream, ) -> Box { + let (schema, cast_targets, needs_cast) = view_type_contract(&stream.schema()); Box::new(StreamToReaderAdapter { - schema: stream.schema(), + schema, + cast_targets, + needs_cast, stream, }) } diff --git a/python/src/writer.rs b/python/src/writer.rs index 109be666c..e3aaeb53b 100644 --- a/python/src/writer.rs +++ b/python/src/writer.rs @@ -90,6 +90,40 @@ impl LazyBatchGenerator for ArrowStreamBatchGenerator { None => Ok(None), // End of stream } } + + fn reset_state(&self) -> Arc> { + Arc::new(RwLock::new(ExhaustedStreamGenerator)) + } +} + +/// Exhausted stream generator (consumed streams cannot be reset). +#[derive(Debug)] +struct ExhaustedStreamGenerator; + +impl std::fmt::Display for ExhaustedStreamGenerator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "ExhaustedStreamGenerator") + } +} + +impl LazyBatchGenerator for ExhaustedStreamGenerator { + fn as_any(&self) -> &dyn Any { + self + } + + fn generate_next_batch( + &mut self, + ) -> deltalake::datafusion::error::Result> { + Err(deltalake::datafusion::error::DataFusionError::Execution( + "Stream-based generator cannot be reset; the original stream has been consumed. \ + Buffer input data if plan re-execution is required." + .to_string(), + )) + } + + fn reset_state(&self) -> Arc> { + Arc::new(RwLock::new(ExhaustedStreamGenerator)) + } } /// A lazy casting wrapper around a RecordBatchReader diff --git a/python/tests/conftest.py b/python/tests/conftest.py index 3dfa9c5b9..609849058 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -20,6 +20,20 @@ from minio import Minio +def pytest_collection_modifyitems( + config: pytest.Config, items: list[pytest.Item] +) -> None: + if os.environ.get("DELTALAKE_RUN_DATAFUSION_TESTS") == "1": + return + + skip = pytest.mark.skip( + reason="DataFusion Python integration tests require matching datafusion wheels; disabled by default." + ) + for item in items: + if "datafusion" in item.keywords: + item.add_marker(skip) + + def wait_till_host_is_available(host: str, timeout_sec: int = 0.5): spacing = 2 start = time.monotonic() @@ -261,7 +275,7 @@ def sample_table_with_spaces_numbers() -> Table: nrows = 5 return Table.from_pydict( { - "1id": Array(["1", "2", "3", "4", "5"], DataType.string()), + "1id": Array(["1", "2", "3", "4", "5"], DataType.string_view()), "price": Array(list(range(nrows)), DataType.int64()), "sold items": Array(list(range(nrows)), DataType.int32()), "deleted": Array( @@ -271,7 +285,7 @@ def sample_table_with_spaces_numbers() -> Table: }, schema=Schema( fields=[ - Field("1id", type=DataType.string(), nullable=True), + Field("1id", type=DataType.string_view(), nullable=True), Field("price", type=DataType.int64(), nullable=True), Field("sold items", type=DataType.int32(), nullable=True), Field("deleted", type=DataType.bool(), nullable=True), diff --git a/python/tests/test_cdf.py b/python/tests/test_cdf.py index 0ad430fce..589a28dac 100644 --- a/python/tests/test_cdf.py +++ b/python/tests/test_cdf.py @@ -12,6 +12,45 @@ import pyarrow as pa +def _normalize_pyarrow_view_types_for_sort(tbl: "pa.Table") -> "pa.Table": + """Normalize view types so pyarrow Table.sort_by works. + + PyArrow does not currently support sorting RecordBatches that contain + `binary_view` (and sometimes `string_view`) columns. + """ + + import pyarrow as pa + + # Older PyArrow versions may not expose view types. + binary_view = getattr(pa, "binary_view", None) + string_view = getattr(pa, "string_view", None) + if binary_view is None and string_view is None: + return tbl + + fields = [] + columns = [] + for field in tbl.schema: + col = tbl[field.name] + ty = field.type + + if binary_view is not None and ty == binary_view(): + ty = pa.binary() + # PyArrow 16.x lacks casts for view types, so rebuild the array. + col = pa.array(col.to_pylist(), type=ty) + elif string_view is not None and ty == string_view(): + ty = pa.string() + col = pa.array(col.to_pylist(), type=ty) + + fields.append( + pa.field(field.name, ty, nullable=field.nullable, metadata=field.metadata) + ) + columns.append(col) + + return pa.Table.from_arrays( + columns, schema=pa.schema(fields, metadata=tbl.schema.metadata) + ) + + def test_read_cdf_partitioned_with_predicate(): dt = DeltaTable("../crates/test/tests/data/cdf-table/") data = dt.load_cdf(0, 3, predicate="birthday = '2023-12-25'").read_all() @@ -497,17 +536,26 @@ def test_delete_partitioned_cdf(tmp_path, sample_data_pyarrow: "pa.Table"): .sort_by("int64") ) - table_schema = pa.schema(dt.schema()) - table_schema = table_schema.insert( - len(table_schema), pa.field("_change_type", pa.string(), nullable=False) - ) - cdc_data = ( - pa.table(dt.load_cdf().read_all()) - .filter(pc.field("_change_type") == "delete") - .select(["int64", "_change_type"]) - .sort_by("int64") + # delta-rs returns Arrow view types (e.g. `string_view`) for zero-copy. + # PyArrow 16.x lacks compute kernels for view types, and even `Table.filter` + # can fail because it applies `take` across *all* columns. + # Convert the only columns we need to kernel-backed types. + raw_cdc_table = pa.table(dt.load_cdf().read_all()) + change_type_py = raw_cdc_table.column( + raw_cdc_table.schema.get_field_index("_change_type") + ).to_pylist() + cdc_table = pa.table( + { + "int64": raw_cdc_table.column( + raw_cdc_table.schema.get_field_index("int64") + ), + "_change_type": pa.array(change_type_py, type=pa.string()), + } ) + delete_mask = pa.array([(v == "delete") for v in change_type_py], type=pa.bool_()) + cdc_data = cdc_table.filter(delete_mask).sort_by("int64") + assert cdc_data.to_pydict() == expected_data.to_pydict() @@ -617,7 +665,9 @@ def test_write_predicate_partitioned_cdf(tmp_path, sample_data_pyarrow: "pa.Tabl cdc_data = cdc_data.combine_chunks().sort_by([("_change_type", "ascending")]) assert expected_data == cdc_data - assert dt.to_pyarrow_table().sort_by([("utf8", "ascending")]) == sample_data_pyarrow + + table = _normalize_pyarrow_view_types_for_sort(dt.to_pyarrow_table()) + assert table.sort_by([("utf8", "ascending")]) == sample_data_pyarrow @pytest.mark.pyarrow @@ -662,8 +712,11 @@ def test_write_overwrite_unpartitioned_cdf(tmp_path, sample_data_pyarrow: "pa.Ta for col in tbl.column_names if col not in ["_commit_version", "_commit_timestamp"] ] - assert pa.table(tbl.select(select_cols)).sort_by(sort_values) == expected_data - assert dt.to_pyarrow_table().sort_by([("utf8", "ascending")]) == sample_data_pyarrow + actual = _normalize_pyarrow_view_types_for_sort(pa.table(tbl.select(select_cols))) + assert actual.sort_by(sort_values) == expected_data + + table = _normalize_pyarrow_view_types_for_sort(dt.to_pyarrow_table()) + assert table.sort_by([("utf8", "ascending")]) == sample_data_pyarrow @pytest.mark.pyarrow @@ -715,9 +768,10 @@ def test_write_overwrite_partitioned_cdf(tmp_path, sample_data_pyarrow: "pa.Tabl "_change_data shouldn't exist since a specific partition was overwritten" ) - assert pa.table(dt.load_cdf().read_all()).drop_columns( - ["_commit_version", "_commit_timestamp"] - ).sort_by(sort_values).select(expected_data.column_names) == pa.concat_tables( + actual = _normalize_pyarrow_view_types_for_sort(pa.table(dt.load_cdf().read_all())) + assert actual.drop_columns(["_commit_version", "_commit_timestamp"]).sort_by( + sort_values + ).select(expected_data.column_names) == pa.concat_tables( [first_batch, expected_data] ).sort_by(sort_values) @@ -767,10 +821,11 @@ def test_read_cdf_last_version(tmp_path): "foo": Array([1, 2, 3], type=Field("foo", DataType.int32(), nullable=True)), "_change_type": Array( ["insert", "insert", "insert"], - type=Field("foo", DataType.string(), nullable=True), + type=Field("_change_type", DataType.string_view(), nullable=True), ), "_commit_version": Array( - [0, 0, 0], type=Field("foo", DataType.int64(), nullable=True) + [0, 0, 0], + type=Field("_commit_version", DataType.int64(), nullable=True), ), } ) diff --git a/python/tests/test_checkpoint.py b/python/tests/test_checkpoint.py index fb528f8f7..d6485a963 100644 --- a/python/tests/test_checkpoint.py +++ b/python/tests/test_checkpoint.py @@ -511,7 +511,6 @@ def test_checkpoint_with_multiple_writes(tmp_path: pathlib.Path): dt = DeltaTable(tmp_path) assert dt.version() == 1 new_df = dt.to_pandas() - print(dt.to_pandas()) assert len(new_df) == 1, "We overwrote! there should only be one row" diff --git a/python/tests/test_datafusion.py b/python/tests/test_datafusion.py index 47945a693..7a5b91754 100644 --- a/python/tests/test_datafusion.py +++ b/python/tests/test_datafusion.py @@ -1,17 +1,37 @@ +import os +from importlib.metadata import PackageNotFoundError, version + import pytest from arro3.core import Array, DataType, Field, Table from deltalake import DeltaTable, write_deltalake +def _datafusion_major_version() -> int | None: + try: + return int(version("datafusion").split(".")[0]) + except (PackageNotFoundError, ValueError): + return None + + @pytest.mark.datafusion def test_datafusion_table_provider(tmp_path): + if os.environ.get("DELTALAKE_RUN_DATAFUSION_TESTS") != "1": + pytest.skip( + "DataFusion Python integration tests are disabled by default; set DELTALAKE_RUN_DATAFUSION_TESTS=1" + ) + + datafusion_major = _datafusion_major_version() + if datafusion_major is None or datafusion_major < 52: + pytest.skip( + "DataFusion Python integration requires datafusion>=52 wheels (PyPI currently provides 51.x)" + ) nrows = 5 table = Table( { "id": Array( ["1", "2", "3", "4", "5"], - Field("id", type=DataType.string_view(), nullable=True), + Field("id", type=DataType.string(), nullable=True), ), "price": Array( list(range(nrows)), Field("price", type=DataType.int64(), nullable=True) diff --git a/python/tests/test_lakefs.py b/python/tests/test_lakefs.py index a1c54deda..af5ae5a31 100644 --- a/python/tests/test_lakefs.py +++ b/python/tests/test_lakefs.py @@ -132,7 +132,6 @@ def test_delete(lakefs_path: str, sample_table: Table, lakefs_storage_options): def test_optimize_min_commit_interval( lakefs_path: str, sample_table: Table, lakefs_storage_options ): - print(lakefs_path) write_deltalake( lakefs_path, sample_table, @@ -269,7 +268,8 @@ def test_add_constraint(lakefs_path, sample_table: Table, lakefs_storage_options data = Table( { "id": Array( - ["1"], type=ArrowField("id", type=DataType.string(), nullable=True) + ["1"], + type=ArrowField("id", type=DataType.string_view(), nullable=True), ), "price": Array( [-1], type=ArrowField("price", type=DataType.int64(), nullable=True) @@ -362,7 +362,9 @@ def test_merge(lakefs_path, sample_table: Table, lakefs_storage_options): source_table = Table( { - "id": Array(["5"], type=ArrowField("id", DataType.string(), nullable=True)), + "id": Array( + ["5"], type=ArrowField("id", DataType.string_view(), nullable=True) + ), "weight": Array( [105], type=ArrowField("weight", DataType.int32(), nullable=True) ), @@ -477,7 +479,7 @@ def sample_table_update(): { "id": Array( ["1", "2", "3", "4", "5"], - type=ArrowField("id", DataType.string(), nullable=True), + type=ArrowField("id", DataType.string_view(), nullable=True), ), "price": Array( list(range(nrows)), diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py index da7c3eb42..3345264f3 100644 --- a/python/tests/test_merge.py +++ b/python/tests/test_merge.py @@ -28,7 +28,7 @@ def test_merge_when_matched_delete_wo_predicate( source_table = Table( { - "id": Array(["5"], ArrowField("id", DataType.string(), nullable=True)), + "id": Array(["5"], ArrowField("id", DataType.string_view(), nullable=True)), "weight": Array([105], ArrowField("id", DataType.int32(), nullable=True)), } ) @@ -1921,7 +1921,7 @@ def test_merge_stats_columns_stats_provided(tmp_path: pathlib.Path, streaming: b { "foo": Array( ["a", "b", None, None], - ArrowField("foo", type=DataType.string(), nullable=True), + ArrowField("foo", type=DataType.string_view(), nullable=True), ), "bar": Array( [1, 2, 3, None], @@ -1961,7 +1961,7 @@ def get_value(name: str): { "foo": Array( ["a"], - ArrowField("foo", type=DataType.string(), nullable=True), + ArrowField("foo", type=DataType.string_view(), nullable=True), ), "bar": Array( [10], diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py index 37abde1df..e7b798115 100644 --- a/python/tests/test_optimize.py +++ b/python/tests/test_optimize.py @@ -113,7 +113,7 @@ def test_optimize_schema_evolved_table( { "foo": Array( ["1"], - ArrowField("foo", type=DataType.string_view(), nullable=True), + ArrowField("foo", type=DataType.string(), nullable=True), ), } ) @@ -124,7 +124,7 @@ def test_optimize_schema_evolved_table( { "bar": Array( ["1"], - ArrowField("bar", type=DataType.string_view(), nullable=True), + ArrowField("bar", type=DataType.string(), nullable=True), ), } ) @@ -188,7 +188,7 @@ def test_zorder_with_space_partition(tmp_path: pathlib.Path): partitioned_df = test_table.to_pandas( partitions=[("country", "=", "United States")], ) - print(partitioned_df) + _ = partitioned_df test_table.optimize.z_order(columns=["user"]) diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index e8e912094..9a5173647 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -180,7 +180,6 @@ def test_load_as_version_datetime_with_logs_removed( for file_name, dt_epoch in log_mtime_pairs: file_path = log_path / file_name - print(file_path) os.utime(file_path, (dt_epoch, dt_epoch)) dt = DeltaTable(tmp_path, version=expected_version) @@ -1127,11 +1126,11 @@ def test_read_query_builder_join_multiple_tables(tmp_path): { "date": Array( ["2021-01-01", "2021-01-02", "2021-01-03", "2021-12-31"], - ArrowField("date", type=DataType.string_view(), nullable=True), + ArrowField("date", type=DataType.string(), nullable=True), ), "value": Array( ["a", "b", "c", "d"], - ArrowField("value", type=DataType.string_view(), nullable=True), + ArrowField("value", type=DataType.string(), nullable=True), ), } ), diff --git a/python/tests/test_update.py b/python/tests/test_update.py index e9fb1e79c..259db30f6 100644 --- a/python/tests/test_update.py +++ b/python/tests/test_update.py @@ -15,7 +15,7 @@ def sample_table(): { "id": Array( ["1", "2", "3", "4", "5"], - ArrowField("id", type=DataType.string(), nullable=True), + ArrowField("id", type=DataType.string_view(), nullable=True), ), "price": Array( list(range(nrows)), diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index d0ba3644e..c780fdebe 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -178,9 +178,7 @@ def test_merge_schema(existing_table: DeltaTable): read_data = existing_table.to_pyarrow_table().sort_by( [("utf8", "ascending"), ("new_x", "ascending")] ) - print(repr(read_data.to_pylist())) concated = pa.concat_tables([old_table_data, new_data]) - print(repr(concated.to_pylist())) assert read_data == concated write_deltalake(existing_table, new_data, mode="overwrite", schema_mode="overwrite") @@ -550,9 +548,7 @@ def test_roundtrip_null_partition( ChunkedArray( Array( ["a", "a", "a", "a", None], - type=ArrowField( - "utf8_with_nulls", DataType.string_view(), nullable=True - ), + type=ArrowField("utf8_with_nulls", DataType.string(), nullable=True), ) ), ) @@ -1368,7 +1364,7 @@ def test_partition_overwrite_with_new_partition( { "p1": Array( ["1", "1", "2", "2"], - ArrowField("p1", type=DataType.string_view(), nullable=False), + ArrowField("p1", type=DataType.string(), nullable=False), ), "p2": Array( [1, 2, 1, 2], @@ -1710,7 +1706,7 @@ def test_schema_cols_diff_order(tmp_path: pathlib.Path): { "foo": Array( ["B"] * 10, - ArrowField("foo", type=DataType.string_view(), nullable=True), + ArrowField("foo", type=DataType.string(), nullable=True), ), "bar": Array( [1] * 10, @@ -2042,7 +2038,6 @@ def test_roundtrip_cdc_evolution(tmp_path: pathlib.Path): delta_table.update(predicate="utf8 = '1'", updates={"utf8": "'hello world'"}) delta_table = DeltaTable(tmp_path) - print(os.listdir(tmp_path)) # This is kind of a weak test to verify that CDFs were written assert os.path.isdir(os.path.join(tmp_path, "_change_data")) @@ -2670,7 +2665,6 @@ def test_write_table_with_deletion_vectors(tmp_path: pathlib.Path): ) assert dt.protocol().min_writer_version == 7 assert dt.version() == 0 - print(dt.protocol().writer_features) data = Table.from_pydict( {