Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1488,12 +1488,13 @@ impl SessionContext {
})?;

let state = self.state.read();
let context = SimplifyContext::default()
let context = SimplifyContext::builder()
.with_schema(Arc::clone(prepared.plan.schema()))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend we also deprecate SimplifyContext::with... methods (can do it as a follow on PR) and direct people to use the builder in their own code.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes perfect sense, I'll add it in this one

.with_config_options(Arc::clone(state.config_options()))
.with_query_execution_start_time(
state.execution_props().query_execution_start_time,
);
)
.build();
let simplifier = ExprSimplifier::new(context);

// Only allow literals as parameters for now.
Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,12 +743,13 @@ impl SessionState {
df_schema: &DFSchema,
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
let config_options = self.config_options();
let simplify_context = SimplifyContext::default()
let simplify_context = SimplifyContext::builder()
.with_schema(Arc::new(df_schema.clone()))
.with_config_options(Arc::clone(config_options))
.with_query_execution_start_time(
self.execution_props().query_execution_start_time,
);
)
.build();
let simplifier = ExprSimplifier::new(simplify_context);
// apply type coercion here to ensure types match
let mut expr = simplifier.coerce(expr, df_schema)?;
Expand Down Expand Up @@ -1835,11 +1836,12 @@ impl ContextProvider for SessionContextProvider<'_> {
.get(name)
.cloned()
.ok_or_else(|| plan_datafusion_err!("table function '{name}' not found"))?;
let simplify_context = SimplifyContext::default()
let simplify_context = SimplifyContext::builder()
.with_config_options(Arc::clone(self.state.config_options()))
.with_query_execution_start_time(
self.state.execution_props().query_execution_start_time,
);
)
.build();
let simplifier = ExprSimplifier::new(simplify_context);
let schema = DFSchema::empty();
let args = args
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ impl TestParquetFile {
let df_schema = Arc::clone(&self.schema).to_dfschema_ref()?;

// run coercion on the filters to coerce types etc.
let context = SimplifyContext::default().with_schema(Arc::clone(&df_schema));
let context = SimplifyContext::builder()
.with_schema(Arc::clone(&df_schema))
.build();
if let Some(filter) = maybe_filter {
let simplifier = ExprSimplifier::new(context);
let filter = simplifier.coerce(filter, &df_schema).unwrap();
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/tests/expr_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,9 @@ fn create_simplified_expr_test(expr: Expr, expected_expr: &str) {
let df_schema = DFSchema::try_from(batch.schema()).unwrap();

// Simplify the expression first
let simplify_context = SimplifyContext::default().with_schema(Arc::new(df_schema));
let simplify_context = SimplifyContext::builder()
.with_schema(Arc::new(df_schema))
.build();
let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10);
let simplified = simplifier.simplify(expr).unwrap();
create_expr_test(simplified, expected_expr);
Expand Down
21 changes: 13 additions & 8 deletions datafusion/core/tests/expr_api/simplification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ fn test_evaluate_with_start_time(
expected_expr: Expr,
date_time: &DateTime<Utc>,
) {
let context = SimplifyContext::default()
let context = SimplifyContext::builder()
.with_schema(schema())
.with_query_execution_start_time(Some(*date_time));
.with_query_execution_start_time(Some(*date_time))
.build();
let simplifier = ExprSimplifier::new(context);
let simplified_expr = simplifier
.simplify(input_expr.clone())
Expand Down Expand Up @@ -153,9 +154,10 @@ fn to_timestamp_expr(arg: impl Into<String>) -> Expr {

#[test]
fn basic() {
let context = SimplifyContext::default()
let context = SimplifyContext::builder()
.with_schema(schema())
.with_query_execution_start_time(Some(Utc::now()));
.with_query_execution_start_time(Some(Utc::now()))
.build();

// The `Expr` is a core concept in DataFusion, and DataFusion can
// help simplify it.
Expand All @@ -171,7 +173,7 @@ fn basic() {

#[test]
fn fold_and_simplify() {
let context = SimplifyContext::default().with_schema(schema());
let context = SimplifyContext::builder().with_schema(schema()).build();

// What will it do with the expression `concat('foo', 'bar') == 'foobar')`?
let expr = concat(vec![lit("foo"), lit("bar")]).eq(lit("foobar"));
Expand Down Expand Up @@ -565,7 +567,9 @@ fn expr_test_schema() -> DFSchemaRef {
}

fn test_simplify(input_expr: Expr, expected_expr: Expr) {
let context = SimplifyContext::default().with_schema(expr_test_schema());
let context = SimplifyContext::builder()
.with_schema(expr_test_schema())
.build();
let simplifier = ExprSimplifier::new(context);
let simplified_expr = simplifier
.simplify(input_expr.clone())
Expand All @@ -581,9 +585,10 @@ fn test_simplify_with_cycle_count(
expected_expr: Expr,
expected_count: u32,
) {
let context = SimplifyContext::default()
let context = SimplifyContext::builder()
.with_schema(expr_test_schema())
.with_query_execution_start_time(Some(Utc::now()));
.with_query_execution_start_time(Some(Utc::now()))
.build();
let simplifier = ExprSimplifier::new(context);
let (simplified_expr, count) = simplifier
.simplify_with_cycle_count_transformed(input_expr.clone())
Expand Down
88 changes: 88 additions & 0 deletions datafusion/expr/src/simplify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ pub struct SimplifyContext {
config_options: Arc<ConfigOptions>,
}

/// Builder for [`SimplifyContext`].
#[derive(Debug, Default)]
pub struct SimplifyContextBuilder {
schema: Option<DFSchemaRef>,
query_execution_start_time: Option<DateTime<Utc>>,
config_options: Option<Arc<ConfigOptions>>,
}

impl Default for SimplifyContext {
fn default() -> Self {
Self {
Expand All @@ -51,6 +59,11 @@ impl Default for SimplifyContext {
}

impl SimplifyContext {
/// Returns a builder for [`SimplifyContext`].
pub fn builder() -> SimplifyContextBuilder {
SimplifyContextBuilder::default()
}

/// Set the [`ConfigOptions`] for this context
pub fn with_config_options(mut self, config_options: Arc<ConfigOptions>) -> Self {
self.config_options = config_options;
Expand Down Expand Up @@ -110,6 +123,46 @@ impl SimplifyContext {
}
}

impl SimplifyContextBuilder {
/// Set the [`ConfigOptions`] for this context.
pub fn with_config_options(mut self, config_options: Arc<ConfigOptions>) -> Self {
self.config_options = Some(config_options);
self
}

/// Set the schema for this context.
pub fn with_schema(mut self, schema: DFSchemaRef) -> Self {
self.schema = Some(schema);
self
}

/// Set the query execution start time.
pub fn with_query_execution_start_time(
mut self,
query_execution_start_time: Option<DateTime<Utc>>,
) -> Self {
self.query_execution_start_time = query_execution_start_time;
self
}

/// Set the query execution start to the current time.
pub fn with_current_time(mut self) -> Self {
self.query_execution_start_time = Some(Utc::now());
self
}

/// Build a [`SimplifyContext`], filling in any unspecified fields with defaults.
pub fn build(self) -> SimplifyContext {
SimplifyContext {
schema: self.schema.unwrap_or_else(|| Arc::new(DFSchema::empty())),
query_execution_start_time: self.query_execution_start_time,
config_options: self
.config_options
.unwrap_or_else(|| Arc::new(ConfigOptions::default())),
}
}
}

/// Was the expression simplified?
#[derive(Debug)]
pub enum ExprSimplifyResult {
Expand All @@ -119,3 +172,38 @@ pub enum ExprSimplifyResult {
/// are return unmodified.
Original(Vec<Expr>),
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn simplify_context_builder_builds_default_context() {
let context = SimplifyContext::builder().build();
let default_options = ConfigOptions::default();

assert_eq!(context.schema().as_ref(), &DFSchema::empty());
assert_eq!(context.query_execution_start_time(), None);
assert_eq!(
context.config_options().optimizer.max_passes,
default_options.optimizer.max_passes
);
}

#[test]
fn simplify_context_builder_uses_overrides() {
let schema = Arc::new(DFSchema::empty());
let config_options = Arc::new(ConfigOptions::default());
let current_time = Utc::now();

let context = SimplifyContext::builder()
.with_schema(Arc::clone(&schema))
.with_config_options(Arc::clone(&config_options))
.with_query_execution_start_time(Some(current_time))
.build();

assert_eq!(context.schema().as_ref(), schema.as_ref());
assert_eq!(context.query_execution_start_time(), Some(current_time));
assert!(Arc::ptr_eq(context.config_options(), &config_options));
}
}
11 changes: 6 additions & 5 deletions datafusion/functions-nested/src/array_has.rs
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ mod tests {
DataFusionError, ScalarValue, config::ConfigOptions,
utils::SingleRowListArrayBuilder,
};
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{
ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDFImpl, col, lit,
simplify::ExprSimplifyResult,
Expand All @@ -1017,7 +1018,7 @@ mod tests {
.build_list_scalar());
let needle = col("c");

let context = datafusion_expr::simplify::SimplifyContext::default();
let context = SimplifyContext::default();

let Ok(ExprSimplifyResult::Simplified(Expr::InList(in_list))) =
ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
Expand All @@ -1040,7 +1041,7 @@ mod tests {
let haystack = make_array(vec![lit(1), lit(2), lit(3)]);
let needle = col("c");

let context = datafusion_expr::simplify::SimplifyContext::default();
let context = SimplifyContext::default();

let Ok(ExprSimplifyResult::Simplified(Expr::InList(in_list))) =
ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
Expand All @@ -1063,7 +1064,7 @@ mod tests {
let haystack = Expr::Literal(ScalarValue::Null, None);
let needle = col("c");

let context = datafusion_expr::simplify::SimplifyContext::default();
let context = SimplifyContext::default();
let Ok(ExprSimplifyResult::Simplified(simplified)) =
ArrayHas::new().simplify(vec![haystack, needle], &context)
else {
Expand All @@ -1080,7 +1081,7 @@ mod tests {
let haystack = Expr::Literal(ScalarValue::List(Arc::new(haystack)), None);
let needle = col("c");

let context = datafusion_expr::simplify::SimplifyContext::default();
let context = SimplifyContext::default();
let Ok(ExprSimplifyResult::Simplified(simplified)) =
ArrayHas::new().simplify(vec![haystack, needle], &context)
else {
Expand All @@ -1095,7 +1096,7 @@ mod tests {
let haystack = col("c1");
let needle = col("c2");

let context = datafusion_expr::simplify::SimplifyContext::default();
let context = SimplifyContext::default();

let Ok(ExprSimplifyResult::Original(args)) =
ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
Expand Down
3 changes: 2 additions & 1 deletion datafusion/functions/src/datetime/current_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ mod tests {
Some(tz.to_string())
};
let schema = Arc::new(DFSchema::empty());
SimplifyContext::default()
SimplifyContext::builder()
.with_schema(schema)
.with_config_options(Arc::new(config))
.with_query_execution_start_time(Some(start_time))
.build()
}

#[test]
Expand Down
10 changes: 7 additions & 3 deletions datafusion/optimizer/src/decorrelate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,9 @@ fn agg_exprs_evaluation_result_on_empty_batch(
.data()?;

let result_expr = result_expr.unalias();
let info = SimplifyContext::default().with_schema(Arc::clone(schema));
let info = SimplifyContext::builder()
.with_schema(Arc::clone(schema))
.build();
let simplifier = ExprSimplifier::new(info);
let result_expr = simplifier.simplify(result_expr)?;
expr_result_map_for_count_bug.insert(e.schema_name().to_string(), result_expr);
Expand Down Expand Up @@ -541,7 +543,9 @@ fn proj_exprs_evaluation_result_on_empty_batch(
.data()?;

if result_expr.ne(expr) {
let info = SimplifyContext::default().with_schema(Arc::clone(schema));
let info = SimplifyContext::builder()
.with_schema(Arc::clone(schema))
.build();
let simplifier = ExprSimplifier::new(info);
let result_expr = simplifier.simplify(result_expr)?;
let expr_name = match expr {
Expand Down Expand Up @@ -581,7 +585,7 @@ fn filter_exprs_evaluation_result_on_empty_batch(
.data()?;

let pull_up_expr = if result_expr.ne(filter_expr) {
let info = SimplifyContext::default().with_schema(schema);
let info = SimplifyContext::builder().with_schema(schema).build();
let simplifier = ExprSimplifier::new(info);
let result_expr = simplifier.simplify(result_expr)?;
match &result_expr {
Expand Down
Loading
Loading