Skip to content

Commit a9720e8

Browse files
committed
Simplify wait_complete function (apache#19937)
## Which issue does this PR close? ## Rationale for this change The current v52 signature `pub async fn wait_complete(self: &Arc<Self>)` (introduced in apache#19546) is a bit unergonomic. The method requires `&Arc<DynamicFilterPhysicalExpr>`, but when working with `Arc<dyn PhysicalExpr>`, downcasting only gives you `&DynamicFilterPhysicalExpr`. Since you can't convert `&DynamicFilterPhysicalExpr` to `Arc<DynamicFilterPhysicalExpr>`, the method becomes impossible to call. The `&Arc<Self>` param was used to check` is_used()` via Arc strong count, but this was overly defensive. ## What changes are included in this PR? - Changed `DynamicFilterPhysicalExpr::wait_complete` signature from `pub async fn wait_complete(self: &Arc<Self>)` to `pub async fn wait_complete(&self)`. - Removed the `is_used()` check from `wait_complete()` - this method, like `wait_update()`, should only be called on filters that have consumers. If the caller doesn't know whether the filter has consumers, they should call `is_used()` first to avoid waiting indefinitely. This approach avoids complex signatures and dependencies between the APIs methods. ## Are these changes tested? Yes, existing tests cover this functionality, I removed the "mock" consumer from `test_hash_join_marks_filter_complete_empty_build_side` and `test_hash_join_marks_filter_complete` since the fix in apache#19734 makes is_used check the outer struct `strong_count` as well. ## Are there any user-facing changes? The signature of `wait_complete` changed. (cherry picked from commit bef1368)
1 parent 59667b8 commit a9720e8

2 files changed

Lines changed: 10 additions & 17 deletions

File tree

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,10 @@ impl DynamicFilterPhysicalExpr {
272272
///
273273
/// This method will return when [`Self::update`] is called and the generation increases.
274274
/// It does not guarantee that the filter is complete.
275+
///
276+
/// Producers (e.g.) HashJoinExec may never update the expression or mark it as completed if there are no consumers.
277+
/// If you call this method on a dynamic filter created by such a producer and there are no consumers registered this method would wait indefinitely.
278+
/// This should not happen under normal operation and would indicate a programming error either in your producer or in DataFusion if the producer is a built in node.
275279
pub async fn wait_update(&self) {
276280
let mut rx = self.state_watch.subscribe();
277281
// Get the current generation
@@ -283,17 +287,16 @@ impl DynamicFilterPhysicalExpr {
283287

284288
/// Wait asynchronously until this dynamic filter is marked as complete.
285289
///
286-
/// This method returns immediately if the filter is already complete or if the filter
287-
/// is not being used by any consumers.
290+
/// This method returns immediately if the filter is already complete.
288291
/// Otherwise, it waits until [`Self::mark_complete`] is called.
289292
///
290293
/// Unlike [`Self::wait_update`], this method guarantees that when it returns,
291294
/// the filter is fully complete with no more updates expected.
292-
pub async fn wait_complete(self: &Arc<Self>) {
293-
if !self.is_used() {
294-
return;
295-
}
296-
295+
///
296+
/// Producers (e.g.) HashJoinExec may never update the expression or mark it as completed if there are no consumers.
297+
/// If you call this method on a dynamic filter created by such a producer and there are no consumers registered this method would wait indefinitely.
298+
/// This should not happen under normal operation and would indicate a programming error either in your producer or in DataFusion if the producer is a built in node.
299+
pub async fn wait_complete(&self) {
297300
if self.inner.read().is_complete {
298301
return;
299302
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4633,11 +4633,6 @@ mod tests {
46334633
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
46344634
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
46354635

4636-
// Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
4637-
let _consumer = Arc::clone(&dynamic_filter)
4638-
.with_new_children(vec![])
4639-
.unwrap();
4640-
46414636
// Create HashJoinExec with the dynamic filter
46424637
let mut join = HashJoinExec::try_new(
46434638
left,
@@ -4686,11 +4681,6 @@ mod tests {
46864681
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
46874682
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
46884683

4689-
// Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
4690-
let _consumer = Arc::clone(&dynamic_filter)
4691-
.with_new_children(vec![])
4692-
.unwrap();
4693-
46944684
// Create HashJoinExec with the dynamic filter
46954685
let mut join = HashJoinExec::try_new(
46964686
left,

0 commit comments

Comments
 (0)