Skip to content

Commit 021cc3b

Browse files
authored
feat: session-first DataFusion integration + session resolution policies (delta-io#4145)
# Description

Make delta-rs' DataFusion integration consistently honor caller-provided sessions and introduce a session-first API for registering Delta object stores.

**Changes:**

Session resolution:
- Add `resolve_session_state(...)` & `SessionFallbackPolicy` (`InternalDefaults` / `DeriveFromTrait` / `RequireSessionState`)
- Builders expose `with_session_fallback_policy(...)` to control strictness
- Migrate operations to use resolver (optimize/merge/update/write/delete)

Session-first registration:
- Add `DeltaSessionExt` trait:
  - `ensure_object_store_registered(...)`
  - `ensure_log_store_registered(...)`
- Deprecate `DeltaTable::update_datafusion_session(...)` (shim kept for compatibility)

Predicate parsing:
- Non-`SessionState` sessions can preserve UDFs when configured via `DeriveFromTrait`

**Compatibility:**
- Default is backward compatible: `SessionFallbackPolicy::InternalDefaults` warns but doesn't break
- Strict mode available: `with_session_fallback_policy(RequireSessionState)` errors instead of falling back
- `DeltaTable::update_datafusion_session` remains but is deprecated

**Tests:**
- Regression tests for fallback policy wiring across builders (`RequireSessionState` path)
- Existing `deltalake-core` DataFusion test suite passes with `--features datafusion`

# Related Issue(s)

**Addresses:**
- delta-io#4081
- delta-io#4139

<!--- For example: - closes #106 --->

# Documentation

<!--- Share links to useful documentation --->

# Follow ups:
- Flip default to `RequireSessionState` (breaking change)
- Remove deprecated `DeltaTable::update_datafusion_session` after deprecation window

---------

Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
1 parent c095787 commit 021cc3b

17 files changed

Lines changed: 946 additions & 108 deletions

File tree

crates/core/src/delta_datafusion/expr.rs

Lines changed: 54 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -194,30 +194,34 @@ pub(crate) struct DeltaContextProvider<'a> {
194194
}
195195

196196
impl<'a> DeltaContextProvider<'a> {
197-
fn new(session: &'a dyn Session) -> Self {
198-
// default planners are [CoreFunctionPlanner, NestedFunctionPlanner, FieldAccessPlanner,
199-
// UserDefinedFunctionPlanner]
197+
fn try_new(session: &'a dyn Session) -> DeltaResult<Self> {
200198
let planners: Vec<Arc<dyn ExprPlanner>> = vec![
201199
Arc::new(CoreFunctionPlanner::default()),
202200
Arc::new(CustomNestedFunctionPlanner::default()),
203201
Arc::new(FieldAccessPlanner),
204202
Arc::new(datafusion::functions::unicode::planner::UnicodeFunctionPlanner),
205203
Arc::new(datafusion::functions::datetime::planner::DatetimeFunctionPlanner),
206204
];
207-
// Disable the above for testing
208-
//let planners = state.expr_planners();
209-
let new_state = session
210-
.as_any()
211-
.downcast_ref::<SessionState>()
212-
.map(|state| SessionStateBuilder::new_from_existing(state.clone()))
213-
.unwrap_or_default()
205+
let (base_state, _) = crate::delta_datafusion::resolve_session_state(
206+
Some(session),
207+
crate::delta_datafusion::SessionFallbackPolicy::DeriveFromTrait,
208+
|| SessionStateBuilder::new().with_default_features().build(),
209+
crate::delta_datafusion::SessionResolveContext {
210+
operation: "parse_predicate_expression",
211+
table_uri: None,
212+
cdc: false,
213+
},
214+
)?;
215+
216+
let new_state = SessionStateBuilder::new_from_existing(base_state)
214217
.with_expr_planners(planners.clone())
215218
.build();
216-
DeltaContextProvider {
219+
220+
Ok(DeltaContextProvider {
217221
planners,
218222
state: new_state,
219223
_phantom: PhantomData,
220-
}
224+
})
221225
}
222226
}
223227

@@ -283,7 +287,7 @@ pub fn parse_predicate_expression(
283287
source: Box::new(err),
284288
})?;
285289

286-
let context_provider = DeltaContextProvider::new(session);
290+
let context_provider = DeltaContextProvider::try_new(session)?;
287291
let sql_to_rel =
288292
SqlToRel::new_with_options(&context_provider, DeltaParserOptions::default().into());
289293

@@ -585,7 +589,9 @@ impl fmt::Display for ScalarValueFormat<'_> {
585589
#[cfg(test)]
586590
mod test {
587591
use arrow_schema::DataType as ArrowDataType;
588-
use datafusion::common::{Column, ScalarValue, ToDFSchema};
592+
use datafusion::common::{Column, DFSchema, ScalarValue, ToDFSchema};
593+
use datafusion::execution::SessionStateBuilder;
594+
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
589595
use datafusion::functions::core::arrow_cast;
590596
use datafusion::functions::core::expr_ext::FieldAccessor;
591597
use datafusion::functions::encoding::expr_fn::decode;
@@ -594,13 +600,18 @@ mod test {
594600
use datafusion::functions_nested::expr_fn::cardinality;
595601
use datafusion::logical_expr::expr::ScalarFunction;
596602
use datafusion::logical_expr::{BinaryExpr, Cast, Expr, ExprSchemable, col, lit};
597-
use datafusion::prelude::SessionContext;
603+
use datafusion::prelude::{SessionConfig, SessionContext};
598604

599605
use crate::DeltaTable;
606+
use crate::delta_datafusion::planner::DeltaPlanner;
600607
use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};
601608
use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType};
609+
use crate::test_utils::datafusion::{WrapperSession, make_test_scalar_udf};
602610

603611
use super::fmt_expr_to_sql;
612+
use super::parse_predicate_expression;
613+
614+
const TEST_UDF_NAME: &str = "delta_rs_parse_expr_test_udf";
604615

605616
struct ParseTest {
606617
expr: Expr,
@@ -722,6 +733,34 @@ mod test {
722733
table
723734
}
724735

736+
fn make_incompatible_session_with_udf() -> WrapperSession {
737+
let runtime_env = RuntimeEnvBuilder::new().build_arc().unwrap();
738+
let config = SessionConfig::new();
739+
let udf = make_test_scalar_udf(TEST_UDF_NAME);
740+
741+
let state = SessionStateBuilder::new()
742+
.with_default_features()
743+
.with_config(config)
744+
.with_runtime_env(runtime_env)
745+
.with_query_planner(DeltaPlanner::new())
746+
.with_scalar_functions(vec![udf])
747+
.build();
748+
749+
WrapperSession::new(state)
750+
}
751+
752+
#[test]
753+
fn parse_predicate_expression_preserves_udfs_for_non_session_state() {
754+
let wrapper = make_incompatible_session_with_udf();
755+
let schema = DFSchema::empty();
756+
757+
let expr = parse_predicate_expression(&schema, format!("{TEST_UDF_NAME}(1) = 1"), &wrapper);
758+
assert!(
759+
expr.is_ok(),
760+
"Expected UDF to be available during parsing but got: {expr:?}"
761+
);
762+
}
763+
725764
#[tokio::test]
726765
async fn test_expr_sql() {
727766
let table = setup_table().await;

crates/core/src/delta_datafusion/mod.rs

Lines changed: 4 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,6 @@ use datafusion::datasource::TableProvider;
4242
use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
4343
use datafusion::execution::TaskContext;
4444
use datafusion::execution::context::SessionContext;
45-
use datafusion::execution::runtime_env::RuntimeEnv;
4645
use datafusion::logical_expr::logical_plan::CreateExternalTable;
4746
use datafusion::logical_expr::utils::conjunction;
4847
use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
@@ -52,17 +51,16 @@ use datafusion_proto::logical_plan::LogicalExtensionCodec;
5251
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
5352
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
5453
use either::Either;
55-
use url::Url;
5654

5755
use crate::delta_datafusion::expr::parse_predicate_expression;
5856
use crate::delta_datafusion::table_provider::DeltaScanWire;
5957
use crate::ensure_table_uri;
6058
use crate::errors::{DeltaResult, DeltaTableError};
6159
use crate::kernel::{Add, EagerSnapshot, LogDataHandler, Snapshot};
62-
use crate::logstore::{LogStore, LogStoreRef};
6360
use crate::table::state::DeltaTableState;
6461
use crate::{open_table, open_table_with_storage_options};
6562

63+
pub(crate) use self::session::DeltaSessionExt;
6664
pub use self::session::{
6765
DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
6866
create_session,
@@ -93,6 +91,8 @@ pub mod logical;
9391
pub mod physical;
9492
pub mod planner;
9593
mod session;
94+
pub use session::SessionFallbackPolicy;
95+
pub(crate) use session::{SessionResolveContext, resolve_session_state};
9696
mod table_provider;
9797
pub(crate) mod utils;
9898

@@ -307,14 +307,6 @@ impl DeltaTableState {
307307
}
308308
}
309309

310-
// each delta table must register a specific object store, since paths are internally
311-
// handled relative to the table root.
312-
pub(crate) fn register_store(store: LogStoreRef, env: &RuntimeEnv) {
313-
let object_store_url = store.object_store_url();
314-
let url: &Url = object_store_url.as_ref();
315-
env.register_object_store(url, store.object_store(None));
316-
}
317-
318310
pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
319311
match t {
320312
ArrowDataType::Null => Ok(ScalarValue::Null),
@@ -641,6 +633,7 @@ mod tests {
641633
use std::fmt::{self, Debug, Display, Formatter};
642634
use std::ops::Range;
643635
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
636+
use url::Url;
644637

645638
use super::*;
646639

0 commit comments

Comments
 (0)