-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Compute Dynamic Filters only when a consumer supports them #19546
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
e116a9e
e6130c8
1097b7d
061d2b1
c33c073
bb1db12
392b7c9
0ef0893
95581d3
a035dd1
c1712bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -508,6 +508,17 @@ impl HashJoinExec { | |
| self.null_equality | ||
| } | ||
|
|
||
| /// Get the dynamic filter expression for testing purposes. | ||
| /// Returns `None` if no dynamic filter has been set. | ||
| /// | ||
| /// This method is intended for testing only and should not be used in production code. | ||
| #[doc(hidden)] | ||
| pub fn dynamic_filter_for_test(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> { | ||
| self.dynamic_filter | ||
| .as_ref() | ||
| .map(|df| Arc::clone(&df.filter)) | ||
| } | ||
|
|
||
| /// Calculate order preservation flags for this hash join. | ||
| fn maintains_input_order(join_type: JoinType) -> Vec<bool> { | ||
| vec![ | ||
|
|
@@ -921,7 +932,21 @@ impl ExecutionPlan for HashJoinExec { | |
| consider using CoalescePartitionsExec or the EnforceDistribution rule" | ||
| ); | ||
|
|
||
| let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some(); | ||
| // Only enable dynamic filter pushdown if: | ||
| // - The session config enables dynamic filter pushdown | ||
| // - A dynamic filter exists | ||
| // - At least one consumer is holding a reference to it, this avoids expensive filter | ||
| // computation when disabled or when no consumer will use it. | ||
| let enable_dynamic_filter_pushdown = context | ||
| .session_config() | ||
| .options() | ||
| .optimizer | ||
| .enable_join_dynamic_filter_pushdown | ||
| && self | ||
| .dynamic_filter | ||
| .as_ref() | ||
| .map(|df| df.filter.is_used()) | ||
| .unwrap_or(false); | ||
|
|
||
| let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); | ||
| let left_fut = match self.mode { | ||
|
|
@@ -4610,6 +4635,11 @@ mod tests { | |
| let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); | ||
| let dynamic_filter_clone = Arc::clone(&dynamic_filter); | ||
|
|
||
| // Simulate a consumer by creating a transformed copy (what happens during filter pushdown) | ||
| let _consumer = Arc::clone(&dynamic_filter) | ||
| .with_new_children(vec![]) | ||
| .unwrap(); | ||
|
Comment on lines
+4639
to
+4641
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to add a consumer in these tests, otherwise |
||
|
|
||
| // Create HashJoinExec with the dynamic filter | ||
| let mut join = HashJoinExec::try_new( | ||
| left, | ||
|
|
@@ -4658,6 +4688,11 @@ mod tests { | |
| let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); | ||
| let dynamic_filter_clone = Arc::clone(&dynamic_filter); | ||
|
|
||
| // Simulate a consumer by creating a transformed copy (what happens during filter pushdown) | ||
| let _consumer = Arc::clone(&dynamic_filter) | ||
| .with_new_children(vec![]) | ||
| .unwrap(); | ||
|
|
||
| // Create HashJoinExec with the dynamic filter | ||
| let mut join = HashJoinExec::try_new( | ||
| left, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.