Skip to content

Commit f4e8f4c

Browse files
committed
fix(cube): Use reader_options_customizer in one more place
1 parent dacfbae commit f4e8f4c

3 files changed

Lines changed: 12 additions & 13 deletions

File tree

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::arrow::array::RecordBatch;
3131
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
3232
use crate::datasource::file_format::file_compression_type::FileCompressionType;
3333
use crate::datasource::file_format::write::get_writer_schema;
34-
use crate::datasource::physical_plan::parquet::{can_expr_be_pushed_down_with_schemas, get_reader_options_config_or_default, MetadataFetcher, ReaderOptionsConfig};
34+
use crate::datasource::physical_plan::parquet::{can_expr_be_pushed_down_with_schemas, get_reader_options_config_or_default, get_reader_options_customizer, MetadataFetcher};
3535
use crate::datasource::physical_plan::parquet::source::ParquetSource;
3636
use crate::datasource::physical_plan::{FileSink, FileSinkConfig};
3737
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
@@ -432,7 +432,7 @@ impl FileFormat for ParquetFormat {
432432

433433
async fn create_physical_plan(
434434
&self,
435-
_state: &dyn Session,
435+
state: &dyn Session,
436436
conf: FileScanConfig,
437437
filters: Option<&Arc<dyn PhysicalExpr>>,
438438
) -> Result<Arc<dyn ExecutionPlan>> {
@@ -451,7 +451,7 @@ impl FileFormat for ParquetFormat {
451451
metadata_size_hint = Some(metadata);
452452
}
453453

454-
let mut source = ParquetSource::new(self.options.clone(), ReaderOptionsConfig::noop()); // TODO upgrade DF
454+
let mut source = ParquetSource::new(self.options.clone(), get_reader_options_customizer(state.config()));
455455

456456
if let Some(predicate) = predicate {
457457
source = source.with_predicate(Arc::clone(&conf.file_schema), predicate);

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -523,18 +523,9 @@ impl ExecutionPlan for ParquetExec {
523523
partition_index: usize,
524524
ctx: Arc<TaskContext>,
525525
) -> Result<SendableRecordBatchStream> {
526-
// TODO upgrade DF: we need reader_options_customizer used as in the commented code below.
527526
self.inner.execute(partition_index, ctx)
528527

529528
// TODO upgrade DF: We also want Ok(Box::pin(stream.instrument(tracing::trace_span!("read_files")))) applied, probably in inner.
530-
531-
// let reader_options_customizer =
532-
// get_reader_options_customizer(ctx.session_config());
533-
534-
// let opener = ParquetOpener {
535-
// ...
536-
// reader_options_customizer,
537-
// };
538529
}
539530
fn metrics(&self) -> Option<MetricsSet> {
540531
self.inner.metrics()

datafusion/core/src/datasource/physical_plan/parquet/source.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,16 @@ impl ParquetSource {
300300
pub fn new(table_parquet_options: TableParquetOptions, reader_options_customizer: Arc<dyn ReaderOptionsCustomizer>) -> Self {
301301
Self {
302302
table_parquet_options,
303+
metrics: Default::default(),
304+
predicate: Default::default(),
305+
pruning_predicate: Default::default(),
306+
page_pruning_predicate: Default::default(),
307+
parquet_file_reader_factory: Default::default(),
308+
schema_adapter_factory: Default::default(),
309+
batch_size: Default::default(),
310+
metadata_size_hint: Default::default(),
311+
projected_statistics: Default::default(),
303312
reader_options_customizer,
304-
..Self::default()
305313
}
306314
}
307315

0 commit comments

Comments
 (0)