Skip to content

Commit 4b2093d

Browse files
restore snapshot method
1 parent 68e3f72 commit 4b2093d

2 files changed

Lines changed: 69 additions & 37 deletions

File tree

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ impl DynamicFilterPhysicalExpr {
308308
) -> Result<Self> {
309309
// If there's any references to this filter or any watchers, we should not replace the
310310
// inner state.
311-
if self.is_used() || self.state_watch.receiver_count() > 0 {
311+
if self.is_used() {
312312
return internal_err!(
313313
"Cannot replace the inner state of a DynamicFilterPhysicalExpr that is in use"
314314
);
@@ -587,6 +587,16 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
587587
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
588588
self.render(f, |expr, f| expr.fmt_sql(f))
589589
}
590+
591+
fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
592+
// Return the current expression as a snapshot.
593+
Ok(Some(self.current()?))
594+
}
595+
596+
fn snapshot_generation(&self) -> u64 {
597+
// Return the current generation of the expression.
598+
self.inner.read().generation
599+
}
590600
}
591601

592602
#[cfg(test)]
@@ -716,6 +726,23 @@ mod test {
716726
assert!(arr_1.eq(&expected));
717727
}
718728

729+
#[test]
730+
fn test_snapshot() {
731+
let expr = lit(42) as Arc<dyn PhysicalExpr>;
732+
let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr));
733+
734+
// Take a snapshot of the current expression
735+
let snapshot = dynamic_filter.snapshot().unwrap();
736+
assert_eq!(snapshot, Some(expr));
737+
738+
// Update the current expression
739+
let new_expr = lit(100) as Arc<dyn PhysicalExpr>;
740+
dynamic_filter.update(Arc::clone(&new_expr)).unwrap();
741+
// Take another snapshot
742+
let snapshot = dynamic_filter.snapshot().unwrap();
743+
assert_eq!(snapshot, Some(new_expr));
744+
}
745+
719746
#[test]
720747
fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() {
721748
let dynamic_filter =

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 41 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,47 @@ pub fn serialize_physical_expr_with_converter(
258258
codec: &dyn PhysicalExtensionCodec,
259259
proto_converter: &dyn PhysicalProtoConverterExtension,
260260
) -> Result<protobuf::PhysicalExprNode> {
261+
// Check for DynamicFilterPhysicalExpr before snapshotting.
262+
// We need to handle it before snapshot_physical_expr because snapshot()
263+
// replaces the DynamicFilterPhysicalExpr with its inner expression.
264+
if let Some(df) = value.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
265+
// Capture all state atomically
266+
let snapshot = DynamicFilterSnapshot::from(df);
267+
268+
let children = snapshot
269+
.children()
270+
.iter()
271+
.map(|child| proto_converter.physical_expr_to_proto(child, codec))
272+
.collect::<Result<Vec<_>>>()?;
273+
274+
let remapped_children = if let Some(remapped) = snapshot.remapped_children() {
275+
remapped
276+
.iter()
277+
.map(|child| proto_converter.physical_expr_to_proto(child, codec))
278+
.collect::<Result<Vec<_>>>()?
279+
} else {
280+
vec![]
281+
};
282+
283+
let inner_expr = Box::new(
284+
proto_converter.physical_expr_to_proto(snapshot.inner_expr(), codec)?,
285+
);
286+
287+
return Ok(protobuf::PhysicalExprNode {
288+
expr_id: None,
289+
dynamic_filter_inner_id: None,
290+
expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter(
291+
Box::new(protobuf::PhysicalDynamicFilterNode {
292+
children,
293+
remapped_children,
294+
generation: snapshot.generation(),
295+
inner_expr: Some(inner_expr),
296+
is_complete: snapshot.is_complete(),
297+
}),
298+
)),
299+
});
300+
}
301+
261302
// Snapshot the expr in case it has dynamic predicate state so
262303
// it can be serialized
263304
let value = snapshot_physical_expr(Arc::clone(value))?;
@@ -328,42 +369,6 @@ pub fn serialize_physical_expr_with_converter(
328369
binary_expr,
329370
)),
330371
})
331-
} else if let Some(df) = expr.downcast_ref::<DynamicFilterPhysicalExpr>() {
332-
// Capture all state atomically
333-
let snapshot = DynamicFilterSnapshot::from(df);
334-
335-
let children = snapshot
336-
.children()
337-
.iter()
338-
.map(|child| proto_converter.physical_expr_to_proto(child, codec))
339-
.collect::<Result<Vec<_>>>()?;
340-
341-
let remapped_children = if let Some(remapped) = snapshot.remapped_children() {
342-
remapped
343-
.iter()
344-
.map(|child| proto_converter.physical_expr_to_proto(child, codec))
345-
.collect::<Result<Vec<_>>>()?
346-
} else {
347-
vec![]
348-
};
349-
350-
let inner_expr = Box::new(
351-
proto_converter.physical_expr_to_proto(snapshot.inner_expr(), codec)?,
352-
);
353-
354-
Ok(protobuf::PhysicalExprNode {
355-
expr_id: None,
356-
dynamic_filter_inner_id: None,
357-
expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter(
358-
Box::new(protobuf::PhysicalDynamicFilterNode {
359-
children,
360-
remapped_children,
361-
generation: snapshot.generation(),
362-
inner_expr: Some(inner_expr),
363-
is_complete: snapshot.is_complete(),
364-
}),
365-
)),
366-
})
367372
} else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
368373
Ok(protobuf::PhysicalExprNode {
369374
expr_id: None,

0 commit comments

Comments
 (0)