Skip to content

Commit 60580cf

Browse files
authored
feat: update to datafusion 52 FileSource APIs (#147)
* feat: upgrade to datafusion 52 apis * feat: update FileOpener implementation
1 parent 6f2fc76 commit 60580cf

File tree

4 files changed

+54
-44
lines changed

4 files changed

+54
-44
lines changed

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ all-features = true
3434
[dependencies]
3535
async-trait = { version = "0.1.77" }
3636
bytes = "1.4"
37-
datafusion = "51.0"
38-
datafusion-datasource = "51.0"
37+
datafusion = "52.0"
3938
futures = { version = "0.3", default-features = false, features = ["std"] }
4039
futures-util = { version = "0.3" }
4140
object_store = { version = "0.12" }

src/file_format.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use datafusion::common::Statistics;
2525
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
2626
use datafusion::datasource::file_format::FileFormat;
2727
use datafusion::datasource::physical_plan::{FileScanConfig, FileSource};
28+
use datafusion::datasource::table_schema::TableSchema;
2829
use datafusion::error::{DataFusionError, Result};
2930
use datafusion::physical_plan::ExecutionPlan;
3031
use futures::TryStreamExt;
@@ -122,7 +123,7 @@ impl FileFormat for OrcFormat {
122123
Ok(DataSourceExec::from_data_source(conf))
123124
}
124125

125-
fn file_source(&self) -> Arc<dyn FileSource> {
126-
Arc::new(OrcSource::default())
126+
fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
127+
Arc::new(OrcSource::new(table_schema))
127128
}
128129
}

src/file_source.rs

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,35 @@
1616
// under the License.
1717

1818
use crate::physical_exec::OrcOpener;
19-
use datafusion::common::Statistics;
19+
use datafusion::common::DataFusionError;
2020
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource};
21+
use datafusion::datasource::table_schema::TableSchema;
2122
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
22-
use datafusion_datasource::TableSchema;
23+
use datafusion::physical_plan::projection::ProjectionExprs;
2324
use object_store::ObjectStore;
2425
use std::any::Any;
2526
use std::sync::Arc;
2627

2728
#[derive(Debug, Clone)]
2829
pub struct OrcSource {
2930
metrics: ExecutionPlanMetricsSet,
30-
statistics: Statistics,
3131
batch_size: usize,
32+
table_schema: TableSchema,
33+
projection: ProjectionExprs,
3234
}
3335

34-
impl Default for OrcSource {
35-
fn default() -> Self {
36+
impl OrcSource {
37+
pub fn new(table_schema: TableSchema) -> Self {
38+
let table_schema_ref = table_schema.table_schema();
39+
let projection = ProjectionExprs::from_indices(
40+
&(0..table_schema_ref.fields().len()).collect::<Vec<_>>(),
41+
table_schema_ref,
42+
);
3643
Self {
3744
metrics: ExecutionPlanMetricsSet::default(),
38-
statistics: Statistics::default(),
3945
batch_size: 1024,
46+
table_schema,
47+
projection,
4048
}
4149
}
4250
}
@@ -47,45 +55,49 @@ impl FileSource for OrcSource {
4755
object_store: Arc<dyn ObjectStore>,
4856
config: &FileScanConfig,
4957
_partition: usize,
50-
) -> Arc<dyn FileOpener> {
51-
Arc::new(OrcOpener::new(object_store, config, self.batch_size))
58+
) -> Result<Arc<dyn FileOpener>, DataFusionError> {
59+
OrcOpener::try_new(
60+
object_store,
61+
self.table_schema.table_schema().clone(),
62+
config.batch_size.unwrap_or(self.batch_size),
63+
self.projection.clone(),
64+
)
65+
.map(|f| Arc::new(f) as Arc<dyn FileOpener>)
5266
}
5367

5468
fn as_any(&self) -> &dyn Any {
5569
self
5670
}
5771

72+
fn table_schema(&self) -> &TableSchema {
73+
&self.table_schema
74+
}
75+
5876
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
5977
Arc::new(Self {
6078
batch_size,
6179
..self.clone()
6280
})
6381
}
6482

65-
fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
66-
Arc::new(self.clone())
67-
}
68-
69-
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
70-
Arc::new(self.clone())
71-
}
72-
73-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
74-
Arc::new(Self {
75-
statistics,
76-
..self.clone()
77-
})
83+
fn projection(&self) -> Option<&ProjectionExprs> {
84+
Some(&self.projection)
7885
}
7986

8087
fn metrics(&self) -> &ExecutionPlanMetricsSet {
8188
&self.metrics
8289
}
8390

84-
fn statistics(&self) -> datafusion::common::Result<Statistics> {
85-
Ok(self.statistics.clone())
86-
}
87-
8891
fn file_type(&self) -> &str {
8992
"orc"
9093
}
94+
95+
fn try_pushdown_projection(
96+
&self,
97+
projection: &ProjectionExprs,
98+
) -> Result<Option<Arc<dyn FileSource>>, DataFusionError> {
99+
let mut source = self.clone();
100+
source.projection = self.projection.try_merge(projection)?;
101+
Ok(Some(Arc::new(source)))
102+
}
91103
}

src/physical_exec.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ use std::sync::Arc;
1919

2020
use datafusion::arrow::datatypes::SchemaRef;
2121
use datafusion::arrow::error::ArrowError;
22-
use datafusion::datasource::physical_plan::{FileOpenFuture, FileOpener, FileScanConfig};
22+
use datafusion::datasource::listing::PartitionedFile;
23+
use datafusion::datasource::physical_plan::{FileOpenFuture, FileOpener};
2324
use datafusion::error::Result;
24-
use datafusion_datasource::PartitionedFile;
25+
use datafusion::physical_plan::projection::ProjectionExprs;
2526
use orc_rust::projection::ProjectionMask;
2627
use orc_rust::ArrowReaderBuilder;
2728

@@ -31,28 +32,25 @@ use object_store::ObjectStore;
3132
use super::object_store_reader::ObjectStoreReader;
3233

3334
pub(crate) struct OrcOpener {
34-
projection: Vec<usize>,
35+
projection: ProjectionExprs,
3536
batch_size: usize,
3637
table_schema: SchemaRef,
3738
object_store: Arc<dyn ObjectStore>,
3839
}
3940

4041
impl OrcOpener {
41-
pub(crate) fn new(
42+
pub(crate) fn try_new(
4243
object_store: Arc<dyn ObjectStore>,
43-
config: &FileScanConfig,
44+
table_schema: SchemaRef,
4445
batch_size: usize,
45-
) -> Self {
46-
let projection = config
47-
.file_column_projection_indices()
48-
.unwrap_or_else(|| (0..config.file_schema().fields().len()).collect());
49-
50-
Self {
46+
projection: ProjectionExprs,
47+
) -> Result<Self> {
48+
Ok(Self {
5149
projection,
52-
batch_size: config.batch_size.unwrap_or(batch_size),
53-
table_schema: config.file_schema().clone(),
50+
batch_size,
51+
table_schema,
5452
object_store,
55-
}
53+
})
5654
}
5755
}
5856

@@ -61,7 +59,7 @@ impl FileOpener for OrcOpener {
6159
let object_meta = &file.object_meta;
6260
let reader = ObjectStoreReader::new(self.object_store.clone(), object_meta.clone());
6361
let batch_size = self.batch_size;
64-
let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?);
62+
let projected_schema = self.projection.project_schema(&self.table_schema)?;
6563

6664
Ok(Box::pin(async move {
6765
let mut builder = ArrowReaderBuilder::try_new_async(reader)

0 commit comments

Comments
 (0)