Skip to content

proto: serialize and dedupe dynamic filters v2#21807

Open
jayshrivastava wants to merge 6 commits intoapache:mainfrom
jayshrivastava:js/dedupe-dynamic-filter-inner-state-v2
Open

proto: serialize and dedupe dynamic filters v2#21807
jayshrivastava wants to merge 6 commits intoapache:mainfrom
jayshrivastava:js/dedupe-dynamic-filter-inner-state-v2

Conversation

@jayshrivastava
Copy link
Copy Markdown
Contributor

@jayshrivastava jayshrivastava commented Apr 23, 2026

Which issue does this PR close?

Informs: datafusion-contrib/datafusion-distributed#180
Closes: #20418

Rationale for this change

Consider you have a plan with a HashJoinExec and DataSourceExec

HashJoinExec(dynamic_filter_1 on a@0)
  (...left side of join)
  ProjectionExec(a := Column("a", source_index))
    DataSourceExec
      ParquetSource(predicate = dynamic_filter_2)

You serialize the plan, deserialize it, and execute it. What should happen is that the dynamic filter should "work", meaning:

  1. When you deserialize the plan, both the HashJoinExec and DataSourceExec should have pointers to the same DynamicFilterPhysicalExpr
  2. The DynamicFilterPhysicalExpr should be updated during execution by the HashJoinExec and the DataSourceExec should filter out rows

This does not happen today for a few reasons, a couple of which this PR aims to address

  1. DynamicFilterPhysicalExpr is not survive round-tripping. The internal exprs get inlined (ex. it may be serialized as Literal) due to the PhysicalExpr::snapshot() API
  2. Even if DynamicFilterPhysicalExpr survives round-tripping, the one pushed down to the DataSourceExec often has different children. In this case, you have two DynamicFilterPhysicalExpr which
    do not survive deduping, causing referential integrity to be lost.

What changes are included in this PR?

This PR aims to fix those problems by:

  1. Removing the snapshot() call from the serialization process
  2. Adding protos for DynamicFilterPhysicalExpr so it can be serialized and deserialized
  3. Removing Arc-based deduplication. We now only dedupe on
    expression_id if the PhysicalExpr reports a expression_id.
    After this change, only DynamicFilterPhysicalExpr reports an expression_id
    to be deduped.
  4. expression_id is now just a random u64. Since a given query likely
    only has a few DynamicFilterPhysicalExpr instances, the odds of a
    collision are very low
  5. There's no need for a DedupingSerializer anymore since the
    expression_id is already stored in the dynamic filter proto itself

Future work:

  1. Serialize dynamic filters in HashJoinExec, AggregateExec and SortExec
  2. Add tests which actually execute plans after deserialization and assert that dynamic filtering is functional
  3. Add proto converters to the PhysicalExtensionCodec trait so implementors can utilize deduping logic

Are these changes tested?

  • adds tests which roundtrip dynamic filters and assert that referential
    integrity is maintained
  • removes tests that test Arc-based deduplication and session id
    rotation since we don't support that anymore

Are there any user-facing changes?

  • The default codec does not call snapshot() on PhysicalExpr during serialization anymore. This means that DynamicFilterPhysicalExpr are now serialized and deserialized without snapshotting.
  • All PhysicalExpr are not deduped anymore. Only DynamicFilterPhysicalExpr is

@github-actions github-actions Bot added physical-expr Changes to the physical-expr crates proto Related to proto crate physical-plan Changes to the physical-plan crate labels Apr 23, 2026
@jayshrivastava jayshrivastava force-pushed the js/dedupe-dynamic-filter-inner-state-v2 branch from 004aa52 to 23fc7f1 Compare April 23, 2026 23:33
@github-actions github-actions Bot removed the physical-plan Changes to the physical-plan crate label Apr 23, 2026

/// Internal serializer that adds expr_id to expressions.
/// Created fresh for each serialization operation.
struct DeduplicatingSerializer {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the DynamicFilterPhysicalExpr self-generates an expr id and exposes it via expression_id, we don't need this anymore. We can just dedupe during deserialization.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per comment above I think we should keep expr_id on PhysicalExprNode. But I think the drop in complexity and the fact that this is added to the public PhysicalExpr trait justifies dropping this specialized serializer and instead having the default one call PhysicalExpr::expr_id() and put that into the proto.

@jayshrivastava jayshrivastava force-pushed the js/dedupe-dynamic-filter-inner-state-v2 branch 2 times, most recently from 64b5889 to c042c08 Compare April 24, 2026 00:14
@jayshrivastava jayshrivastava changed the title Js/dedupe dynamic filter inner state v2 proto: serialize and dedupe dynamic filters Apr 24, 2026
@jayshrivastava jayshrivastava marked this pull request as ready for review April 24, 2026 00:17
@jayshrivastava jayshrivastava changed the title proto: serialize and dedupe dynamic filters proto: serialize and dedupe dynamic filters v2 Apr 24, 2026

/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during
/// serialization / deserialization.
pub struct DynamicFilterSnapshot {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is necessary or makes things any simpler

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see now that this is for atomicity. That's a good bug catch! But that sort of thing should be called out in the docstring, otherwise readers need to dig into implementations to understand why this module exists.

It's an unfortunate situation that all expressions / execution nodes have to make their internal details public to all callers just so that proto serialization can serialize them. I think we (DataFusion) should at least consider some hybrid where we split the proto crate into proto-models and proto so that anything can depend on proto-models (behind a feature flag?) and thus expressions can define their own serialization into prost structs. cc @alamb

All of this said: maybe it would be easier to just make Inner public and have an inner() -> Inner method that clones it? I would suggest if we do this we add warnings in docstrings to not assume this will always be public/stable, it's only for proto, etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added public methods and added warnings in the docstrings.

Copy link
Copy Markdown
Contributor

@adriangb adriangb Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. I opened #21835 to track the idea of splitting up proto ser to avoid leaking internal state all over

Comment on lines +497 to +500
/// Generate a new expression id for this filter.
pub fn new_expression_id() -> u64 {
random::<u64>()
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a public function?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Made it private.

Comment thread datafusion/proto/proto/datafusion.proto Outdated
Comment on lines +878 to +879
// Was expr_id
reserved 30;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why move this? For reasons previously discussed it might be helpful to have an expression id on arbitrary expressions, e.g. to deduplicate large lists or strings. Leaving it here also minimizes changes / breakage.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the PhysicalExpr has expression_id and the actual implementations like DynamicFilterPhysicalExpr store it, it felt natural to have expression_ids live in the implementor's proto (ie. PhysicalDynamicFilterNode) rather than here, one level up. This is more significant now since we don't dedupe every PhysicalExpr anymore. I thought "If only dynamic filters have this, then it can live in the dynamic filter proto".

It also avoids having to put expr_id: None everywhere.

I'll defer to your judgement but those were my thoughts.


/// Internal serializer that adds expr_id to expressions.
/// Created fresh for each serialization operation.
struct DeduplicatingSerializer {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per comment above I think we should keep expr_id on PhysicalExprNode. But I think the drop in complexity and the fact that this is added to the public PhysicalExpr trait justifies dropping this specialized serializer and instead having the default one call PhysicalExpr::expr_id() and put that into the proto.

)),
};
return Ok(protobuf::PhysicalExprNode {
expr_id: None,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping expr_id will also reduce the diff.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

/// [`PhysicalExpr::with_new_children`].
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
self.remapped_children.as_deref()
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two getters aren't horrible to keep around, so I didn't mark them as unstable. Lmk if you think we should add

/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think defaulting to at least marking things as private via comments is a good defensive approach. The situation with proto serialization is unfortunate, I'll see if I can do something about it...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

data_type: Arc::new(RwLock::new(None)),
nullable: Arc::new(RwLock::new(None)),
}
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method looks like a bit of a smell but it gets the job done.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah unfortunately I think this is once again just the shape the current proto stuff forces everything into.

Copy link
Copy Markdown
Contributor

@adriangb adriangb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking very nice @jayshrivastava 👍🏻

Lets leave this open for a day to see if anyone has thoughts on the public APIs being tweaked.

cc @stuhood iirc you mentioned you also plan to ser/de dynamic filters across process (but not node) boundaries

Comment on lines +167 to +168
/// If the implementation returns a [`PhysicalExpr::expression_id`], then
/// the identifier should be preserved by the new expression.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏻

data_type: Arc::new(RwLock::new(None)),
nullable: Arc::new(RwLock::new(None)),
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah unfortunately I think this is once again just the shape the current proto stuff forces everything into.

@jayshrivastava
Copy link
Copy Markdown
Contributor Author

jayshrivastava commented Apr 24, 2026

@adriangb I've addressed the comments in the last commit. Let me know what you think!

@jayshrivastava
Copy link
Copy Markdown
Contributor Author

Oh TYFR! That was fast

Lets leave this open for a day to see if anyone has thoughts on the public APIs being tweaked.

No problem. I can merge some time next week 👍🏽

Informs: datafusion-contrib/datafusion-distributed#180
Closes: apache#20418

Consider you have a plan with a `HashJoinExec` and `DataSourceExec`
```
HashJoinExec(dynamic_filter_1 on a@0)
  (...left side of join)
  ProjectionExec(a := Column("a", source_index))
    DataSourceExec
      ParquetSource(predicate = dynamic_filter_2)
```

You serialize the plan, deserialize it, and execute it. What should happen is that the dynamic filter should "work", meaning:
1. When you deserialize the plan, both the `HashJoinExec` and `DataSourceExec` should have pointers to the same `DynamicFilterPhysicalExpr`
2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec`  and the `DataSourceExec` should filter out rows

This does not happen today for a few reasons, a couple of which this PR aims to address
1. `DynamicFilterPhysicalExpr` is not survive round-tripping. The internal exprs get inlined (ex. it may be serialized as `Literal`) due to the `PhysicalExpr::snapshot()` API
2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, the one pushed down to the `DataSourceExec` often has different children. In this case, you have two `DynamicFilterPhysicalExpr` which
do not survive deduping, causing referential integrity to be lost.

This PR aims to fix those problems by:
1. Removing the `snapshot()` call from the serialization process
2. Adding protos for `DynamicFilterPhysicalExpr` so it can be serialized and deserialized
3. Removing `Arc`-based deduplication. We now only dedupe on
   `expression_id` if the `PhysicalExpr` reports a `expression_id`.
   After this change, only `DynamicFilterPhysicalExpr` reports an `expression_id`
   to be deduped.
4. `expression_id` is now just a random u64. Since a given query likely
   only has a few `DynamicFilterPhysicalExpr` instances, the odds of a
   collision are very low
5. There's no need for a `DedupingSerializer` anymore since the
   `expression_id` is already stored in the dynamic filter proto itself

Testing
- adds tests which roundtrip dynamic filters and assert that referential
  integrity is maintained
- removes tests that test `Arc`-based deduplication and session id
  rotation
@adriangb adriangb force-pushed the js/dedupe-dynamic-filter-inner-state-v2 branch from 586648b to 09c2d34 Compare April 24, 2026 19:47
@adriangb
Copy link
Copy Markdown
Contributor

FYI I hit rebase

Copy link
Copy Markdown
Contributor

@stuhood stuhood left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Comment thread datafusion/physical-expr-common/src/physical_expr.rs Outdated
Comment on lines +68 to 73
/// Unique identifier for this dynamic filter.
///
/// Derived filters (ex. via `with_new_children`) should inherit the expression id of the source filter.
expression_id: u64,
/// The source of dynamic filters.
inner: Arc<RwLock<Inner>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if it were clearer exactly how the Inner relates to the expression_id.

With (mostly) an outsider's perspective, it seems like the expression_id in this case should actually literally be an id for the inner state? So as with #21650 , it feels like this is more like a mutable_state_id, perhaps?

Because you might have two different DynamicFilterPhysicalExpr (different wrapping expressions) wrapped around the same inner state... and at that point, the expression_id is not an "id for the expression".

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because you might have two different DynamicFilterPhysicalExpr (different wrapping expressions) wrapped around the same inner state... and at that point, the expression_id is not an "id for the expression".

I think the inner state is actually the real expression. The only thing that may change via wrapping is the children expressions. Ex. if you call reassign_expr_columns. When you update a dynamic filter, the new expression is written directly into Inner::expr

Copy link
Copy Markdown
Contributor Author

@jayshrivastava jayshrivastava Apr 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With (mostly) an outsider's perspective, it seems like the expression_id in this case should actually literally be an id for the inner state? So as with #21650 , it feels like this is more like a mutable_state_id, perhaps?

We could use the Arc address of Inner, but it's a smell to rely on pointers. The approach in this PR is randomly generate an id for Inner and ensure that this ID propagates when new expressions are derived using the same Inner.

You make a good point though. Do you think it would be more clear if we put the expression_id inside the Inner struct? I feel like that's better

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it would be more clear if we put the expression_id inside the Inner struct? I feel like that's better

I agree that's cleaner. The con is that you need to acquire the lock just to read it. But maybe that's fine.

pub is_complete: bool,
}

// TODO: Include expression_id in debug output.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally even in the DisplayAs representation for plans, I would think. It's actually possibly the most important thing to have in the plan when rendering a dynamic filter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to keep this PR small, so I hid the expression_id to avoid 1 test that was failing. I have code ready to address this TODO which I plan to publish as a next step after this PR is merged. This PR is just an incremental step.

// See https://github.com/apache/datafusion/issues/20418. Currently, plan nodes
// like `HashJoinExec`, `AggregateExec`,  `SortExec` do not serialize their
// dynamic filter. This causes round trips to fail on the `expression_id`
// because it is regenerated on deserialization.

Comment on lines +401 to +406
/// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by
/// proto deserialization to preserve `expression_id` across a roundtrip
/// rather than minting a fresh one.
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be generic via the shared state proposal? Not sure what is up with that. Ditto fn inner.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shared state discussion seems promising! I'm hoping it will be an easy migration. I imagine that we would end up storing all Inners in the TaskContext instead of in the actual DynamicFilterPhysicalExpr and looking up the Inners via the expression_id. It's still an ongoing discussion though so I can't be sure how the migration will look.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep that would make sense to me

itertools = { workspace = true, features = ["use_std"] }
parking_lot = { workspace = true }
petgraph = "0.8.3"
rand = { workspace = true }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what the conventions are, but: introducing random order to the physical-expr crate seems like a big deal? Is this being used to generate something that could be deterministic from some sort of context instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good question! I was thinking about a few alternatives 😄

  • We could hash all the exprs in the DynamicFilterPhysicalExpr to get an id. However, this wouldn't solve the shared Inner state linking problem. We don't want this type of identifier - we want an identifier for the inner state specifically.
  • We could use the Arc address of the Inner struct, but it's a bit of a smell to rely on pointer addresses - for example, IDs derived from Arc pointers are only valid until the Arc is dropped. This is what the old code used and something I used as well in initial versions of this PR

A rand u64 is not bad. Realistically, we just need to not have a collision between distinct dynamic filters in a plan. I figure that the probability of more than 2 or 3 distinct dynamic filters in a query must be very low already. And the probability of 2 or 3 rand u64s colliding is negligible.

Lmk what you think!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm less worried about collisions, and more worried about non-determinism (causing flaky tests, different plans to be generated randomly, etc). The other annoying aspect of random ids is that they are huge. If you go ahead and actually render this everywhere, a random ID is going to take up a lot more space than one generated on a context (...starting from 0, etc).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now the ids are also random (generated from a mashup of arc pointer address, process id, etc.). So in that sense it's no better or worse. But I do agree something deterministically generated from context would be better.

The only alternative that occurs to me is a process level atomic. Not sure if that might cause some locking, etc. Do you have any other suggestions?

Lastly: we can always change this. As long as there is no API contract on what this number is going to be we could replace it at any point if it becomes a problem.

Co-authored-by: Stu Hood <stuhood@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-expr Changes to the physical-expr crates proto Related to proto crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Serialize dynamic filters across network boundaries

3 participants