Serialize dynamic filters on execution plan nodes (HashJoin, Aggregate, Sort)#2
Note for reviewers: I'm unsure if I should be using

```rust
/// Returns the dynamic filter expression for this aggregate, if set.
pub fn dynamic_filter(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {
    self.dynamic_filter.as_ref().map(|df| &df.filter)
}
```
I think it would be cleaner to use `apply_expressions` (apache#20337), mainly because it's more generic: you can do basically anything with `PhysicalExpr`s inside a plan, including detecting dynamic filters, and you wouldn't need to know beforehand which nodes are producers and consumers; any custom logic can live separately in the proto crate. It would also reduce the burden on people who want to add a new `ExecutionPlan` that holds a `DynamicFilterPhysicalExpr`: with the current approach they would have to remember to also add the manual `dynamic_filter()` call here, whereas `apply_expressions` is part of `ExecutionPlan` and is not optional, so users cannot forget to implement it on every node.
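A rough sketch of that idea, using stand-in types (`PlanNode`, `Expr`, `DynamicFilter` are illustrative only, and the real `apply_expressions` signature from apache#20337 may differ): a generic per-node expression hook lets a serializer discover dynamic filters without per-node accessor methods.

```rust
use std::sync::Arc;

#[derive(Debug)]
enum Expr {
    Literal(i64),
    DynamicFilter(Arc<DynamicFilter>),
}

#[derive(Debug)]
struct DynamicFilter {
    id: usize,
}

trait PlanNode {
    // Generic hook: visit every expression held by this node.
    fn apply_expressions(&self, f: &mut dyn FnMut(&Expr));
}

struct HashJoinLike {
    exprs: Vec<Expr>,
}

impl PlanNode for HashJoinLike {
    fn apply_expressions(&self, f: &mut dyn FnMut(&Expr)) {
        for e in &self.exprs {
            f(e);
        }
    }
}

/// Collect dynamic filters from any node without knowing its concrete type.
fn collect_dynamic_filters(node: &dyn PlanNode) -> Vec<Arc<DynamicFilter>> {
    let mut found = Vec::new();
    node.apply_expressions(&mut |e| {
        if let Expr::DynamicFilter(df) = e {
            found.push(Arc::clone(df));
        }
    });
    found
}

fn main() {
    let node = HashJoinLike {
        exprs: vec![
            Expr::Literal(1),
            Expr::DynamicFilter(Arc::new(DynamicFilter { id: 7 })),
        ],
    };
    let filters = collect_dynamic_filters(&node);
    assert_eq!(filters.len(), 1);
    assert_eq!(filters[0].id, 7);
    println!("found {} dynamic filter(s)", filters.len());
}
```

The point of the sketch is that `collect_dynamic_filters` works for any `PlanNode`, so the proto crate can handle producer/consumer detection in one place.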
```rust
pub fn with_dynamic_filter(
    mut self,
    filter: Arc<DynamicFilterPhysicalExpr>,
) -> Result<Self> {
```
I see we do something similar for every producer/consumer; a more generic way to modify the expressions would probably be to implement `map_expressions` on `ExecutionPlan`, as suggested above.
Informs: datafusion-contrib/datafusion-distributed#180
Closes: apache#20418

Consider this scenario:

1. You have a plan with a `HashJoinExec` and a `DataSourceExec`.
2. You run the physical optimizer and the `DataSourceExec` accepts `DynamicFilterPhysicalExpr` pushdown from the `HashJoinExec`.
3. You serialize the plan, deserialize it, and execute it.

What should happen is that the dynamic filter should "work", meaning:

1. When you deserialize the plan, both the `HashJoinExec` and the `DataSourceExec` should hold pointers to the same `DynamicFilterPhysicalExpr`.
2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec`, and the `DataSourceExec` should filter out rows.

This does not happen today for a few reasons, a couple of which this PR aims to address:

1. `DynamicFilterPhysicalExpr` does not survive round-tripping. The internal exprs get inlined (e.g. it may be serialized as a `Literal`).
2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, pushdown often rewrites it. In that case you end up with two `DynamicFilterPhysicalExpr`s that are different `Arc`s but share the same `Inner` dynamic filter state. The current `DeduplicatingProtoConverter` does not handle this specific form of deduping.

This PR aims to fix those problems by adding serde for `DynamicFilterPhysicalExpr` and deduping logic for the inner state of dynamic filters. It does not yet add a test for the `HashJoinExec` + `DataSourceExec` filter pushdown case, but that is relevant follow-up work; I tried to keep the PR small for reviewers.

Are these changes tested? Yes, via unit tests. `DynamicFilterPhysicalExpr` is now serialized by the default codec.
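To illustrate the second problem, here is a minimal sketch (stand-in types, not DataFusion's actual `DynamicFilterPhysicalExpr` or `Inner`) of deduping by the identity of the shared inner state rather than by the outer `Arc`, so a producer and its rewritten pushdown copy serialize to the same filter id:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for the shared dynamic filter state updated at runtime.
struct Inner {
    current: Mutex<String>,
}

// Stand-in for a dynamic filter expression; pushdown may rewrite the
// outer value while keeping the same shared Inner.
#[derive(Clone)]
struct DynamicFilterExpr {
    inner: Arc<Inner>,
}

/// Assign one serialization id per *inner* state, so two outer Arcs that
/// share an Inner deserialize back into a single shared filter.
fn dedup_ids(filters: &[DynamicFilterExpr]) -> Vec<usize> {
    let mut ids: HashMap<*const Inner, usize> = HashMap::new();
    filters
        .iter()
        .map(|f| {
            let key = Arc::as_ptr(&f.inner);
            let next = ids.len();
            *ids.entry(key).or_insert(next)
        })
        .collect()
}

fn main() {
    let inner = Arc::new(Inner { current: Mutex::new("true".into()) });
    let producer = DynamicFilterExpr { inner: Arc::clone(&inner) };
    // Simulate pushdown rewriting the outer expr but keeping shared state.
    let consumer = DynamicFilterExpr { inner: Arc::clone(&inner) };
    let unrelated = DynamicFilterExpr {
        inner: Arc::new(Inner { current: Mutex::new("x".into()) }),
    };

    let ids = dedup_ids(&[producer, consumer, unrelated]);
    // Producer and consumer map to the same id; the unrelated filter does not.
    assert_eq!(ids, vec![0, 0, 1]);
    println!("{ids:?}");
}
```

Keying on `Arc::as_ptr(&f.inner)` rather than the outer `Arc` is the essential difference from dedup by outer pointer, which would treat the rewritten copy as a distinct filter.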
Fixups for the cherry-picked commits from PRs apache#19437, apache#20037, apache#20416, and #2 to work with branch-52's partition-index APIs: - Update remap_children callers to use instance method signature - Adapt DynamicFilterUpdate::Global enum for new code paths - Add missing partitioned_exprs/runtime_partition fields to new constructors - Remove null_aware field (not on branch-52) - Replace FilterExecBuilder with FilterExec::try_new - Remove non-compiling tests that depend on upstream-only APIs - Fix duplicate imports in roundtrip test file Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…messages (apache#20387)

## Which issue does this PR close?

- Closes apache#20386.

## Rationale for this change

The `memory_limit` configuration (`RuntimeEnvBuilder::new().with_memory_limit()`) uses the `greedy` memory pool by default. However, if `memory_pool` (`RuntimeEnvBuilder::new().with_memory_pool()`) is set, it overrides the default with the configured pool, such as `fair`. If neither `memory_limit` nor `memory_pool` is set, the `unbounded` memory pool is used. It is therefore useful to expose the ultimately used/selected pool as part of the `ResourcesExhausted` error message, so the end user is aware of it and can switch to another memory pool (`greedy`, `fair`, `unbounded`) if needed.

Also, [this comparison table](lance-format/lance#3601 (comment)) is an example use case comparing the runtime behavior of the `greedy` and `fair` memory pools; exposing the used memory pool in native logs helps build that kind of comparison.

Example use cases with `datafusion-cli`:

**Case 1**: `memory-limit` set and `top-memory-consumers > 0`:

```
eren.avsarogullari@AWGNPWVK961 debug % ./datafusion-cli --memory-limit 10M --command 'select * from generate_series(1,500000) as t1(v1) order by v1;' --top-memory-consumers 3
DataFusion CLI v53.0.0
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
caused by
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
  ExternalSorterMerge[0]#2(can spill: false) consumed 10.0 MB, peak 10.0 MB,
  DataFusion-Cli#0(can spill: false) consumed 0.0 B, peak 0.0 B,
  ExternalSorter[0]#1(can spill: true) consumed 0.0 B, peak 0.0 B.
Error: Failed to allocate additional 128.0 KB for ExternalSorter[0] with 0.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: greedy(used: 10.0 MB, pool_size: 10.0 MB)
```

**Case 2**: `memory-limit` set and `top-memory-consumers = 0` (disables top-memory-consumers logging):

```
eren.avsarogullari@AWGNPWVK961 debug % ./datafusion-cli --memory-limit 10M --command 'select * from generate_series(1,500000) as t1(v1) order by v1;' --top-memory-consumers 0
DataFusion CLI v53.0.0
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
caused by
Resources exhausted: Failed to allocate additional 128.0 KB for ExternalSorter[0] with 0.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: greedy(used: 10.0 MB, pool_size: 10.0 MB)
```

**Case 3**: `memory-limit`, `memory-pool`, and `top-memory-consumers > 0` all set:

```
eren.avsarogullari@AWGNPWVK961 debug % ./datafusion-cli --memory-limit 10M --mem-pool-type fair --top-memory-consumers 3 --command 'select * from generate_series(1,500000) as t1(v1) order by v1;'
DataFusion CLI v53.0.0
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
caused by
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
  ExternalSorterMerge[0]#2(can spill: false) consumed 10.0 MB, peak 10.0 MB,
  ExternalSorter[0]#1(can spill: true) consumed 0.0 B, peak 0.0 B,
  DataFusion-Cli#0(can spill: false) consumed 0.0 B, peak 0.0 B.
Error: Failed to allocate additional 128.0 KB for ExternalSorter[0] with 0.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: fair(pool_size: 10.0 MB)
```

## What changes are included in this PR?

- Add a name property to `MemoryPool` instances.
- Expose the used `MemoryPool` info in `ResourcesExhausted` error messages.

## Are these changes tested?

Yes, and existing test cases are updated.

## Are there any user-facing changes?

Yes, the `ResourcesExhausted` error messages are updated.
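As a rough illustration of the change (stand-in types, not DataFusion's actual `MemoryPool` trait or error machinery), each pool reports a name and a description, and the description is appended to the exhaustion error so the user can see which pool was active:

```rust
// Illustrative sketch: pools self-describe, and the allocation-failure
// message includes that description (e.g. "greedy(used: ..., pool_size: ...)"
// vs "fair(pool_size: ...)"), mirroring the datafusion-cli output above.

trait MemoryPool {
    fn name(&self) -> &'static str;
    fn describe(&self) -> String;
}

struct GreedyPool {
    used: u64,
    pool_size: u64,
}

struct FairPool {
    pool_size: u64,
}

impl MemoryPool for GreedyPool {
    fn name(&self) -> &'static str {
        "greedy"
    }
    fn describe(&self) -> String {
        format!("{}(used: {} B, pool_size: {} B)", self.name(), self.used, self.pool_size)
    }
}

impl MemoryPool for FairPool {
    fn name(&self) -> &'static str {
        "fair"
    }
    fn describe(&self) -> String {
        format!("{}(pool_size: {} B)", self.name(), self.pool_size)
    }
}

// Build the resources-exhausted message, now including the pool description.
fn exhausted_error(pool: &dyn MemoryPool, requested: u64) -> String {
    format!(
        "Failed to allocate additional {} B - total memory pool: {}",
        requested,
        pool.describe()
    )
}

fn main() {
    let greedy = GreedyPool { used: 10, pool_size: 10 };
    let msg = exhausted_error(&greedy, 128);
    assert!(msg.contains("greedy(used: 10 B, pool_size: 10 B)"));
    println!("{msg}");
}
```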
## Which issue does this PR close?

Informs: datafusion-contrib/datafusion-distributed#180
Follow up for: apache#20416

## Rationale for this change

I'm interested in serializing a physical plan (post-physical-optimizer) and executing it on a remote node. To do so, I need dynamic filters, and references/pointers to dynamic filters, to be preserved in the plan. Currently, nodes which produce filters, such as `HashJoinExec`, `AggregateExec`, and `SortExec`, do not serialize their dynamic filters. This change updates those nodes to serialize their dynamic filters and adds tests for the scenario above.
## What changes are included in this PR?

Proto schema (`datafusion.proto`):

- Added a `PhysicalExprNode dynamic_filter` field to the `HashJoinExec`, `AggregateExec`, and `SortExec` plan node messages.

Plan node public API:

- Added `with_dynamic_filter()` and `dynamic_filter()` to `HashJoinExec`, `AggregateExec`, and `SortExec`. `with_dynamic_filter()` always

Serde:

- Serialization uses the new plan node public APIs above: it calls `dynamic_filter()` and serializes the result via the proto converter.
- Deserialization downcasts the expression to `DynamicFilterPhysicalExpr` and sets it on the node.

## Are these changes tested?

- Added roundtrip serialization tests for dynamic filters on `HashJoinExec`, `AggregateExec`, and `SortExec`.
- Added unit tests for `with_dynamic_filter()` and `dynamic_filter()` on `HashJoinExec`, `AggregateExec`, and `SortExec`.
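The deserialization step described above can be sketched as follows, using stand-in types (not DataFusion's actual `PhysicalExpr` trait) to show the downcast pattern: the proto converter hands back a trait object, which must be downcast to the concrete dynamic filter type before it can be set on the node.

```rust
use std::any::Any;
use std::sync::Arc;

// Stand-in for the PhysicalExpr trait object returned by deserialization.
trait PhysicalExpr: Any {
    fn as_any(&self) -> &dyn Any;
}

struct DynamicFilterPhysicalExpr {
    name: String,
}

impl PhysicalExpr for DynamicFilterPhysicalExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    // Pretend this came out of the proto converter during deserialization.
    let expr: Arc<dyn PhysicalExpr> =
        Arc::new(DynamicFilterPhysicalExpr { name: "df".into() });

    // Downcast to the concrete dynamic filter type before attaching it to
    // the node (e.g. via with_dynamic_filter()).
    let df = expr
        .as_any()
        .downcast_ref::<DynamicFilterPhysicalExpr>()
        .expect("expected a DynamicFilterPhysicalExpr");
    assert_eq!(df.name, "df");
    println!("downcast ok: {}", df.name);
}
```

If the downcast fails, the expression was not a dynamic filter, and a real implementation would return a plan-deserialization error instead of panicking.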