Skip to content

Commit ad8b804

Browse files
update datafusion (#142)
1 parent 5b805e7 commit ad8b804

File tree

6 files changed

+148
-155
lines changed

6 files changed

+148
-155
lines changed

Cargo.toml

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,15 @@ rust-version = "1.73"
3232
all-features = true
3333

3434
[dependencies]
35-
arrow = { version = "53", features = ["prettyprint", "chrono-tz"] }
35+
arrow = { version = "56.0", features = ["prettyprint", "chrono-tz"] }
3636
async-trait = { version = "0.1.77" }
3737
bytes = "1.4"
38-
datafusion = { version = "42.0" }
39-
datafusion-expr = { version = "42.0" }
40-
datafusion-physical-expr = { version = "42.0" }
38+
datafusion = "50.0"
39+
datafusion-datasource = "50.0"
4140
futures = { version = "0.3", default-features = false, features = ["std"] }
4241
futures-util = { version = "0.3" }
43-
object_store = { version = "0.11" }
44-
orc-rust = { version = "0.5", features = ["async"] }
42+
object_store = { version = "0.12" }
43+
orc-rust = { version = "0.6", features = ["async"] }
4544
tokio = { version = "1.28", features = [
4645
"io-util",
4746
"sync",
@@ -51,15 +50,6 @@ tokio = { version = "1.28", features = [
5150
"rt-multi-thread",
5251
] }
5352

54-
[dev-dependencies]
55-
arrow-ipc = { version = "53.0.0", features = ["lz4"] }
56-
arrow-json = "53.0.0"
57-
criterion = { version = "0.5", default-features = false, features = ["async_tokio"] }
58-
opendal = { version = "0.48", default-features = false, features = ["services-memory"] }
59-
pretty_assertions = "1.3.0"
60-
proptest = "1.0.0"
61-
serde_json = { version = "1.0", default-features = false, features = ["std"] }
62-
6353
[[example]]
6454
name = "datafusion_integration"
6555
# Some issue when publishing and path isn't specified, so adding here

src/file_format.rs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,21 @@ use datafusion::arrow::datatypes::SchemaRef;
2525
use datafusion::common::Statistics;
2626
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
2727
use datafusion::datasource::file_format::FileFormat;
28-
use datafusion::datasource::physical_plan::FileScanConfig;
28+
use datafusion::datasource::physical_plan::{FileScanConfig, FileSource};
2929
use datafusion::error::{DataFusionError, Result};
30-
use datafusion::execution::context::SessionState;
3130
use datafusion::physical_plan::ExecutionPlan;
32-
use datafusion_physical_expr::PhysicalExpr;
3331
use futures::TryStreamExt;
3432
use orc_rust::reader::metadata::read_metadata_async;
3533

34+
use crate::OrcSource;
3635
use async_trait::async_trait;
36+
use datafusion::catalog::Session;
37+
use datafusion::datasource::source::DataSourceExec;
3738
use futures_util::StreamExt;
3839
use object_store::path::Path;
3940
use object_store::{ObjectMeta, ObjectStore};
4041

4142
use super::object_store_reader::ObjectStoreReader;
42-
use super::physical_exec::OrcExec;
4343

4444
async fn fetch_schema(store: &Arc<dyn ObjectStore>, file: &ObjectMeta) -> Result<(Path, Schema)> {
4545
let loc_path = file.location.clone();
@@ -54,13 +54,7 @@ async fn fetch_schema(store: &Arc<dyn ObjectStore>, file: &ObjectMeta) -> Result
5454
}
5555

5656
#[derive(Clone, Debug)]
57-
pub struct OrcFormat {}
58-
59-
impl OrcFormat {
60-
pub fn new() -> Self {
61-
Self {}
62-
}
63-
}
57+
pub struct OrcFormat;
6458

6559
#[async_trait]
6660
impl FileFormat for OrcFormat {
@@ -76,9 +70,13 @@ impl FileFormat for OrcFormat {
7670
Ok("orc".to_string())
7771
}
7872

73+
fn compression_type(&self) -> Option<FileCompressionType> {
74+
None
75+
}
76+
7977
async fn infer_schema(
8078
&self,
81-
state: &SessionState,
79+
state: &dyn Session,
8280
store: &Arc<dyn ObjectStore>,
8381
objects: &[ObjectMeta],
8482
) -> Result<SchemaRef> {
@@ -109,7 +107,7 @@ impl FileFormat for OrcFormat {
109107

110108
async fn infer_stats(
111109
&self,
112-
_state: &SessionState,
110+
_state: &dyn Session,
113111
_store: &Arc<dyn ObjectStore>,
114112
table_schema: SchemaRef,
115113
_object: &ObjectMeta,
@@ -119,10 +117,13 @@ impl FileFormat for OrcFormat {
119117

120118
async fn create_physical_plan(
121119
&self,
122-
_state: &SessionState,
120+
_state: &dyn Session,
123121
conf: FileScanConfig,
124-
_filters: Option<&Arc<dyn PhysicalExpr>>,
125122
) -> Result<Arc<dyn ExecutionPlan>> {
126-
Ok(Arc::new(OrcExec::new(conf)))
123+
Ok(DataSourceExec::from_data_source(conf))
124+
}
125+
126+
fn file_source(&self) -> Arc<dyn FileSource> {
127+
Arc::new(OrcSource::default())
127128
}
128129
}

src/file_source.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::physical_exec::OrcOpener;
19+
use arrow::datatypes::SchemaRef;
20+
use datafusion::common::Statistics;
21+
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource};
22+
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
23+
use object_store::ObjectStore;
24+
use std::any::Any;
25+
use std::sync::Arc;
26+
27+
#[derive(Debug, Clone)]
28+
pub struct OrcSource {
29+
metrics: ExecutionPlanMetricsSet,
30+
statistics: Statistics,
31+
batch_size: usize,
32+
}
33+
34+
impl Default for OrcSource {
35+
fn default() -> Self {
36+
Self {
37+
metrics: ExecutionPlanMetricsSet::default(),
38+
statistics: Statistics::default(),
39+
batch_size: 1024,
40+
}
41+
}
42+
}
43+
44+
impl FileSource for OrcSource {
45+
fn create_file_opener(
46+
&self,
47+
object_store: Arc<dyn ObjectStore>,
48+
config: &FileScanConfig,
49+
_partition: usize,
50+
) -> Arc<dyn FileOpener> {
51+
Arc::new(OrcOpener::new(object_store, config, self.batch_size))
52+
}
53+
54+
fn as_any(&self) -> &dyn Any {
55+
self
56+
}
57+
58+
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
59+
Arc::new(Self {
60+
batch_size,
61+
..self.clone()
62+
})
63+
}
64+
65+
fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
66+
Arc::new(self.clone())
67+
}
68+
69+
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
70+
Arc::new(self.clone())
71+
}
72+
73+
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
74+
Arc::new(Self {
75+
statistics,
76+
..self.clone()
77+
})
78+
}
79+
80+
fn metrics(&self) -> &ExecutionPlanMetricsSet {
81+
&self.metrics
82+
}
83+
84+
fn statistics(&self) -> datafusion::common::Result<Statistics> {
85+
Ok(self.statistics.clone())
86+
}
87+
88+
fn file_type(&self) -> &str {
89+
"orc"
90+
}
91+
}

src/lib.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,14 @@ use datafusion::execution::options::ReadOptions;
5858

5959
use async_trait::async_trait;
6060

61-
use self::file_format::OrcFormat;
62-
6361
mod file_format;
62+
mod file_source;
6463
mod object_store_reader;
6564
mod physical_exec;
6665

66+
pub use file_format::OrcFormat;
67+
pub use file_source::OrcSource;
68+
6769
/// Configuration options for reading ORC files.
6870
#[derive(Clone)]
6971
pub struct OrcReadOptions<'a> {
@@ -85,8 +87,7 @@ impl ReadOptions<'_> for OrcReadOptions<'_> {
8587
_config: &SessionConfig,
8688
_table_options: TableOptions,
8789
) -> ListingOptions {
88-
let file_format = OrcFormat::new();
89-
ListingOptions::new(Arc::new(file_format)).with_file_extension(self.file_extension)
90+
ListingOptions::new(Arc::new(OrcFormat)).with_file_extension(self.file_extension)
9091
}
9192

9293
async fn get_resolved_schema(
@@ -126,8 +127,7 @@ impl SessionContextOrcExt for SessionContext {
126127
// SessionContext::_read_type
127128
let table_paths = table_paths.to_urls()?;
128129
let session_config = self.copied_config();
129-
let listing_options =
130-
ListingOptions::new(Arc::new(OrcFormat::new())).with_file_extension(".orc");
130+
let listing_options = ListingOptions::new(Arc::new(OrcFormat)).with_file_extension(".orc");
131131

132132
let option_extension = listing_options.file_extension.clone();
133133

src/object_store_reader.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use futures::future::BoxFuture;
2222
use futures::{FutureExt, TryFutureExt};
2323
use orc_rust::reader::AsyncChunkReader;
2424

25-
use object_store::{ObjectMeta, ObjectStore};
25+
use object_store::{GetOptions, ObjectMeta, ObjectStore};
2626

2727
/// Implements [`AsyncChunkReader`] to allow reading ORC files via `object_store` API.
2828
pub struct ObjectStoreReader {
@@ -38,16 +38,18 @@ impl ObjectStoreReader {
3838

3939
impl AsyncChunkReader for ObjectStoreReader {
4040
fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
41-
async move { Ok(self.file.size as u64) }.boxed()
41+
self.store
42+
.get_opts(&self.file.location, GetOptions::default())
43+
.map(|result| result.map(|x| x.meta.size))
44+
.map_err(|e| e.into())
45+
.boxed()
4246
}
4347

4448
fn get_bytes(
4549
&mut self,
4650
offset_from_start: u64,
4751
length: u64,
4852
) -> BoxFuture<'_, std::io::Result<Bytes>> {
49-
let offset_from_start = offset_from_start as usize;
50-
let length = length as usize;
5153
let range = offset_from_start..(offset_from_start + length);
5254
self.store
5355
.get_range(&self.file.location, range)

0 commit comments

Comments
 (0)