Skip to content

Commit 5f57a8c

Browse files
committed
Preserve PhysicalExpr graph shape during proto round trip
Replaces apache#18192
1 parent f819061 commit 5f57a8c

9 files changed

Lines changed: 340 additions & 25 deletions

File tree

datafusion/proto/proto/datafusion.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,6 +838,20 @@ message PhysicalExprNode {
838838
// Was date_time_interval_expr
839839
reserved 17;
840840

841+
// Arc pointer address from source plan for deduplication during deserialization.
842+
// When serializing, this is set to Arc::as_ptr(expr) as u64.
843+
// When deserializing, if this ID has been seen before, the cached Arc is returned
844+
// instead of creating a new one, enabling expression sharing.
845+
//
846+
// IMPORTANT: This field is ONLY valid as a deduplication key within a single
847+
// serialized plan from a single process. It MUST NOT be used to deduplicate
848+
// across different plans or across different nodes/processes, as Arc pointer
849+
// addresses can collide (the same address may be reused for different allocations
850+
// in different processes, or even within the same process over time).
851+
//
852+
// The deserializer should use a fresh dedup cache for each plan it deserializes.
853+
optional uint64 expr_arc_id = 30;
854+
841855
oneof ExprType {
842856
// column references
843857
PhysicalColumn column = 1;

datafusion/proto/src/bytes/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -277,15 +277,15 @@ pub fn logical_plan_from_json_with_extension_codec(
277277
/// Serialize a PhysicalPlan as bytes
278278
pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
279279
let extension_codec = DefaultPhysicalExtensionCodec {};
280-
let proto_converter = DefaultPhysicalProtoConverter {};
280+
let proto_converter = DefaultPhysicalProtoConverter::new();
281281
physical_plan_to_bytes_with_proto_converter(plan, &extension_codec, &proto_converter)
282282
}
283283

284284
/// Serialize a PhysicalPlan as JSON
285285
#[cfg(feature = "json")]
286286
pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
287287
let extension_codec = DefaultPhysicalExtensionCodec {};
288-
let proto_converter = DefaultPhysicalProtoConverter {};
288+
let proto_converter = DefaultPhysicalProtoConverter::new();
289289
let protobuf = proto_converter
290290
.execution_plan_to_proto(&plan, &extension_codec)
291291
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
@@ -298,7 +298,7 @@ pub fn physical_plan_to_bytes_with_extension_codec(
298298
plan: Arc<dyn ExecutionPlan>,
299299
extension_codec: &dyn PhysicalExtensionCodec,
300300
) -> Result<Bytes> {
301-
let proto_converter = DefaultPhysicalProtoConverter {};
301+
let proto_converter = DefaultPhysicalProtoConverter::new();
302302
physical_plan_to_bytes_with_proto_converter(plan, extension_codec, &proto_converter)
303303
}
304304

@@ -326,7 +326,7 @@ pub fn physical_plan_from_json(
326326
let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
327327
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
328328
let extension_codec = DefaultPhysicalExtensionCodec {};
329-
let proto_converter = DefaultPhysicalProtoConverter {};
329+
let proto_converter = DefaultPhysicalProtoConverter::new();
330330
proto_converter.proto_to_execution_plan(ctx, &extension_codec, &back)
331331
}
332332

@@ -336,7 +336,7 @@ pub fn physical_plan_from_bytes(
336336
ctx: &TaskContext,
337337
) -> Result<Arc<dyn ExecutionPlan>> {
338338
let extension_codec = DefaultPhysicalExtensionCodec {};
339-
let proto_converter = DefaultPhysicalProtoConverter {};
339+
let proto_converter = DefaultPhysicalProtoConverter::new();
340340
physical_plan_from_bytes_with_proto_converter(
341341
bytes,
342342
ctx,
@@ -351,7 +351,7 @@ pub fn physical_plan_from_bytes_with_extension_codec(
351351
ctx: &TaskContext,
352352
extension_codec: &dyn PhysicalExtensionCodec,
353353
) -> Result<Arc<dyn ExecutionPlan>> {
354-
let proto_converter = DefaultPhysicalProtoConverter {};
354+
let proto_converter = DefaultPhysicalProtoConverter::new();
355355
physical_plan_from_bytes_with_proto_converter(
356356
bytes,
357357
ctx,

datafusion/proto/src/generated/pbjson.rs

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ pub fn parse_physical_expr(
240240
ctx,
241241
input_schema,
242242
codec,
243-
&DefaultPhysicalProtoConverter {},
243+
&DefaultPhysicalProtoConverter::new(),
244244
)
245245
}
246246

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::HashMap;
1819
use std::fmt::Debug;
19-
use std::sync::Arc;
20+
use std::sync::{Arc, RwLock};
2021

2122
use arrow::compute::SortOptions;
2223
use arrow::datatypes::{IntervalMonthDayNanoType, Schema, SchemaRef};
@@ -135,7 +136,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
135136
self.try_into_physical_plan_with_converter(
136137
ctx,
137138
codec,
138-
&DefaultPhysicalProtoConverter {},
139+
&DefaultPhysicalProtoConverter::new(),
139140
)
140141
}
141142

@@ -149,7 +150,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
149150
Self::try_from_physical_plan_with_converter(
150151
plan,
151152
codec,
152-
&DefaultPhysicalProtoConverter {},
153+
&DefaultPhysicalProtoConverter::new(),
153154
)
154155
}
155156
}
@@ -2991,6 +2992,7 @@ impl protobuf::PhysicalPlanNode {
29912992
nulls_first: expr.options.nulls_first,
29922993
});
29932994
Ok(protobuf::PhysicalExprNode {
2995+
expr_arc_id: None,
29942996
expr_type: Some(ExprType::Sort(sort_expr)),
29952997
})
29962998
})
@@ -3076,6 +3078,7 @@ impl protobuf::PhysicalPlanNode {
30763078
nulls_first: expr.options.nulls_first,
30773079
});
30783080
Ok(protobuf::PhysicalExprNode {
3081+
expr_arc_id: None,
30793082
expr_type: Some(ExprType::Sort(sort_expr)),
30803083
})
30813084
})
@@ -3661,7 +3664,44 @@ struct DataEncoderTuple {
36613664
pub blob: Vec<u8>,
36623665
}
36633666

3664-
pub struct DefaultPhysicalProtoConverter;
3667+
/// Default implementation of [`PhysicalProtoConverterExtension`] that provides
3668+
/// expression deduplication during deserialization.
3669+
///
3670+
/// During serialization, the Arc pointer address of each expression is embedded
3671+
/// in the protobuf as `expr_arc_id`. During deserialization, if an expression
3672+
/// with the same `expr_arc_id` has been seen before, the cached Arc is returned
3673+
/// instead of creating a new one. This enables expression sharing and can
3674+
/// significantly reduce memory usage for plans with duplicate expressions
3675+
/// (e.g., large IN lists).
3676+
///
3677+
/// # Important: Scope of Deduplication
3678+
///
3679+
/// The `expr_arc_id` is only valid as a deduplication key **within a single
3680+
/// serialized plan from a single process**. Arc pointer addresses can collide:
3681+
/// - Different processes may allocate Arcs at the same address
3682+
/// - The same process may reuse addresses after deallocation
3683+
///
3684+
/// Therefore, you **must create a fresh `DefaultPhysicalProtoConverter` instance
3685+
/// for each plan you deserialize**. Do not reuse the same converter instance
3686+
/// across multiple plans from different sources, as this could incorrectly
3687+
/// deduplicate unrelated expressions that happen to share the same pointer address.
3688+
#[derive(Default)]
3689+
pub struct DefaultPhysicalProtoConverter {
3690+
/// Cache for expression deduplication during deserialization.
3691+
/// Maps expr_arc_id (the original Arc pointer address) to the deserialized expression.
3692+
///
3693+
/// This cache should only be used for a single plan deserialization.
3694+
/// Create a new converter instance for each plan to avoid cross-plan collisions.
3695+
dedup_cache: RwLock<HashMap<u64, Arc<dyn PhysicalExpr>>>,
3696+
}
3697+
3698+
impl DefaultPhysicalProtoConverter {
3699+
/// Creates a new `DefaultPhysicalProtoConverter` with an empty dedup cache.
3700+
pub fn new() -> Self {
3701+
Self::default()
3702+
}
3703+
}
3704+
36653705
impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter {
36663706
fn proto_to_execution_plan(
36673707
&self,
@@ -3697,16 +3737,48 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter {
36973737
where
36983738
Self: Sized,
36993739
{
3700-
// Default implementation calls the free function
3701-
parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)
3740+
// Check if we've seen this expr_arc_id before (deduplication)
3741+
if let Some(arc_id) = proto.expr_arc_id {
3742+
// Try to get from cache first
3743+
{
3744+
let cache = self.dedup_cache.read().unwrap();
3745+
if let Some(cached) = cache.get(&arc_id) {
3746+
return Ok(Arc::clone(cached));
3747+
}
3748+
}
3749+
3750+
// Not in cache, deserialize the expression
3751+
let expr = parse_physical_expr_with_converter(
3752+
proto,
3753+
ctx,
3754+
input_schema,
3755+
codec,
3756+
self,
3757+
)?;
3758+
3759+
// Cache it for future lookups
3760+
{
3761+
let mut cache = self.dedup_cache.write().unwrap();
3762+
cache.insert(arc_id, Arc::clone(&expr));
3763+
}
3764+
3765+
Ok(expr)
3766+
} else {
3767+
// No arc_id, just deserialize normally (backward compatibility)
3768+
parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)
3769+
}
37023770
}
37033771

37043772
fn physical_expr_to_proto(
37053773
&self,
37063774
expr: &Arc<dyn PhysicalExpr>,
37073775
codec: &dyn PhysicalExtensionCodec,
37083776
) -> Result<protobuf::PhysicalExprNode> {
3709-
serialize_physical_expr_with_converter(expr, codec, self)
3777+
let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?;
3778+
// Set the expr_arc_id to the Arc pointer address for deduplication
3779+
// Cast through a thin pointer to get a unique identifier for this Arc
3780+
proto.expr_arc_id = Some(Arc::as_ptr(expr) as *const () as u64);
3781+
Ok(proto)
37103782
}
37113783
}
37123784

0 commit comments

Comments
 (0)