Skip to content

Commit 069173b

Browse files
adriangb authored and de-bgunter committed
Preserve PhysicalExpr graph in proto round trip using Arc pointers as unique identifiers (apache#20037)
Replaces apache#18192 using the APIs in apache#19437. As with apache#18192, the end goal here is specifically to enable deduplication of `DynamicFilterPhysicalExpr`, so that distributed query engines can get one step closer to using dynamic filters. Because it is actually simpler, we apply this deduplication to all `PhysicalExpr`s. This has the added benefit that we more faithfully preserve the original expression tree (instead of adding new duplicate branches), which has the immediate impact of, e.g., not duplicating large `InListExpr`s.
1 parent 974010b commit 069173b

8 files changed

Lines changed: 724 additions & 2 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ datafusion-proto-common = { workspace = true }
6666
object_store = { workspace = true }
6767
pbjson = { workspace = true, optional = true }
6868
prost = { workspace = true }
69+
rand = { workspace = true }
6970
serde = { version = "1.0", optional = true }
7071
serde_json = { workspace = true, optional = true }
7172

datafusion/proto/proto/datafusion.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,6 +850,14 @@ message PhysicalExprNode {
850850
// Was date_time_interval_expr
851851
reserved 17;
852852

853+
// Unique identifier for this expression to do deduplication during deserialization.
854+
// When serializing, this is set to a unique identifier for each combination of
855+
// expression, process and serialization run.
856+
// When deserializing, if this ID has been seen before, the cached Arc is returned
857+
// instead of creating a new one, enabling reconstruction of referential integrity
858+
// across serde roundtrips.
859+
optional uint64 expr_id = 30;
860+
853861
oneof ExprType {
854862
// column references
855863
PhysicalColumn column = 1;

datafusion/proto/src/generated/pbjson.rs

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::cell::RefCell;
19+
use std::collections::HashMap;
1820
use std::fmt::Debug;
21+
use std::hash::{DefaultHasher, Hash, Hasher};
1922
use std::sync::Arc;
2023

2124
use arrow::compute::SortOptions;
@@ -2993,6 +2996,7 @@ impl protobuf::PhysicalPlanNode {
29932996
nulls_first: expr.options.nulls_first,
29942997
});
29952998
Ok(protobuf::PhysicalExprNode {
2999+
expr_id: None,
29963000
expr_type: Some(ExprType::Sort(sort_expr)),
29973001
})
29983002
})
@@ -3078,6 +3082,7 @@ impl protobuf::PhysicalPlanNode {
30783082
nulls_first: expr.options.nulls_first,
30793083
});
30803084
Ok(protobuf::PhysicalExprNode {
3085+
expr_id: None,
30813086
expr_type: Some(ExprType::Sort(sort_expr)),
30823087
})
30833088
})
@@ -3712,6 +3717,217 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter {
37123717
}
37133718
}
37143719

3720+
/// Internal serializer that adds expr_id to expressions.
3721+
/// Created fresh for each serialization operation.
3722+
struct DeduplicatingSerializer {
3723+
/// Random salt combined with pointer addresses and process ID to create globally unique expr_ids.
3724+
session_id: u64,
3725+
}
3726+
3727+
impl DeduplicatingSerializer {
3728+
fn new() -> Self {
3729+
Self {
3730+
session_id: rand::random(),
3731+
}
3732+
}
3733+
}
3734+
3735+
impl PhysicalProtoConverterExtension for DeduplicatingSerializer {
3736+
fn proto_to_execution_plan(
3737+
&self,
3738+
_ctx: &TaskContext,
3739+
_codec: &dyn PhysicalExtensionCodec,
3740+
_proto: &protobuf::PhysicalPlanNode,
3741+
) -> Result<Arc<dyn ExecutionPlan>> {
3742+
internal_err!("DeduplicatingSerializer cannot deserialize execution plans")
3743+
}
3744+
3745+
fn execution_plan_to_proto(
3746+
&self,
3747+
plan: &Arc<dyn ExecutionPlan>,
3748+
codec: &dyn PhysicalExtensionCodec,
3749+
) -> Result<protobuf::PhysicalPlanNode>
3750+
where
3751+
Self: Sized,
3752+
{
3753+
protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3754+
Arc::clone(plan),
3755+
codec,
3756+
self,
3757+
)
3758+
}
3759+
3760+
fn proto_to_physical_expr(
3761+
&self,
3762+
_proto: &protobuf::PhysicalExprNode,
3763+
_ctx: &TaskContext,
3764+
_input_schema: &Schema,
3765+
_codec: &dyn PhysicalExtensionCodec,
3766+
) -> Result<Arc<dyn PhysicalExpr>>
3767+
where
3768+
Self: Sized,
3769+
{
3770+
internal_err!("DeduplicatingSerializer cannot deserialize physical expressions")
3771+
}
3772+
3773+
fn physical_expr_to_proto(
3774+
&self,
3775+
expr: &Arc<dyn PhysicalExpr>,
3776+
codec: &dyn PhysicalExtensionCodec,
3777+
) -> Result<protobuf::PhysicalExprNode> {
3778+
let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?;
3779+
3780+
// Hash session_id, pointer address, and process ID together to create expr_id.
3781+
// - session_id: random per serializer, prevents collisions when merging serializations
3782+
// - ptr: unique address per Arc within a process
3783+
// - pid: prevents collisions if serializer is shared across processes
3784+
let mut hasher = DefaultHasher::new();
3785+
self.session_id.hash(&mut hasher);
3786+
(Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher);
3787+
std::process::id().hash(&mut hasher);
3788+
proto.expr_id = Some(hasher.finish());
3789+
3790+
Ok(proto)
3791+
}
3792+
}
3793+
3794+
/// Internal deserializer that caches expressions by expr_id.
3795+
/// Created fresh for each deserialization operation.
3796+
#[derive(Default)]
3797+
struct DeduplicatingDeserializer {
3798+
/// Cache mapping expr_id to deserialized expressions.
3799+
cache: RefCell<HashMap<u64, Arc<dyn PhysicalExpr>>>,
3800+
}
3801+
3802+
impl PhysicalProtoConverterExtension for DeduplicatingDeserializer {
3803+
fn proto_to_execution_plan(
3804+
&self,
3805+
ctx: &TaskContext,
3806+
codec: &dyn PhysicalExtensionCodec,
3807+
proto: &protobuf::PhysicalPlanNode,
3808+
) -> Result<Arc<dyn ExecutionPlan>> {
3809+
proto.try_into_physical_plan_with_converter(ctx, codec, self)
3810+
}
3811+
3812+
fn execution_plan_to_proto(
3813+
&self,
3814+
_plan: &Arc<dyn ExecutionPlan>,
3815+
_codec: &dyn PhysicalExtensionCodec,
3816+
) -> Result<protobuf::PhysicalPlanNode>
3817+
where
3818+
Self: Sized,
3819+
{
3820+
internal_err!("DeduplicatingDeserializer cannot serialize execution plans")
3821+
}
3822+
3823+
fn proto_to_physical_expr(
3824+
&self,
3825+
proto: &protobuf::PhysicalExprNode,
3826+
ctx: &TaskContext,
3827+
input_schema: &Schema,
3828+
codec: &dyn PhysicalExtensionCodec,
3829+
) -> Result<Arc<dyn PhysicalExpr>>
3830+
where
3831+
Self: Sized,
3832+
{
3833+
if let Some(expr_id) = proto.expr_id {
3834+
// Check cache first
3835+
if let Some(cached) = self.cache.borrow().get(&expr_id) {
3836+
return Ok(Arc::clone(cached));
3837+
}
3838+
// Deserialize and cache
3839+
let expr = parse_physical_expr_with_converter(
3840+
proto,
3841+
ctx,
3842+
input_schema,
3843+
codec,
3844+
self,
3845+
)?;
3846+
self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr));
3847+
Ok(expr)
3848+
} else {
3849+
parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)
3850+
}
3851+
}
3852+
3853+
fn physical_expr_to_proto(
3854+
&self,
3855+
_expr: &Arc<dyn PhysicalExpr>,
3856+
_codec: &dyn PhysicalExtensionCodec,
3857+
) -> Result<protobuf::PhysicalExprNode> {
3858+
internal_err!("DeduplicatingDeserializer cannot serialize physical expressions")
3859+
}
3860+
}
3861+
3862+
/// A proto converter that adds expression deduplication during serialization
/// and deserialization.
///
/// During serialization, each expression's Arc pointer address is hashed
/// together with a random per-serializer `session_id` and the process ID to
/// create a salted `expr_id`. This prevents cross-process collisions when
/// serialized plans are merged.
///
/// During deserialization, expressions with the same `expr_id` share the same
/// Arc, reducing memory usage for plans with duplicate expressions (e.g., large
/// IN lists) and supporting correctly linking [`DynamicFilterPhysicalExpr`] instances.
///
/// This converter is stateless - it creates internal serializers/deserializers
/// on demand for each operation. As a consequence, deduplication only applies
/// within a single call: two separate calls never share `expr_id`s or cached
/// expressions.
///
/// [`DynamicFilterPhysicalExpr`]: https://docs.rs/datafusion-physical-expr/latest/datafusion_physical_expr/expressions/struct.DynamicFilterPhysicalExpr.html
#[derive(Debug, Default, Clone, Copy)]
pub struct DeduplicatingProtoConverter {}
3879+
3880+
impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter {
3881+
fn proto_to_execution_plan(
3882+
&self,
3883+
ctx: &TaskContext,
3884+
codec: &dyn PhysicalExtensionCodec,
3885+
proto: &protobuf::PhysicalPlanNode,
3886+
) -> Result<Arc<dyn ExecutionPlan>> {
3887+
let deserializer = DeduplicatingDeserializer::default();
3888+
proto.try_into_physical_plan_with_converter(ctx, codec, &deserializer)
3889+
}
3890+
3891+
fn execution_plan_to_proto(
3892+
&self,
3893+
plan: &Arc<dyn ExecutionPlan>,
3894+
codec: &dyn PhysicalExtensionCodec,
3895+
) -> Result<protobuf::PhysicalPlanNode>
3896+
where
3897+
Self: Sized,
3898+
{
3899+
let serializer = DeduplicatingSerializer::new();
3900+
protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3901+
Arc::clone(plan),
3902+
codec,
3903+
&serializer,
3904+
)
3905+
}
3906+
3907+
fn proto_to_physical_expr(
3908+
&self,
3909+
proto: &protobuf::PhysicalExprNode,
3910+
ctx: &TaskContext,
3911+
input_schema: &Schema,
3912+
codec: &dyn PhysicalExtensionCodec,
3913+
) -> Result<Arc<dyn PhysicalExpr>>
3914+
where
3915+
Self: Sized,
3916+
{
3917+
let deserializer = DeduplicatingDeserializer::default();
3918+
deserializer.proto_to_physical_expr(proto, ctx, input_schema, codec)
3919+
}
3920+
3921+
fn physical_expr_to_proto(
3922+
&self,
3923+
expr: &Arc<dyn PhysicalExpr>,
3924+
codec: &dyn PhysicalExtensionCodec,
3925+
) -> Result<protobuf::PhysicalExprNode> {
3926+
let serializer = DeduplicatingSerializer::new();
3927+
serializer.physical_expr_to_proto(expr, codec)
3928+
}
3929+
}
3930+
37153931
/// A PhysicalExtensionCodec that tries one of multiple inner codecs
37163932
/// until one works
37173933
#[derive(Debug)]

0 commit comments

Comments
 (0)