Skip to content

Commit 68e3f72

Browse files
Address PR comments: add From trait impls and improve the safety check in new_from_source
1 parent 53c3bc7 commit 68e3f72

5 files changed

Lines changed: 91 additions & 92 deletions

File tree

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 65 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,54 @@ impl Display for DynamicFilterSnapshot {
151151
}
152152
}
153153

154+
impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
155+
fn from(snapshot: DynamicFilterSnapshot) -> Self {
156+
let DynamicFilterSnapshot {
157+
children,
158+
remapped_children,
159+
generation,
160+
inner_expr,
161+
is_complete,
162+
} = snapshot;
163+
164+
let state = if is_complete {
165+
FilterState::Complete { generation }
166+
} else {
167+
FilterState::InProgress { generation }
168+
};
169+
let (state_watch, _) = watch::channel(state);
170+
171+
Self {
172+
children,
173+
remapped_children,
174+
inner: Arc::new(RwLock::new(Inner {
175+
generation,
176+
expr: inner_expr,
177+
is_complete,
178+
})),
179+
state_watch,
180+
data_type: Arc::new(RwLock::new(None)),
181+
nullable: Arc::new(RwLock::new(None)),
182+
}
183+
}
184+
}
185+
186+
impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot {
187+
fn from(expr: &DynamicFilterPhysicalExpr) -> Self {
188+
let (generation, inner_expr, is_complete) = {
189+
let inner = expr.inner.read();
190+
(inner.generation, Arc::clone(&inner.expr), inner.is_complete)
191+
};
192+
DynamicFilterSnapshot {
193+
children: expr.children.clone(),
194+
remapped_children: expr.remapped_children.clone(),
195+
generation,
196+
inner_expr,
197+
is_complete,
198+
}
199+
}
200+
}
201+
154202
impl Inner {
155203
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
156204
Self {
@@ -243,58 +291,6 @@ impl DynamicFilterPhysicalExpr {
243291
}
244292
}
245293

246-
/// Reconstructs a [`DynamicFilterPhysicalExpr`] from a snapshot.
247-
///
248-
/// This is a low-level API intended for use by the proto deserialization layer.
249-
#[doc(hidden)]
250-
pub fn new_from_snapshot(snapshot: DynamicFilterSnapshot) -> Self {
251-
let DynamicFilterSnapshot {
252-
children,
253-
remapped_children,
254-
generation,
255-
inner_expr,
256-
is_complete,
257-
} = snapshot;
258-
259-
let state = if is_complete {
260-
FilterState::Complete { generation }
261-
} else {
262-
FilterState::InProgress { generation }
263-
};
264-
let (state_watch, _) = watch::channel(state);
265-
266-
Self {
267-
children,
268-
remapped_children,
269-
inner: Arc::new(RwLock::new(Inner {
270-
generation,
271-
expr: inner_expr,
272-
is_complete,
273-
})),
274-
state_watch,
275-
data_type: Arc::new(RwLock::new(None)),
276-
nullable: Arc::new(RwLock::new(None)),
277-
}
278-
}
279-
280-
/// Atomically captures all state needed for serialization into a [`DynamicFilterSnapshot`].
281-
///
282-
/// This is a low-level API intended for use by the proto deserialization layer.
283-
#[doc(hidden)]
284-
pub fn current_snapshot(&self) -> DynamicFilterSnapshot {
285-
let (generation, inner_expr, is_complete) = {
286-
let inner = self.inner.read();
287-
(inner.generation, Arc::clone(&inner.expr), inner.is_complete)
288-
};
289-
DynamicFilterSnapshot {
290-
children: self.children.clone(),
291-
remapped_children: self.remapped_children.clone(),
292-
generation,
293-
inner_expr,
294-
is_complete,
295-
}
296-
}
297-
298294
/// Create a new [`DynamicFilterPhysicalExpr`] from `self`, except it overwrites the
299295
/// internal state with the source filter's state.
300296
///
@@ -303,16 +299,21 @@ impl DynamicFilterPhysicalExpr {
303299
/// # Safety
304300
///
305301
/// The dynamic filter should not be in use when calling this method, otherwise there
306-
/// may be undefined behavior. This method may do the following or worse:
302+
/// may be undefined behavior. Changing the inner state of a filter may do the following:
307303
/// - transition the state to complete without notifying the watch
308304
/// - cause a generation number to be emitted which is out of order
309-
pub fn new_from_source(&self, source: &DynamicFilterPhysicalExpr) -> Result<Self> {
310-
// Best effort check that no one is subscribed.
311-
if self.state_watch.receiver_count() > 0 {
305+
pub fn new_from_source(
306+
self: &Arc<Self>,
307+
source: &DynamicFilterPhysicalExpr,
308+
) -> Result<Self> {
309+
// If there are any references to this filter or any watchers, we should not replace the
310+
// inner state.
311+
if self.is_used() || self.state_watch.receiver_count() > 0 {
312312
return internal_err!(
313-
"Cannot replace the inner state of a DynamicFilterPhysicalExpr that has subscribers"
313+
"Cannot replace the inner state of a DynamicFilterPhysicalExpr that is in use"
314314
);
315315
};
316+
316317
Ok(Self {
317318
children: self.children.clone(),
318319
remapped_children: self.remapped_children.clone(),
@@ -1020,13 +1021,13 @@ mod test {
10201021
filter.mark_complete();
10211022

10221023
// Take a snapshot and reconstruct
1023-
let snapshot = filter.current_snapshot();
1024-
let reconstructed = DynamicFilterPhysicalExpr::new_from_snapshot(snapshot);
1024+
let snapshot = DynamicFilterSnapshot::from(&filter);
1025+
let reconstructed = DynamicFilterPhysicalExpr::from(snapshot);
10251026

10261027
// String representations should be equal
10271028
assert_eq!(
1028-
filter.current_snapshot().to_string(),
1029-
reconstructed.current_snapshot().to_string(),
1029+
DynamicFilterSnapshot::from(&filter).to_string(),
1030+
DynamicFilterSnapshot::from(&reconstructed).to_string(),
10301031
);
10311032
}
10321033

@@ -1045,10 +1046,10 @@ mod test {
10451046
// Create a target filter with different children
10461047
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
10471048
let col_x = col("x", &schema).unwrap();
1048-
let target = DynamicFilterPhysicalExpr::new(
1049+
let target = Arc::new(DynamicFilterPhysicalExpr::new(
10491050
vec![Arc::clone(&col_x)],
10501051
lit(0) as Arc<dyn PhysicalExpr>,
1051-
);
1052+
));
10521053

10531054
// Create new filter from source's inner state
10541055
let combined = target.new_from_source(&source).unwrap();
@@ -1061,7 +1062,7 @@ mod test {
10611062
);
10621063

10631064
// Verify children are from target, not source
1064-
let combined_snapshot = combined.current_snapshot();
1065+
let combined_snapshot = DynamicFilterSnapshot::from(&combined);
10651066
assert_eq!(
10661067
combined_snapshot.children().len(),
10671068
1,

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -529,16 +529,16 @@ pub fn parse_physical_expr_with_converter(
529529
)?;
530530

531531
// Recreate filter from snapshot
532-
let base_filter = Arc::new(DynamicFilterPhysicalExpr::new_from_snapshot(
533-
DynamicFilterSnapshot::new(
534-
children,
535-
remapped_children,
536-
dynamic_filter.generation,
537-
inner_expr,
538-
dynamic_filter.is_complete,
539-
),
540-
));
541-
base_filter as Arc<dyn PhysicalExpr>
532+
let snapshot = DynamicFilterSnapshot::new(
533+
children,
534+
remapped_children,
535+
dynamic_filter.generation,
536+
inner_expr,
537+
dynamic_filter.is_complete,
538+
);
539+
let base_filter: Arc<dyn PhysicalExpr> =
540+
Arc::new(DynamicFilterPhysicalExpr::from(snapshot));
541+
base_filter
542542
}
543543
ExprType::Extension(extension) => {
544544
let inputs: Vec<Arc<dyn PhysicalExpr>> = extension

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::any::Any;
12
// Licensed to the Apache Software Foundation (ASF) under one
23
// or more contributor license agreements. See the NOTICE file
34
// distributed with this work for additional information
@@ -3962,13 +3963,8 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer {
39623963
};
39633964

39643965
// Get the base filter's structure
3965-
let Some(dynamic_filter_expr) =
3966-
expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>()
3967-
else {
3968-
return internal_err!(
3969-
"dynamic_filter_id present in proto, but the expression was not a DynamicFilterPhysicalExpr"
3970-
);
3971-
};
3966+
let dynamic_filter_expr = (expr as Arc<dyn Any + Send + Sync>).downcast::<DynamicFilterPhysicalExpr>()
3967+
.map_err(|_| internal_datafusion_err!("dynamic_filter_id present in proto, but the expression was not a DynamicFilterPhysicalExpr"))?;
39723968
expr = Arc::new(dynamic_filter_expr.new_from_source(cached_df)?)
39733969
as Arc<dyn PhysicalExpr>;
39743970
} else {

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindo
3636
use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
3737
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3838
use datafusion_physical_plan::expressions::{
39-
BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr, InListExpr,
40-
IsNotNullExpr, IsNullExpr, LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr,
41-
UnKnownColumn,
39+
BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr,
40+
DynamicFilterSnapshot, InListExpr, IsNotNullExpr, IsNullExpr, LikeExpr, Literal,
41+
NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
4242
};
4343
use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
4444
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
@@ -330,7 +330,7 @@ pub fn serialize_physical_expr_with_converter(
330330
})
331331
} else if let Some(df) = expr.downcast_ref::<DynamicFilterPhysicalExpr>() {
332332
// Capture all state atomically
333-
let snapshot = df.current_snapshot();
333+
let snapshot = DynamicFilterSnapshot::from(df);
334334

335335
let children = snapshot
336336
.children()

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ use crate::cases::{
130130
MyRegexUdfNode,
131131
};
132132

133-
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
133+
use datafusion_physical_expr::expressions::{
134+
DynamicFilterPhysicalExpr, DynamicFilterSnapshot,
135+
};
134136
use datafusion_physical_expr::utils::reassign_expr_columns;
135137

136138
/// Perform a serde roundtrip and assert that the string representation of the before and after plans
@@ -3066,8 +3068,8 @@ fn test_dynamic_filter_roundtrip() -> Result<()> {
30663068
.expect("Should be DynamicFilterPhysicalExpr");
30673069

30683070
assert_eq!(
3069-
df.current_snapshot().to_string(),
3070-
deserialized_df.current_snapshot().to_string(),
3071+
DynamicFilterSnapshot::from(df).to_string(),
3072+
DynamicFilterSnapshot::from(deserialized_df).to_string(),
30713073
"Snapshots should be equal"
30723074
);
30733075

@@ -3221,12 +3223,12 @@ fn test_deduplication_of_dynamic_filter_expression(
32213223
.unwrap();
32223224

32233225
assert_eq!(
3224-
filter_1_before_roundtrip.current_snapshot().to_string(),
3225-
df1.current_snapshot().to_string()
3226+
DynamicFilterSnapshot::from(filter_1_before_roundtrip).to_string(),
3227+
DynamicFilterSnapshot::from(df1).to_string()
32263228
);
32273229
assert_eq!(
3228-
filter_2_before_roundtrip.current_snapshot().to_string(),
3229-
df2.current_snapshot().to_string()
3230+
DynamicFilterSnapshot::from(filter_2_before_roundtrip).to_string(),
3231+
DynamicFilterSnapshot::from(df2).to_string()
32303232
);
32313233

32323234
Ok(())

0 commit comments

Comments
 (0)