Skip to content

Commit 9453b2c

Browse files
alamb and ion-elgreco
authored and committed
refactor: remove unnecessary uses of datafusion subcrates
Signed-off-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 7e8ded1 commit 9453b2c

43 files changed

Lines changed: 237 additions & 265 deletions

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

Cargo.toml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,8 @@ parquet = { version = "=55.0.0" }
4545

4646
# datafusion
4747
datafusion = "47.0.0"
48-
datafusion-expr = "47.0.0"
49-
datafusion-common = "47.0.0"
5048
datafusion-ffi = "47.0.0"
51-
datafusion-functions = "47.0.0"
52-
datafusion-functions-aggregate = "47.0.0"
53-
datafusion-physical-expr = "47.0.0"
54-
datafusion-physical-plan = "47.0.0"
5549
datafusion-proto = "47.0.0"
56-
datafusion-sql = "47.0.0"
5750

5851
# serde
5952
serde = { version = "1.0.194", features = ["derive"] }

crates/benchmarks/Cargo.toml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,6 @@ serde_json = { workspace = true }
3434

3535
# datafusion
3636
datafusion = { workspace = true }
37-
datafusion-expr = { workspace = true }
38-
datafusion-common = { workspace = true }
39-
datafusion-proto = { workspace = true }
40-
datafusion-sql = { workspace = true }
41-
datafusion-physical-expr = { workspace = true }
4237

4338
[dependencies.deltalake-core]
4439
path = "../core"

crates/benchmarks/src/bin/merge.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ use arrow::datatypes::Schema as ArrowSchema;
77
use arrow_array::{RecordBatch, StringArray, UInt32Array};
88
use chrono::Duration;
99
use clap::{command, Args, Parser, Subcommand};
10+
use datafusion::common::DataFusionError;
1011
use datafusion::functions::expr_fn::random;
12+
use datafusion::logical_expr::{cast, col, lit};
1113
use datafusion::{datasource::MemTable, prelude::DataFrame};
12-
use datafusion_common::DataFusionError;
13-
use datafusion_expr::{cast, col, lit};
1414
use deltalake_core::protocol::SaveMode;
1515
use deltalake_core::{
1616
arrow::{
@@ -223,7 +223,7 @@ async fn benchmark_merge_tpcds(
223223

224224
let row_sample = ctx.table("t1").await?.join(
225225
ctx.table("file_sample").await?,
226-
datafusion_common::JoinType::Inner,
226+
datafusion::common::JoinType::Inner,
227227
&["file_path"],
228228
&["file"],
229229
None,
@@ -619,7 +619,7 @@ async fn main() {
619619
before_stats
620620
.join(
621621
after_stats,
622-
datafusion_common::JoinType::Inner,
622+
datafusion::common::JoinType::Inner,
623623
&["before_name"],
624624
&["after_name"],
625625
None,

crates/catalog-unity/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ reqwest-middleware = { version = "0.4.0", features = ["json"] }
3030
rand = "0.8"
3131
dashmap = "6"
3232
datafusion = { workspace = true, optional = true }
33-
datafusion-common = { workspace = true, optional = true }
3433
moka = { version = "0.12", optional = true, features = ["future"] }
3534

3635
[dev-dependencies]
@@ -45,7 +44,7 @@ aws = ["deltalake-aws"]
4544
azure = ["deltalake-azure"]
4645
gcp = ["deltalake-gcp"]
4746
r2 = ["deltalake-aws"]
48-
datafusion = ["dep:datafusion", "datafusion-common", "deltalake-core/datafusion", "moka"]
47+
datafusion = ["dep:datafusion", "deltalake-core/datafusion", "moka"]
4948

5049
[[example]]
5150
name = "uc_example"

crates/catalog-unity/src/datafusion.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use chrono::prelude::*;
44
use dashmap::DashMap;
55
use datafusion::catalog::SchemaProvider;
66
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
7+
use datafusion::common::DataFusionError;
78
use datafusion::datasource::TableProvider;
8-
use datafusion_common::DataFusionError;
99
use futures::FutureExt;
1010
use moka::future::Cache;
1111
use moka::Expiry;
@@ -204,7 +204,10 @@ impl SchemaProvider for UnitySchemaProvider {
204204
self.table_names.clone()
205205
}
206206

207-
async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
207+
async fn table(
208+
&self,
209+
name: &str,
210+
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
208211
let maybe_table = self
209212
.client
210213
.get_table(&self.catalog_name, &self.schema_name, name)

crates/catalog-unity/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ pub enum UnityCatalogError {
109109

110110
#[cfg(feature = "datafusion")]
111111
#[error("Datafusion error: {0}")]
112-
DatafusionError(#[from] datafusion_common::DataFusionError),
112+
DatafusionError(#[from] ::datafusion::common::DataFusionError),
113113

114114
/// Cannot initialize DynamoDbConfiguration due to some sort of threading issue
115115
#[error("Unable to initialize Unity Catalog, potentially a threading issue")]

crates/core/Cargo.toml

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,7 @@ pin-project-lite = "^0.2.7"
3737

3838
# datafusion
3939
datafusion = { workspace = true, optional = true }
40-
datafusion-expr = { workspace = true, optional = true }
41-
datafusion-common = { workspace = true, optional = true }
4240
datafusion-proto = { workspace = true, optional = true }
43-
datafusion-sql = { workspace = true, optional = true }
44-
datafusion-physical-expr = { workspace = true, optional = true }
45-
datafusion-physical-plan = { workspace = true, optional = true }
46-
datafusion-functions = { workspace = true, optional = true }
47-
datafusion-functions-aggregate = { workspace = true, optional = true }
4841

4942
# serde
5043
serde = { workspace = true, features = ["derive"] }
@@ -115,14 +108,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
115108
default = ["rustls"]
116109
datafusion = [
117110
"dep:datafusion",
118-
"datafusion-expr",
119-
"datafusion-common",
120111
"datafusion-proto",
121-
"datafusion-physical-expr",
122-
"datafusion-physical-plan",
123-
"datafusion-sql",
124-
"datafusion-functions",
125-
"datafusion-functions-aggregate",
126112
]
127113
datafusion-ext = ["datafusion"]
128114
json = ["parquet/json"]

crates/core/src/data_catalog/storage/mod.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use std::sync::Arc;
77
use async_trait::async_trait;
88
use dashmap::DashMap;
99
use datafusion::catalog::SchemaProvider;
10+
use datafusion::common::DataFusionError;
1011
use datafusion::datasource::TableProvider;
11-
use datafusion_common::DataFusionError;
1212
use futures::TryStreamExt;
1313
use object_store::ObjectStore;
1414

@@ -59,7 +59,7 @@ impl ListingSchemaProvider {
5959
}
6060

6161
/// Reload table information from ObjectStore
62-
pub async fn refresh(&self) -> datafusion_common::Result<()> {
62+
pub async fn refresh(&self) -> datafusion::common::Result<()> {
6363
let entries: Vec<_> = self.store.list(None).try_collect().await?;
6464
let mut tables = HashSet::new();
6565
for file in entries.iter() {
@@ -110,7 +110,10 @@ impl SchemaProvider for ListingSchemaProvider {
110110
self.tables.iter().map(|t| t.key().clone()).collect()
111111
}
112112

113-
async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
113+
async fn table(
114+
&self,
115+
name: &str,
116+
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
114117
let Some(location) = self.tables.get(name).map(|t| t.clone()) else {
115118
return Ok(None);
116119
};
@@ -123,7 +126,7 @@ impl SchemaProvider for ListingSchemaProvider {
123126
&self,
124127
_name: String,
125128
_table: Arc<dyn TableProvider>,
126-
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
129+
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
127130
Err(DataFusionError::Execution(
128131
"schema provider does not support registering tables".to_owned(),
129132
))
@@ -132,7 +135,7 @@ impl SchemaProvider for ListingSchemaProvider {
132135
fn deregister_table(
133136
&self,
134137
_name: &str,
135-
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
138+
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
136139
Err(DataFusionError::Execution(
137140
"schema provider does not support deregistering tables".to_owned(),
138141
))

crates/core/src/delta_datafusion/cdf/scan.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@ use arrow_schema::{Schema, SchemaRef};
55
use async_trait::async_trait;
66
use datafusion::catalog::Session;
77
use datafusion::catalog::TableProvider;
8+
use datafusion::common::{exec_datafusion_err, Column, DFSchema, Result as DataFusionResult};
89
use datafusion::execution::SessionState;
9-
use datafusion_common::{exec_datafusion_err, Column, DFSchema, Result as DataFusionResult};
10-
use datafusion_expr::utils::conjunction;
11-
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
12-
use datafusion_physical_expr::PhysicalExpr;
13-
use datafusion_physical_plan::filter::FilterExec;
14-
use datafusion_physical_plan::limit::GlobalLimitExec;
15-
use datafusion_physical_plan::projection::ProjectionExec;
16-
use datafusion_physical_plan::ExecutionPlan;
10+
use datafusion::logical_expr::utils::conjunction;
11+
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
12+
use datafusion::physical_expr::PhysicalExpr;
13+
use datafusion::physical_plan::filter::FilterExec;
14+
use datafusion::physical_plan::limit::GlobalLimitExec;
15+
use datafusion::physical_plan::projection::ProjectionExec;
16+
use datafusion::physical_plan::ExecutionPlan;
1717

1818
use crate::DeltaTableError;
1919
use crate::{

crates/core/src/delta_datafusion/cdf/scan_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use std::sync::Arc;
33

44
use arrow_schema::{DataType, Field, Schema, SchemaRef};
55
use chrono::TimeZone;
6+
use datafusion::common::ScalarValue;
67
use datafusion::datasource::listing::PartitionedFile;
7-
use datafusion_common::ScalarValue;
88
use object_store::path::Path;
99
use object_store::ObjectMeta;
1010
use serde_json::Value;

0 commit comments

Comments
 (0)