Skip to content

Commit b419d4c

Browse files
proto: serialize and dedupe dynamic filters
Informs: datafusion-contrib/datafusion-distributed#180
Closes: #20418

Consider a plan with a `HashJoinExec` and a `DataSourceExec`:

```
HashJoinExec(dynamic_filter_1 on a@0)
  (...left side of join)
  ProjectionExec(a := Column("a", source_index))
    DataSourceExec
      ParquetSource(predicate = dynamic_filter_2)
```

You serialize the plan, deserialize it, and execute it. What should happen is that the dynamic filter "works", meaning:

1. When you deserialize the plan, both the `HashJoinExec` and the `DataSourceExec` hold pointers to the same `DynamicFilterPhysicalExpr`.
2. The `DynamicFilterPhysicalExpr` is updated during execution by the `HashJoinExec`, and the `DataSourceExec` uses it to filter out rows.

This does not happen today for a few reasons, a couple of which this PR aims to address:

1. `DynamicFilterPhysicalExpr` does not survive round-tripping. The internal exprs get inlined (e.g. it may be serialized as a `Literal`) due to the `PhysicalExpr::snapshot()` API.
2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, the one pushed down to the `DataSourceExec` often has different children. In this case, you have two `DynamicFilterPhysicalExpr`s which do not survive deduping, causing referential integrity to be lost.

This PR aims to fix those problems by:

1. Removing the `snapshot()` call from the serialization process.
2. Adding protos for `DynamicFilterPhysicalExpr` so it can be serialized and deserialized.
3. Adding a new concept, a `PhysicalExprId`, which has two identifiers: a "shallow" identifier to indicate two equal expressions which may have different children, and an "exact" identifier to indicate two exprs that are exactly the same.
4. Updating the deduping deserializer and protos to be aware of the new "shallow" id, deduping exprs which are the same but have different children accordingly.

This change adds tests which round-trip dynamic filters and assert that referential integrity is maintained.
1 parent c792700 commit b419d4c

10 files changed

Lines changed: 990 additions & 67 deletions

File tree

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::any::Any;
1919
use std::fmt;
2020
use std::fmt::{Debug, Display, Formatter};
21-
use std::hash::{Hash, Hasher};
21+
use std::hash::{DefaultHasher, Hash, Hasher};
2222
use std::sync::Arc;
2323

2424
use crate::utils::scatter;
@@ -441,6 +441,50 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
441441
fn placement(&self) -> ExpressionPlacement {
442442
ExpressionPlacement::KeepInPlace
443443
}
444+
445+
/// Returns a composite identifier for a [`PhysicalExpr`]. Note that if the expression
446+
/// is dropped, then the returned id is no longer valid.
447+
fn expr_id(self: Arc<Self>, salt: &[u64]) -> Option<PhysicalExprId> {
448+
Some(PhysicalExprId::new(expr_id_from_arc(&self, salt), None))
449+
}
450+
}
451+
452+
/// A composite identifier for [`PhysicalExpr`].
453+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
454+
pub struct PhysicalExprId {
455+
exact: u64,
456+
shallow: Option<u64>,
457+
}
458+
459+
impl PhysicalExprId {
460+
/// Create a new [`PhysicalExprId`]. Both ids must be globally unique within
461+
/// a process.
462+
pub fn new(exact: u64, shallow: Option<u64>) -> Self {
463+
Self { exact, shallow }
464+
}
465+
466+
/// Returns the identifier for the full expression tree, including children.
467+
pub fn exact(&self) -> u64 {
468+
self.exact
469+
}
470+
471+
/// Returns the identifier for just the expression root, ignoring children.
472+
pub fn shallow(&self) -> Option<u64> {
473+
self.shallow
474+
}
475+
}
476+
477+
/// Computes a unique identifier for a value contained within an [`Arc`]. It hashes
478+
/// the [`Arc`] pointer to create a process-local identifier that remains valid
479+
/// only while that allocation is still alive.
480+
pub fn expr_id_from_arc<T: ?Sized>(expr: &Arc<T>, salt: &[u64]) -> u64 {
481+
let mut hasher = DefaultHasher::new();
482+
let ptr = Arc::as_ptr(expr) as *const () as u64;
483+
ptr.hash(&mut hasher);
484+
for &salt in salt {
485+
salt.hash(&mut hasher);
486+
}
487+
hasher.finish()
444488
}
445489

446490
#[deprecated(

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 231 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use datafusion_common::{
2626
tree_node::{Transformed, TransformedResult, TreeNode},
2727
};
2828
use datafusion_expr::ColumnarValue;
29-
use datafusion_physical_expr_common::physical_expr::DynHash;
29+
use datafusion_physical_expr_common::physical_expr::{
30+
DynHash, PhysicalExprId, expr_id_from_arc,
31+
};
3032

3133
/// State of a dynamic filter, tracking both updates and completion.
3234
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -88,6 +90,118 @@ struct Inner {
8890
is_complete: bool,
8991
}
9092

93+
/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during
94+
/// serialization / deserialization.
95+
pub struct DynamicFilterSnapshot {
96+
children: Vec<Arc<dyn PhysicalExpr>>,
97+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
98+
// Inner state.
99+
generation: u64,
100+
inner_expr: Arc<dyn PhysicalExpr>,
101+
is_complete: bool,
102+
}
103+
104+
impl DynamicFilterSnapshot {
105+
pub fn new(
106+
children: Vec<Arc<dyn PhysicalExpr>>,
107+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
108+
generation: u64,
109+
inner_expr: Arc<dyn PhysicalExpr>,
110+
is_complete: bool,
111+
) -> Self {
112+
Self {
113+
children,
114+
remapped_children,
115+
generation,
116+
inner_expr,
117+
is_complete,
118+
}
119+
}
120+
121+
pub fn children(&self) -> &[Arc<dyn PhysicalExpr>] {
122+
&self.children
123+
}
124+
125+
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
126+
self.remapped_children.as_deref()
127+
}
128+
129+
pub fn generation(&self) -> u64 {
130+
self.generation
131+
}
132+
133+
pub fn inner_expr(&self) -> &Arc<dyn PhysicalExpr> {
134+
&self.inner_expr
135+
}
136+
137+
pub fn is_complete(&self) -> bool {
138+
self.is_complete
139+
}
140+
}
141+
142+
impl Display for DynamicFilterSnapshot {
143+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144+
write!(
145+
f,
146+
"DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
147+
self.children,
148+
self.remapped_children,
149+
self.generation,
150+
self.inner_expr,
151+
self.is_complete
152+
)
153+
}
154+
}
155+
156+
impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
157+
fn from(snapshot: DynamicFilterSnapshot) -> Self {
158+
let DynamicFilterSnapshot {
159+
children,
160+
remapped_children,
161+
generation,
162+
inner_expr,
163+
is_complete,
164+
} = snapshot;
165+
166+
let state = if is_complete {
167+
FilterState::Complete { generation }
168+
} else {
169+
FilterState::InProgress { generation }
170+
};
171+
let (state_watch, _) = watch::channel(state);
172+
173+
Self {
174+
children,
175+
remapped_children,
176+
inner: Arc::new(RwLock::new(Inner {
177+
generation,
178+
expr: inner_expr,
179+
is_complete,
180+
})),
181+
state_watch,
182+
data_type: Arc::new(RwLock::new(None)),
183+
nullable: Arc::new(RwLock::new(None)),
184+
}
185+
}
186+
}
187+
188+
impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot {
189+
fn from(expr: &DynamicFilterPhysicalExpr) -> Self {
190+
// Snapshot everything guarded by the `RwLock` atomically, under a single read guard.
191+
let (generation, inner_expr, is_complete) = {
192+
let inner = expr.inner.read();
193+
(inner.generation, Arc::clone(&inner.expr), inner.is_complete)
194+
};
195+
DynamicFilterSnapshot {
196+
children: expr.children.clone(),
197+
remapped_children: expr.remapped_children.clone(),
198+
generation,
199+
inner_expr,
200+
is_complete,
201+
}
202+
}
203+
}
204+
91205
impl Inner {
92206
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
93207
Self {
@@ -448,6 +562,15 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
448562
// Return the current generation of the expression.
449563
self.inner.read().generation
450564
}
565+
566+
fn expr_id(self: Arc<Self>, salt: &[u64]) -> Option<PhysicalExprId> {
567+
Some(PhysicalExprId::new(
568+
// Capture the outer arc, which contains children and the expr.
569+
expr_id_from_arc(&self, salt),
570+
// Capture the inner arc, which contains the expr only.
571+
Some(expr_id_from_arc(&self.inner, salt)),
572+
))
573+
}
451574
}
452575

453576
#[cfg(test)]
@@ -867,4 +990,111 @@ mod test {
867990
"Hash should be stable after update (identity-based)"
868991
);
869992
}
993+
994+
#[test]
995+
fn test_current_snapshot_roundtrip() {
996+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
997+
let col_a = col("a", &schema).unwrap();
998+
999+
// Create a dynamic filter with children
1000+
let expr = Arc::new(BinaryExpr::new(
1001+
Arc::clone(&col_a),
1002+
datafusion_expr::Operator::Gt,
1003+
lit(10) as Arc<dyn PhysicalExpr>,
1004+
));
1005+
let filter = DynamicFilterPhysicalExpr::new(
1006+
vec![Arc::clone(&col_a)],
1007+
expr as Arc<dyn PhysicalExpr>,
1008+
);
1009+
1010+
// Update expression and mark complete
1011+
filter
1012+
.update(lit(42) as Arc<dyn PhysicalExpr>)
1013+
.expect("Update should succeed");
1014+
filter.mark_complete();
1015+
1016+
// Change the children of the expr.
1017+
let reassigned_schema = Arc::new(Schema::new(vec![
1018+
Field::new("b", DataType::Int32, false),
1019+
Field::new("a", DataType::Int32, false),
1020+
]));
1021+
let reassigned = reassign_expr_columns(
1022+
Arc::new(filter) as Arc<dyn PhysicalExpr>,
1023+
&reassigned_schema,
1024+
)
1025+
.expect("reassign_expr_columns should succeed");
1026+
let reassigned = reassigned
1027+
.as_any()
1028+
.downcast_ref::<DynamicFilterPhysicalExpr>()
1029+
.expect("Expected dynamic filter after reassignment");
1030+
1031+
// Take a snapshot and reconstruct
1032+
let snapshot = DynamicFilterSnapshot::from(reassigned);
1033+
let reconstructed = DynamicFilterPhysicalExpr::from(snapshot);
1034+
1035+
// Assert snapshots are equal.
1036+
assert_eq!(
1037+
DynamicFilterSnapshot::from(reassigned).to_string(),
1038+
DynamicFilterSnapshot::from(&reconstructed).to_string(),
1039+
);
1040+
}
1041+
1042+
#[tokio::test]
1043+
async fn test_expr_id() {
1044+
let source_schema =
1045+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1046+
let col_a = col("a", &source_schema).unwrap();
1047+
1048+
// Create a source filter
1049+
let source = Arc::new(DynamicFilterPhysicalExpr::new(
1050+
vec![Arc::clone(&col_a)],
1051+
lit(true) as Arc<dyn PhysicalExpr>,
1052+
));
1053+
let source_clone = Arc::clone(&source);
1054+
1055+
// Create a derived filter by reassigning the source filter to a different schema.
1056+
let derived_schema = Arc::new(Schema::new(vec![
1057+
Field::new("x", DataType::Int32, false),
1058+
Field::new("a", DataType::Int32, false),
1059+
]));
1060+
let derived = reassign_expr_columns(
1061+
Arc::clone(&source) as Arc<dyn PhysicalExpr>,
1062+
&derived_schema,
1063+
)
1064+
.expect("reassign_expr_columns should succeed");
1065+
1066+
let derived_expr_id = Arc::clone(&derived)
1067+
.expr_id(&[])
1068+
.expect("combined filter should have an expr_id");
1069+
let source_expr_id = Arc::clone(&source)
1070+
.expr_id(&[])
1071+
.expect("source filter should have an expr_id");
1072+
let source_clone_expr_id = Arc::clone(&source_clone)
1073+
.expr_id(&[])
1074+
.expect("source clone should have an expr_id");
1075+
1076+
assert_eq!(
1077+
source_clone_expr_id.exact(),
1078+
source_expr_id.exact(),
1079+
"cloned filter should have the same exact id because the children are the same",
1080+
);
1081+
1082+
assert_eq!(
1083+
source_clone_expr_id.shallow(),
1084+
source_expr_id.shallow(),
1085+
"cloned filter should have the same shallow id because the exprs are the same",
1086+
);
1087+
1088+
assert_ne!(
1089+
derived_expr_id.exact(),
1090+
source_expr_id.exact(),
1091+
"filters should have different exact ids because the children are different",
1092+
);
1093+
1094+
assert_eq!(
1095+
derived_expr_id.shallow(),
1096+
source_expr_id.shallow(),
1097+
"filters should have the same shallow id because the exprs are the same",
1098+
);
1099+
}
8701100
}

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub use cast::{CastExpr, cast};
4545
pub use cast_column::CastColumnExpr;
4646
pub use column::{Column, col, with_new_schema};
4747
pub use datafusion_expr::utils::format_state_name;
48-
pub use dynamic_filters::DynamicFilterPhysicalExpr;
48+
pub use dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSnapshot};
4949
pub use in_list::{InListExpr, in_list};
5050
pub use is_not_null::{IsNotNullExpr, is_not_null};
5151
pub use is_null::{IsNullExpr, is_null};

datafusion/proto/proto/datafusion.proto

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -870,18 +870,18 @@ message PhysicalExtensionNode {
870870
repeated PhysicalPlanNode inputs = 2;
871871
}
872872

873-
// physical expressions
874873
message PhysicalExprNode {
875874
// Was date_time_interval_expr
876875
reserved 17;
877876

878-
// Unique identifier for this expression to do deduplication during deserialization.
879-
// When serializing, this is set to a unique identifier for each combination of
880-
// expression, process and serialization run.
881-
// When deserializing, if this ID has been seen before, the cached Arc is returned
882-
// instead of creating a new one, enabling reconstruction of referential integrity
883-
// across serde roundtrips.
877+
// Unique identifiers for this expression used during deserialization to restore
878+
// referential integrity across serde roundtrips.
879+
//
880+
// expr_id: if two exprs have the same expr_id, they are identical (including children)
881+
// shallow_expr_id: if two exprs have the same shallow_expr_id, they are identical but may
882+
// have different children
884883
optional uint64 expr_id = 30;
884+
optional uint64 shallow_expr_id = 31;
885885

886886
oneof ExprType {
887887
// column references
@@ -920,9 +920,19 @@ message PhysicalExprNode {
920920
UnknownColumn unknown_column = 20;
921921

922922
PhysicalHashExprNode hash_expr = 21;
923+
924+
PhysicalDynamicFilterNode dynamic_filter = 22;
923925
}
924926
}
925927

928+
message PhysicalDynamicFilterNode {
929+
repeated PhysicalExprNode children = 1;
930+
repeated PhysicalExprNode remapped_children = 2;
931+
uint64 generation = 3;
932+
PhysicalExprNode inner_expr = 4;
933+
bool is_complete = 5;
934+
}
935+
926936
message PhysicalScalarUdfNode {
927937
string name = 1;
928938
repeated PhysicalExprNode args = 2;
@@ -1470,4 +1480,4 @@ message AsyncFuncExecNode {
14701480
message BufferExecNode {
14711481
PhysicalPlanNode input = 1;
14721482
uint64 capacity = 2;
1473-
}
1483+
}

0 commit comments

Comments
 (0)