Background: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/
Currently, dynamic filters don't work with distributed-datafusion. To make them work, upper stages need to be able to update the filters in lower stages:
┌────────────────────────────────────────────┐
│ Stage 2                                    │
│ ┌────────────────────────────────────────┐ │
│ │  SortExec (TopK)                       │ │
│ │    ┌──────────────────────┐            │ │
│ │    │DynamicFilter [ col > │────────────┼─┼──────┐
│ └────┴──────────────────────┴────────────┘ │      │
│ ┌────────────────────────────────────────┐ │      │
│ │  NetworkShuffleExec                    │ │      │
│ └────────────────────────────────────────┘ │      │
└────────────────────────────────────────────┘      │
                                                    │  Push updates
┌────────────────────────────────────────────┐      │  across
│ Stage 1                                    │      │  network boundary
│ ┌────────────────────────────────────────┐ │      │
│ │  RepartitionExec                       │ │      │
│ └────────────────────────────────────────┘ │      │
│ ┌────────────────────────────────────────┐ │      │
│ │  DataSourceExec                        │ │      │
│ │    ┌──────────────────────┐            │ │      │
│ │    │DynamicFilter [ true ]│◀───────────┼─┼──────┘
│ └────┴──────────────────────┴────────────┘ │
└────────────────────────────────────────────┘
There are several moving pieces to this:
1. We need to add some hooks to DataFusion's DynamicFilterPhysicalExpr so that we get a callback when a filter is updated.
2. We need some way to identify each DynamicFilterPhysicalExpr so that we can relate the copies on either side of the wire.
3. When we serialize, we need to detect each DynamicFilterPhysicalExpr and kick off some bookkeeping. I imagine this might be something like hooking into updates via (1) and then setting up some sort of broadcast channel for updates.
4. When we deserialize, we need to detect each DynamicFilterPhysicalExpr and subscribe to updates via its broadcast channel.
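To make the serialize/deserialize bookkeeping above more concrete, here is a minimal, std-only Rust sketch. It assumes each dynamic filter gets a stable numeric ID when the plan is serialized and models the filter expression as a plain String. FilterUpdateHub, FilterId, subscribe and publish are hypothetical names, not existing DataFusion or distributed-datafusion APIs, and the in-process mpsc channels only stand in for whatever network transport we end up choosing.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical stable identifier for a dynamic filter, assumed to be
/// assigned at serialization time so both sides of the wire agree on it.
pub type FilterId = u64;

/// Hypothetical registry on the upper-stage side. The update hook on a
/// DynamicFilterPhysicalExpr would call `publish`; each deserialized copy
/// of that filter would call `subscribe` with the same id.
/// The expression is modeled as a String; a real version would ship a
/// serialized physical expression over the network instead.
#[derive(Default, Clone)]
pub struct FilterUpdateHub {
    subscribers: Arc<Mutex<HashMap<FilterId, Vec<Sender<String>>>>>,
}

impl FilterUpdateHub {
    /// Deserializing side: returns a receiver yielding every later update for `id`.
    pub fn subscribe(&self, id: FilterId) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers
            .lock()
            .unwrap()
            .entry(id)
            .or_default()
            .push(tx);
        rx
    }

    /// Serializing side: fan the new expression out to all subscribers,
    /// dropping those whose receiver is gone. Returns the live subscriber count.
    pub fn publish(&self, id: FilterId, expr: &str) -> usize {
        let mut subs = self.subscribers.lock().unwrap();
        match subs.get_mut(&id) {
            Some(senders) => {
                senders.retain(|tx| tx.send(expr.to_string()).is_ok());
                senders.len()
            }
            None => 0,
        }
    }
}

fn main() {
    let hub = FilterUpdateHub::default();
    // Two lower-stage tasks each hold a copy of filter 7.
    let a = hub.subscribe(7);
    let b = hub.subscribe(7);

    // The TopK in the upper stage tightens its threshold.
    assert_eq!(hub.publish(7, "col > 100"), 2);
    assert_eq!(a.recv().unwrap(), "col > 100");
    assert_eq!(b.recv().unwrap(), "col > 100");

    // A finished task drops its receiver and is pruned on the next publish.
    drop(b);
    assert_eq!(hub.publish(7, "col > 250"), 1);
    assert_eq!(a.try_recv().unwrap(), "col > 250");

    // Publishing to an id nobody subscribed to is a no-op.
    assert_eq!(hub.publish(99, "col > 0"), 0);
    println!("ok");
}
```

Each subscriber gets its own queue here, so every intermediate update is delivered even when only the most recent one matters.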
The main things that are not clear in my mind are:
- Are serialization and deserialization the right places to discover all DynamicFilterPhysicalExprs? Do we have enough information there to set up the broadcast / network communication? If not, I think we'd have to add APIs to DataFusion, something like ExecutionPlan::filters(), to discover all of the dynamic filters. Maybe we can reuse ExecutionPlan::gather_filters_for_pushdown? I'm not sure. I'm also not sure where we would do this discovery from (an optimizer rule? some new place? NetworkShuffleExec::execute?).
- How are we going to do the network communication? Some sort of broadcast channel? A gRPC call that just hangs until there is an update?
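One possible shape for the "gRPC call that just hangs until there is an update" option is a long-poll: the lower stage repeatedly asks for updates newer than the last filter version it has seen, and the upper stage holds the call open until a newer version exists or a timeout elapses. The std-only Rust sketch below models just the server-side state with a Mutex and Condvar; the RPC layer is not shown, FilterWatch, update and wait_for_update are hypothetical names, and the filter is again a String. It assumes a consumer only needs the latest filter, so intermediate updates are coalesced rather than queued.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Latest-value state for one dynamic filter: a version counter plus the
/// current expression (modeled as a String for this sketch).
struct FilterState {
    version: u64,
    expr: String,
}

/// Hypothetical server-side handle that a long-poll RPC handler would call
/// into. Cloning shares the same underlying state.
#[derive(Clone)]
pub struct FilterWatch {
    inner: Arc<(Mutex<FilterState>, Condvar)>,
}

impl FilterWatch {
    pub fn new(initial: &str) -> Self {
        let state = FilterState { version: 0, expr: initial.to_string() };
        Self { inner: Arc::new((Mutex::new(state), Condvar::new())) }
    }

    /// Upper stage: replace the filter, bump the version, wake all pollers.
    pub fn update(&self, expr: &str) {
        let (lock, cvar) = &*self.inner;
        let mut state = lock.lock().unwrap();
        state.version += 1;
        state.expr = expr.to_string();
        cvar.notify_all();
    }

    /// Lower stage (via the RPC): block until the version exceeds `last_seen`
    /// or `timeout` elapses. Returns None on timeout. Only the latest
    /// expression is ever returned, so intermediate updates are coalesced.
    pub fn wait_for_update(&self, last_seen: u64, timeout: Duration) -> Option<(u64, String)> {
        let (lock, cvar) = &*self.inner;
        let guard = lock.lock().unwrap();
        // Keeps waiting (and absorbs spurious wakeups) while nothing newer exists.
        let (state, _timeout_result) = cvar
            .wait_timeout_while(guard, timeout, |s| s.version <= last_seen)
            .unwrap();
        // Check the condition directly instead of relying on the timeout flag.
        (state.version > last_seen).then(|| (state.version, state.expr.clone()))
    }
}

fn main() {
    let watch = FilterWatch::new("true");

    // Nothing newer than version 0 yet, so a short poll times out.
    assert_eq!(watch.wait_for_update(0, Duration::from_millis(10)), None);

    // Two updates land before the consumer polls again: it only sees the latest.
    watch.update("col > 100");
    watch.update("col > 250");
    assert_eq!(
        watch.wait_for_update(0, Duration::from_millis(10)),
        Some((2, "col > 250".to_string()))
    );

    // A poll that is already waiting gets woken by a later update.
    let producer = watch.clone();
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        producer.update("col > 500");
    });
    let got = watch.wait_for_update(2, Duration::from_secs(5));
    handle.join().unwrap();
    assert_eq!(got, Some((3, "col > 500".to_string())));
    println!("ok");
}
```

In an async runtime, tokio::sync::watch provides the same latest-value semantics without a hand-rolled Condvar.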
I also think we should be conscious of how invasive the changes to DataFusion are. It would be great to minimize the APIs and complexity we add to DataFusion just to support distributed execution.