Skip to content
94 changes: 94 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3512,3 +3512,97 @@ async fn test_hashjoin_hash_table_pushdown_integer_keys() {
",
);
}

#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown_not_used() {
Comment thread
LiaCastaneda marked this conversation as resolved.
Outdated
use datafusion_common::JoinType;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

// Configure session with dynamic filter pushdown enabled
let session_config = SessionConfig::default()
.with_batch_size(10)
.set_bool("datafusion.execution.parquet.pushdown_filters", true)
.set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true);

let build_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
]));
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
.with_support(true)
.with_batches(vec![
record_batch!(("a", Utf8, ["aa", "ab"]), ("b", Utf8, ["ba", "bb"])).unwrap(),
])
.build();

let probe_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
]));
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
.with_support(false) // probe side does not support dynamic filtering
.with_batches(vec![
record_batch!(
("a", Utf8, ["aa", "ab", "ac", "ad"]),
("b", Utf8, ["ba", "bb", "bc", "bd"])
)
.unwrap(),
])
.build();

let on = vec![
(
col("a", &build_side_schema).unwrap(),
col("a", &probe_side_schema).unwrap(),
),
(
col("b", &build_side_schema).unwrap(),
col("b", &probe_side_schema).unwrap(),
),
];
let plan = Arc::new(
HashJoinExec::try_new(
build_scan,
probe_scan,
on,
None,
&JoinType::Inner,
None,
PartitionMode::CollectLeft,
datafusion_common::NullEquality::NullEqualsNothing,
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;

// Apply filter pushdown optimization
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;
let plan = FilterPushdown::new_post_optimization()
.optimize(plan, &config)
.unwrap();

// Execute the plan to trigger is_used() check
let session_ctx = SessionContext::new_with_config(session_config);
session_ctx.register_object_store(
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
Arc::new(InMemory::new()),
);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let _batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
.await
.unwrap();

// After execution, the dynamic filter should remain empty because is_used() returns false.
// Even though dynamic filter pushdown is enabled, the filter is not populated because
// the probe side doesn't support it (no consumer holds a reference to the inner Arc).
insta::assert_snapshot!(
format!("{}", format_plan_for_test(&plan)),
@r"
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=false
"
);
}
57 changes: 57 additions & 0 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,22 @@ impl DynamicFilterPhysicalExpr {
.await;
}

/// Check if this dynamic filter is being actively used by any consumers.
///
/// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec
/// that created the filter). This is useful to avoid computing expensive filter
/// expressions when no consumer will actually use them.
///
/// Note: We check the inner Arc's strong_count, not the outer Arc's count, because
/// when filters are transformed (e.g., via reassign_expr_columns during filter pushdown),
/// new outer Arc instances are created via with_new_children(), but they all share the
/// same inner Arc<RwLock<Inner>>. This is what allows filter updates to propagate to
/// consumers even after transformation.
pub fn is_used(self: &Arc<Self>) -> bool {
// Strong count > 1 means at least one consumer is holding a reference beyond the producer.
Arc::strong_count(&self.inner) > 1
}

fn render(
&self,
f: &mut std::fmt::Formatter<'_>,
Expand Down Expand Up @@ -691,4 +707,45 @@ mod test {
"Expected b + d = [1010, 2020, 3030], got {arr_2:?}",
);
}

#[test]
fn test_is_used() {
let filter = Arc::new(DynamicFilterPhysicalExpr::new(
vec![],
lit(true) as Arc<dyn PhysicalExpr>,
));

// Initially, only one reference to the inner Arc exists
assert!(
!filter.is_used(),
"Filter should not be used with only one inner reference"
);

// Simulate a consumer created via transformation (what happens during filter pushdown).
// When filters are pushed down and transformed via reassign_expr_columns/transform_down,
// with_new_children() is called which creates a new outer Arc but clones the inner Arc.
let consumer1_expr = Arc::clone(&filter).with_new_children(vec![]).unwrap();
let _consumer1 = consumer1_expr
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.expect("Should be DynamicFilterPhysicalExpr");

// Now the inner Arc is shared (inner_count = 2)
assert!(
filter.is_used(),
"Filter should be used when inner Arc is shared with transformed consumer"
);

// Create another transformed consumer
let consumer2_expr = Arc::clone(&filter).with_new_children(vec![]).unwrap();
let _consumer2 = consumer2_expr
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.expect("Should be DynamicFilterPhysicalExpr");

assert!(
filter.is_used(),
"Filter should still be used with multiple consumers"
);
}
}
16 changes: 15 additions & 1 deletion datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,21 @@ impl ExecutionPlan for HashJoinExec {
consider using CoalescePartitionsExec or the EnforceDistribution rule"
);

let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
// Only enable dynamic filter pushdown if:
// - The session config enables dynamic filter pushdown
// - A dynamic filter exists
// - At least one consumer is holding a reference to it, this avoids expensive filter
// computation when disabled or when no consumer will use it.
let enable_dynamic_filter_pushdown = context
.session_config()
.options()
.optimizer
.enable_join_dynamic_filter_pushdown
&& self
.dynamic_filter
.as_ref()
.map(|df| df.filter.is_used())
.unwrap_or(false);

let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
let left_fut = match self.mode {
Expand Down
Loading