Skip to content

Commit e1b387a

Browse files
adriangb authored and jayshrivastava committed
Preserve PhysicalExpr graph in proto round trip using Arc pointers as unique identifiers (apache#20037)
Replaces apache#18192 using the APIs in apache#19437. Similar to apache#18192, the end goal here is specifically to enable deduplication of `DynamicFilterPhysicalExpr` so that distributed query engines can get one step closer to using dynamic filters. Because it's actually simpler, we apply this deduplication to all `PhysicalExpr`s, with the added benefit that we more faithfully preserve the original expression tree (instead of adding new duplicate branches), which will have the immediate impact of, e.g., not duplicating large `InListExpr`s.
1 parent 120b151 commit e1b387a

8 files changed

Lines changed: 724 additions & 2 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ datafusion-proto-common = { workspace = true }
6969
object_store = { workspace = true }
7070
pbjson = { workspace = true, optional = true }
7171
prost = { workspace = true }
72+
rand = { workspace = true }
7273
serde = { version = "1.0", optional = true }
7374
serde_json = { workspace = true, optional = true }
7475

datafusion/proto/proto/datafusion.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,14 @@ message PhysicalExprNode {
837837
// Was date_time_interval_expr
838838
reserved 17;
839839

840+
// Unique identifier for this expression to do deduplication during deserialization.
841+
// When serializing, this is set to a unique identifier for each combination of
842+
// expression, process and serialization run.
843+
// When deserializing, if this ID has been seen before, the cached Arc is returned
844+
// instead of creating a new one, enabling reconstruction of referential integrity
845+
// across serde roundtrips.
846+
optional uint64 expr_id = 30;
847+
840848
oneof ExprType {
841849
// column references
842850
PhysicalColumn column = 1;

datafusion/proto/src/generated/pbjson.rs

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::cell::RefCell;
19+
use std::collections::HashMap;
1820
use std::fmt::Debug;
21+
use std::hash::{DefaultHasher, Hash, Hasher};
1922
use std::sync::Arc;
2023

2124
use arrow::compute::SortOptions;
@@ -2977,6 +2980,7 @@ impl protobuf::PhysicalPlanNode {
29772980
nulls_first: expr.options.nulls_first,
29782981
});
29792982
Ok(protobuf::PhysicalExprNode {
2983+
expr_id: None,
29802984
expr_type: Some(ExprType::Sort(sort_expr)),
29812985
})
29822986
})
@@ -3062,6 +3066,7 @@ impl protobuf::PhysicalPlanNode {
30623066
nulls_first: expr.options.nulls_first,
30633067
});
30643068
Ok(protobuf::PhysicalExprNode {
3069+
expr_id: None,
30653070
expr_type: Some(ExprType::Sort(sort_expr)),
30663071
})
30673072
})
@@ -3696,6 +3701,217 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter {
36963701
}
36973702
}
36983703

3704+
/// Serialization-side helper that stamps an `expr_id` onto every physical
/// expression it emits.
///
/// A new instance is constructed for each serialization operation.
struct DeduplicatingSerializer {
    /// Random per-instance salt. It is hashed together with an expression's
    /// `Arc` address and the process ID to derive that expression's `expr_id`.
    session_id: u64,
}
3710+
3711+
impl DeduplicatingSerializer {
3712+
fn new() -> Self {
3713+
Self {
3714+
session_id: rand::random(),
3715+
}
3716+
}
3717+
}
3718+
3719+
impl PhysicalProtoConverterExtension for DeduplicatingSerializer {
3720+
fn proto_to_execution_plan(
3721+
&self,
3722+
_ctx: &TaskContext,
3723+
_codec: &dyn PhysicalExtensionCodec,
3724+
_proto: &protobuf::PhysicalPlanNode,
3725+
) -> Result<Arc<dyn ExecutionPlan>> {
3726+
internal_err!("DeduplicatingSerializer cannot deserialize execution plans")
3727+
}
3728+
3729+
fn execution_plan_to_proto(
3730+
&self,
3731+
plan: &Arc<dyn ExecutionPlan>,
3732+
codec: &dyn PhysicalExtensionCodec,
3733+
) -> Result<protobuf::PhysicalPlanNode>
3734+
where
3735+
Self: Sized,
3736+
{
3737+
protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3738+
Arc::clone(plan),
3739+
codec,
3740+
self,
3741+
)
3742+
}
3743+
3744+
fn proto_to_physical_expr(
3745+
&self,
3746+
_proto: &protobuf::PhysicalExprNode,
3747+
_ctx: &TaskContext,
3748+
_input_schema: &Schema,
3749+
_codec: &dyn PhysicalExtensionCodec,
3750+
) -> Result<Arc<dyn PhysicalExpr>>
3751+
where
3752+
Self: Sized,
3753+
{
3754+
internal_err!("DeduplicatingSerializer cannot deserialize physical expressions")
3755+
}
3756+
3757+
fn physical_expr_to_proto(
3758+
&self,
3759+
expr: &Arc<dyn PhysicalExpr>,
3760+
codec: &dyn PhysicalExtensionCodec,
3761+
) -> Result<protobuf::PhysicalExprNode> {
3762+
let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?;
3763+
3764+
// Hash session_id, pointer address, and process ID together to create expr_id.
3765+
// - session_id: random per serializer, prevents collisions when merging serializations
3766+
// - ptr: unique address per Arc within a process
3767+
// - pid: prevents collisions if serializer is shared across processes
3768+
let mut hasher = DefaultHasher::new();
3769+
self.session_id.hash(&mut hasher);
3770+
(Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher);
3771+
std::process::id().hash(&mut hasher);
3772+
proto.expr_id = Some(hasher.finish());
3773+
3774+
Ok(proto)
3775+
}
3776+
}
3777+
3778+
/// Internal deserializer that caches expressions by expr_id.
3779+
/// Created fresh for each deserialization operation.
3780+
#[derive(Default)]
3781+
struct DeduplicatingDeserializer {
3782+
/// Cache mapping expr_id to deserialized expressions.
3783+
cache: RefCell<HashMap<u64, Arc<dyn PhysicalExpr>>>,
3784+
}
3785+
3786+
impl PhysicalProtoConverterExtension for DeduplicatingDeserializer {
3787+
fn proto_to_execution_plan(
3788+
&self,
3789+
ctx: &TaskContext,
3790+
codec: &dyn PhysicalExtensionCodec,
3791+
proto: &protobuf::PhysicalPlanNode,
3792+
) -> Result<Arc<dyn ExecutionPlan>> {
3793+
proto.try_into_physical_plan_with_converter(ctx, codec, self)
3794+
}
3795+
3796+
fn execution_plan_to_proto(
3797+
&self,
3798+
_plan: &Arc<dyn ExecutionPlan>,
3799+
_codec: &dyn PhysicalExtensionCodec,
3800+
) -> Result<protobuf::PhysicalPlanNode>
3801+
where
3802+
Self: Sized,
3803+
{
3804+
internal_err!("DeduplicatingDeserializer cannot serialize execution plans")
3805+
}
3806+
3807+
fn proto_to_physical_expr(
3808+
&self,
3809+
proto: &protobuf::PhysicalExprNode,
3810+
ctx: &TaskContext,
3811+
input_schema: &Schema,
3812+
codec: &dyn PhysicalExtensionCodec,
3813+
) -> Result<Arc<dyn PhysicalExpr>>
3814+
where
3815+
Self: Sized,
3816+
{
3817+
if let Some(expr_id) = proto.expr_id {
3818+
// Check cache first
3819+
if let Some(cached) = self.cache.borrow().get(&expr_id) {
3820+
return Ok(Arc::clone(cached));
3821+
}
3822+
// Deserialize and cache
3823+
let expr = parse_physical_expr_with_converter(
3824+
proto,
3825+
ctx,
3826+
input_schema,
3827+
codec,
3828+
self,
3829+
)?;
3830+
self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr));
3831+
Ok(expr)
3832+
} else {
3833+
parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)
3834+
}
3835+
}
3836+
3837+
fn physical_expr_to_proto(
3838+
&self,
3839+
_expr: &Arc<dyn PhysicalExpr>,
3840+
_codec: &dyn PhysicalExtensionCodec,
3841+
) -> Result<protobuf::PhysicalExprNode> {
3842+
internal_err!("DeduplicatingDeserializer cannot serialize physical expressions")
3843+
}
3844+
}
3845+
3846+
/// A proto converter that adds expression deduplication during serialization
/// and deserialization.
///
/// During serialization, each expression's `expr_id` is computed by hashing
/// together a random per-serializer `session_id`, the expression's `Arc`
/// pointer address, and the current process ID. The salt and process ID are
/// there to prevent collisions when plans serialized separately (or in
/// different processes) are merged.
///
/// During deserialization, expressions with the same `expr_id` share the same
/// Arc, reducing memory usage for plans with duplicate expressions (e.g., large
/// IN lists) and supporting correctly linking [`DynamicFilterPhysicalExpr`] instances.
///
/// This converter is stateless - it creates internal serializers/deserializers
/// on demand for each operation. As a consequence, identifiers and the
/// deserialization cache are scoped to a single call on this converter.
///
/// [`DynamicFilterPhysicalExpr`]: https://docs.rs/datafusion-physical-expr/latest/datafusion_physical_expr/expressions/struct.DynamicFilterPhysicalExpr.html
#[derive(Debug, Default, Clone, Copy)]
pub struct DeduplicatingProtoConverter {}
3863+
3864+
impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter {
3865+
fn proto_to_execution_plan(
3866+
&self,
3867+
ctx: &TaskContext,
3868+
codec: &dyn PhysicalExtensionCodec,
3869+
proto: &protobuf::PhysicalPlanNode,
3870+
) -> Result<Arc<dyn ExecutionPlan>> {
3871+
let deserializer = DeduplicatingDeserializer::default();
3872+
proto.try_into_physical_plan_with_converter(ctx, codec, &deserializer)
3873+
}
3874+
3875+
fn execution_plan_to_proto(
3876+
&self,
3877+
plan: &Arc<dyn ExecutionPlan>,
3878+
codec: &dyn PhysicalExtensionCodec,
3879+
) -> Result<protobuf::PhysicalPlanNode>
3880+
where
3881+
Self: Sized,
3882+
{
3883+
let serializer = DeduplicatingSerializer::new();
3884+
protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3885+
Arc::clone(plan),
3886+
codec,
3887+
&serializer,
3888+
)
3889+
}
3890+
3891+
fn proto_to_physical_expr(
3892+
&self,
3893+
proto: &protobuf::PhysicalExprNode,
3894+
ctx: &TaskContext,
3895+
input_schema: &Schema,
3896+
codec: &dyn PhysicalExtensionCodec,
3897+
) -> Result<Arc<dyn PhysicalExpr>>
3898+
where
3899+
Self: Sized,
3900+
{
3901+
let deserializer = DeduplicatingDeserializer::default();
3902+
deserializer.proto_to_physical_expr(proto, ctx, input_schema, codec)
3903+
}
3904+
3905+
fn physical_expr_to_proto(
3906+
&self,
3907+
expr: &Arc<dyn PhysicalExpr>,
3908+
codec: &dyn PhysicalExtensionCodec,
3909+
) -> Result<protobuf::PhysicalExprNode> {
3910+
let serializer = DeduplicatingSerializer::new();
3911+
serializer.physical_expr_to_proto(expr, codec)
3912+
}
3913+
}
3914+
36993915
/// A PhysicalExtensionCodec that tries one of multiple inner codecs
37003916
/// until one works
37013917
#[derive(Debug)]

0 commit comments

Comments
 (0)