Skip to content

Commit dc683d3

Browse files
proto: serialize and dedupe dynamic filters
Informs: datafusion-contrib/datafusion-distributed#180 Closes: #20418 Consider a plan with a `HashJoinExec` and a `DataSourceExec`: ``` HashJoinExec(dynamic_filter_1 on a@0) (...left side of join) ProjectionExec(a := Column("a", source_index)) DataSourceExec ParquetSource(predicate = dynamic_filter_2) ``` You serialize the plan, deserialize it, and execute it. What should happen is that the dynamic filter should "work", meaning: 1. When you deserialize the plan, both the `HashJoinExec` and the `DataSourceExec` should have pointers to the same `DynamicFilterPhysicalExpr`. 2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec`, and the `DataSourceExec` should filter out rows. This does not happen today for a few reasons, a couple of which this PR aims to address: 1. `DynamicFilterPhysicalExpr` does not survive round-tripping. The internal exprs get inlined (e.g., it may be serialized as a `Literal`) due to the `PhysicalExpr::snapshot()` API. 2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, the one pushed down to the `DataSourceExec` often has different children. In this case, you have two `DynamicFilterPhysicalExpr`s which do not survive deduping, causing referential integrity to be lost. This PR aims to fix those problems by: 1. Removing the `snapshot()` call from the serialization process. 2. Adding protos for `DynamicFilterPhysicalExpr` so it can be serialized and deserialized. 3. Adding a new concept, a `PhysicalExprId`, which has two identifiers: a "shallow" identifier to indicate two equal expressions which may have different children, and an "exact" identifier to indicate two exprs that are exactly the same. 4. Updating the deduping deserializer and protos to be aware of the new "shallow" id, deduping exprs which are the same but have different children accordingly. This change adds tests which round-trip dynamic filters and assert that referential integrity is maintained.
1 parent e7f7fa9 commit dc683d3

10 files changed

Lines changed: 965 additions & 66 deletions

File tree

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::any::Any;
1919
use std::fmt;
2020
use std::fmt::{Debug, Display, Formatter};
21-
use std::hash::{Hash, Hasher};
21+
use std::hash::{DefaultHasher, Hash, Hasher};
2222
use std::sync::Arc;
2323

2424
use crate::utils::scatter;
@@ -438,6 +438,50 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
438438
fn placement(&self) -> ExpressionPlacement {
439439
ExpressionPlacement::KeepInPlace
440440
}
441+
442+
/// Returns a composite identifier for a [`PhysicalExpr`]. Note that if the expression
443+
/// is dropped, then the returned id is no longer valid.
444+
fn expr_id(self: Arc<Self>, salt: &[u64]) -> Option<PhysicalExprId> {
445+
Some(PhysicalExprId::new(expr_id_from_arc(&self, salt), None))
446+
}
447+
}
448+
449+
/// A composite identifier for [`PhysicalExpr`].
///
/// It pairs an "exact" id, which covers the full expression tree including children,
/// with an optional "shallow" id, which covers only the expression root.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PhysicalExprId {
    /// Identifier for the full expression tree, including children.
    exact: u64,
    /// Identifier for just the expression root, ignoring children, if one exists.
    shallow: Option<u64>,
}

impl PhysicalExprId {
    /// Create a new [`PhysicalExprId`]. Both ids must be globally unique within
    /// a process.
    #[must_use]
    pub const fn new(exact: u64, shallow: Option<u64>) -> Self {
        Self { exact, shallow }
    }

    /// Returns the identifier for the full expression tree, including children.
    #[must_use]
    pub const fn exact(&self) -> u64 {
        self.exact
    }

    /// Returns the identifier for just the expression root, ignoring children.
    #[must_use]
    pub const fn shallow(&self) -> Option<u64> {
        self.shallow
    }
}
473+
474+
/// Computes a process-local identifier for the allocation behind an [`Arc`].
///
/// The address of the value managed by `expr` is hashed with [`DefaultHasher`],
/// followed by every element of `salt` in order. Clones of the same [`Arc`]
/// therefore produce the same id for the same `salt`, while a different `salt`
/// produces a different id for the same allocation.
///
/// The result is a 64-bit hash, so distinct allocations are distinguished only up
/// to hash collisions. The id remains valid only while that allocation is still
/// alive: after it is dropped the address may be reused by another value.
#[must_use]
pub fn expr_id_from_arc<T: ?Sized>(expr: &Arc<T>, salt: &[u64]) -> u64 {
    let mut hasher = DefaultHasher::new();
    // Discard any pointer metadata (e.g. a vtable) so only the data address is hashed.
    let addr = Arc::as_ptr(expr).cast::<()>() as u64;
    addr.hash(&mut hasher);
    for word in salt {
        word.hash(&mut hasher);
    }
    hasher.finish()
}
442486

443487
#[deprecated(

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 231 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use datafusion_common::{
2626
tree_node::{Transformed, TransformedResult, TreeNode},
2727
};
2828
use datafusion_expr::ColumnarValue;
29-
use datafusion_physical_expr_common::physical_expr::DynHash;
29+
use datafusion_physical_expr_common::physical_expr::{
30+
DynHash, PhysicalExprId, expr_id_from_arc,
31+
};
3032

3133
/// State of a dynamic filter, tracking both updates and completion.
3234
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -88,6 +90,118 @@ struct Inner {
8890
is_complete: bool,
8991
}
9092

93+
/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during
94+
/// serialization / deserialization.
95+
pub struct DynamicFilterSnapshot {
96+
children: Vec<Arc<dyn PhysicalExpr>>,
97+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
98+
// Inner state.
99+
generation: u64,
100+
inner_expr: Arc<dyn PhysicalExpr>,
101+
is_complete: bool,
102+
}
103+
104+
impl DynamicFilterSnapshot {
105+
pub fn new(
106+
children: Vec<Arc<dyn PhysicalExpr>>,
107+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
108+
generation: u64,
109+
inner_expr: Arc<dyn PhysicalExpr>,
110+
is_complete: bool,
111+
) -> Self {
112+
Self {
113+
children,
114+
remapped_children,
115+
generation,
116+
inner_expr,
117+
is_complete,
118+
}
119+
}
120+
121+
pub fn children(&self) -> &[Arc<dyn PhysicalExpr>] {
122+
&self.children
123+
}
124+
125+
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
126+
self.remapped_children.as_deref()
127+
}
128+
129+
pub fn generation(&self) -> u64 {
130+
self.generation
131+
}
132+
133+
pub fn inner_expr(&self) -> &Arc<dyn PhysicalExpr> {
134+
&self.inner_expr
135+
}
136+
137+
pub fn is_complete(&self) -> bool {
138+
self.is_complete
139+
}
140+
}
141+
142+
impl Display for DynamicFilterSnapshot {
143+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144+
write!(
145+
f,
146+
"DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
147+
self.children,
148+
self.remapped_children,
149+
self.generation,
150+
self.inner_expr,
151+
self.is_complete
152+
)
153+
}
154+
}
155+
156+
impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
157+
fn from(snapshot: DynamicFilterSnapshot) -> Self {
158+
let DynamicFilterSnapshot {
159+
children,
160+
remapped_children,
161+
generation,
162+
inner_expr,
163+
is_complete,
164+
} = snapshot;
165+
166+
let state = if is_complete {
167+
FilterState::Complete { generation }
168+
} else {
169+
FilterState::InProgress { generation }
170+
};
171+
let (state_watch, _) = watch::channel(state);
172+
173+
Self {
174+
children,
175+
remapped_children,
176+
inner: Arc::new(RwLock::new(Inner {
177+
generation,
178+
expr: inner_expr,
179+
is_complete,
180+
})),
181+
state_watch,
182+
data_type: Arc::new(RwLock::new(None)),
183+
nullable: Arc::new(RwLock::new(None)),
184+
}
185+
}
186+
}
187+
188+
impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot {
189+
fn from(expr: &DynamicFilterPhysicalExpr) -> Self {
190+
// Snapshot everything in the mutex atomically.
191+
let (generation, inner_expr, is_complete) = {
192+
let inner = expr.inner.read();
193+
(inner.generation, Arc::clone(&inner.expr), inner.is_complete)
194+
};
195+
DynamicFilterSnapshot {
196+
children: expr.children.clone(),
197+
remapped_children: expr.remapped_children.clone(),
198+
generation,
199+
inner_expr,
200+
is_complete,
201+
}
202+
}
203+
}
204+
91205
impl Inner {
92206
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
93207
Self {
@@ -444,6 +558,15 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
444558
// Return the current generation of the expression.
445559
self.inner.read().generation
446560
}
561+
562+
fn expr_id(self: Arc<Self>, salt: &[u64]) -> Option<PhysicalExprId> {
563+
Some(PhysicalExprId::new(
564+
// Capture the outer arc, which contains children and the expr.
565+
expr_id_from_arc(&self, salt),
566+
// Capture the inner arc, which contains the expr only.
567+
Some(expr_id_from_arc(&self.inner, salt)),
568+
))
569+
}
447570
}
448571

449572
#[cfg(test)]
@@ -861,4 +984,111 @@ mod test {
861984
"Hash should be stable after update (identity-based)"
862985
);
863986
}
987+
988+
/// A snapshot taken from a filter must describe the same state as a snapshot
/// taken from the filter rebuilt out of it.
#[test]
fn test_current_snapshot_roundtrip() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let col_a = col("a", &schema).unwrap();

    // Start from a dynamic filter `a > 10` whose only child is column `a`.
    let initial_predicate = Arc::new(BinaryExpr::new(
        Arc::clone(&col_a),
        datafusion_expr::Operator::Gt,
        lit(10) as Arc<dyn PhysicalExpr>,
    ));
    let filter = DynamicFilterPhysicalExpr::new(
        vec![Arc::clone(&col_a)],
        initial_predicate as Arc<dyn PhysicalExpr>,
    );

    // Replace the expression, then mark the filter complete.
    filter
        .update(lit(42) as Arc<dyn PhysicalExpr>)
        .expect("Update should succeed");
    filter.mark_complete();

    // Re-bind the filter to a schema where `a` sits at a different index, which
    // changes the children of the expr.
    let reassigned_schema = Arc::new(Schema::new(vec![
        Field::new("b", DataType::Int32, false),
        Field::new("a", DataType::Int32, false),
    ]));
    let reassigned_arc = reassign_expr_columns(
        Arc::new(filter) as Arc<dyn PhysicalExpr>,
        &reassigned_schema,
    )
    .expect("reassign_expr_columns should succeed");
    let reassigned_filter = reassigned_arc
        .as_any()
        .downcast_ref::<DynamicFilterPhysicalExpr>()
        .expect("Expected dynamic filter after reassignment");

    // Snapshot -> filter.
    let reconstructed =
        DynamicFilterPhysicalExpr::from(DynamicFilterSnapshot::from(reassigned_filter));

    // Both filters must render to the same snapshot.
    let before = DynamicFilterSnapshot::from(reassigned_filter).to_string();
    let after = DynamicFilterSnapshot::from(&reconstructed).to_string();
    assert_eq!(before, after);
}
1035+
1036+
/// Checks how `expr_id` relates a filter, an `Arc` clone of it, and a filter derived
/// from it by reassigning columns: clones share both ids, while the derived filter
/// shares only the shallow id.
///
/// This is a plain synchronous test: nothing here awaits, so no async runtime is
/// required.
#[test]
fn test_expr_id() {
    let source_schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let col_a = col("a", &source_schema).unwrap();

    // Create a source filter
    let source = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![Arc::clone(&col_a)],
        lit(true) as Arc<dyn PhysicalExpr>,
    ));
    let source_clone = Arc::clone(&source);

    // Create a derived filter by reassigning the source filter to a different schema.
    let derived_schema = Arc::new(Schema::new(vec![
        Field::new("x", DataType::Int32, false),
        Field::new("a", DataType::Int32, false),
    ]));
    let derived = reassign_expr_columns(
        Arc::clone(&source) as Arc<dyn PhysicalExpr>,
        &derived_schema,
    )
    .expect("reassign_expr_columns should succeed");

    let derived_expr_id = Arc::clone(&derived)
        .expr_id(&[])
        .expect("derived filter should have an expr_id");
    let source_expr_id = Arc::clone(&source)
        .expr_id(&[])
        .expect("source filter should have an expr_id");
    let source_clone_expr_id = Arc::clone(&source_clone)
        .expr_id(&[])
        .expect("source clone should have an expr_id");

    assert_eq!(
        source_clone_expr_id.exact(),
        source_expr_id.exact(),
        "cloned filter should have the same exact id because the children are the same",
    );

    assert_eq!(
        source_clone_expr_id.shallow(),
        source_expr_id.shallow(),
        "cloned filter should have the same shallow id because the exprs are the same",
    );

    assert_ne!(
        derived_expr_id.exact(),
        source_expr_id.exact(),
        "filters should have different exact ids because the children are different",
    );

    assert_eq!(
        derived_expr_id.shallow(),
        source_expr_id.shallow(),
        "filters should have the same shallow id because the exprs are the same",
    );
}
8641094
}

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub use case::{CaseExpr, case};
4343
pub use cast::{CastExpr, cast};
4444
pub use column::{Column, col, with_new_schema};
4545
pub use datafusion_expr::utils::format_state_name;
46-
pub use dynamic_filters::DynamicFilterPhysicalExpr;
46+
pub use dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSnapshot};
4747
pub use in_list::{InListExpr, in_list};
4848
pub use is_not_null::{IsNotNullExpr, is_not_null};
4949
pub use is_null::{IsNullExpr, is_null};

datafusion/proto/proto/datafusion.proto

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -870,18 +870,18 @@ message PhysicalExtensionNode {
870870
repeated PhysicalPlanNode inputs = 2;
871871
}
872872

873-
// physical expressions
874873
message PhysicalExprNode {
875874
// Was date_time_interval_expr
876875
reserved 17;
877876

878-
// Unique identifier for this expression to do deduplication during deserialization.
879-
// When serializing, this is set to a unique identifier for each combination of
880-
// expression, process and serialization run.
881-
// When deserializing, if this ID has been seen before, the cached Arc is returned
882-
// instead of creating a new one, enabling reconstruction of referential integrity
883-
// across serde roundtrips.
877+
// Unique identifiers for this expression used during deserialization to restore
878+
// referential integrity across serde roundtrips.
879+
//
880+
// expr_id: if two exprs have the same expr_id, they are identical (including children)
881+
// shallow_expr_id: if two exprs have the same shallow_expr_id, their roots are identical
882+
// but their children may differ
884883
optional uint64 expr_id = 30;
884+
optional uint64 shallow_expr_id = 31;
885885

886886
oneof ExprType {
887887
// column references
@@ -920,9 +920,19 @@ message PhysicalExprNode {
920920
UnknownColumn unknown_column = 20;
921921

922922
PhysicalHashExprNode hash_expr = 21;
923+
924+
PhysicalDynamicFilterNode dynamic_filter = 22;
923925
}
924926
}
925927

928+
// Serialized form of a dynamic filter physical expression.
// The field names mirror the Rust `DynamicFilterSnapshot` (children, remapped_children,
// generation, inner_expr, is_complete).
// NOTE(review): `remapped_children` is `repeated`, so an absent list and an empty list
// look the same on the wire -- confirm how the deserializer distinguishes them.
message PhysicalDynamicFilterNode {
929+
repeated PhysicalExprNode children = 1;
930+
repeated PhysicalExprNode remapped_children = 2;
931+
uint64 generation = 3;
932+
PhysicalExprNode inner_expr = 4;
933+
bool is_complete = 5;
934+
}
935+
926936
message PhysicalScalarUdfNode {
927937
string name = 1;
928938
repeated PhysicalExprNode args = 2;
@@ -1477,4 +1487,4 @@ message AsyncFuncExecNode {
14771487
message BufferExecNode {
14781488
PhysicalPlanNode input = 1;
14791489
uint64 capacity = 2;
1480-
}
1490+
}

0 commit comments

Comments
 (0)