|
15 | 15 | // specific language governing permissions and limitations |
16 | 16 | // under the License. |
17 | 17 |
|
| 18 | +use std::cell::RefCell; |
| 19 | +use std::collections::HashMap; |
18 | 20 | use std::fmt::Debug; |
| 21 | +use std::hash::{DefaultHasher, Hash, Hasher}; |
19 | 22 | use std::sync::Arc; |
20 | 23 |
|
21 | 24 | use arrow::compute::SortOptions; |
@@ -2977,6 +2980,7 @@ impl protobuf::PhysicalPlanNode { |
2977 | 2980 | nulls_first: expr.options.nulls_first, |
2978 | 2981 | }); |
2979 | 2982 | Ok(protobuf::PhysicalExprNode { |
| 2983 | + expr_id: None, |
2980 | 2984 | expr_type: Some(ExprType::Sort(sort_expr)), |
2981 | 2985 | }) |
2982 | 2986 | }) |
@@ -3062,6 +3066,7 @@ impl protobuf::PhysicalPlanNode { |
3062 | 3066 | nulls_first: expr.options.nulls_first, |
3063 | 3067 | }); |
3064 | 3068 | Ok(protobuf::PhysicalExprNode { |
| 3069 | + expr_id: None, |
3065 | 3070 | expr_type: Some(ExprType::Sort(sort_expr)), |
3066 | 3071 | }) |
3067 | 3072 | }) |
@@ -3696,6 +3701,217 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { |
3696 | 3701 | } |
3697 | 3702 | } |
3698 | 3703 |
|
| 3704 | +/// Internal serializer that adds expr_id to expressions. |
| 3705 | +/// Created fresh for each serialization operation. |
| 3706 | +struct DeduplicatingSerializer { |
| 3707 | + /// Random salt combined with pointer addresses and process ID to create globally unique expr_ids. |
| 3708 | + session_id: u64, |
| 3709 | +} |
| 3710 | + |
| 3711 | +impl DeduplicatingSerializer { |
| 3712 | + fn new() -> Self { |
| 3713 | + Self { |
| 3714 | + session_id: rand::random(), |
| 3715 | + } |
| 3716 | + } |
| 3717 | +} |
| 3718 | + |
| 3719 | +impl PhysicalProtoConverterExtension for DeduplicatingSerializer { |
| 3720 | + fn proto_to_execution_plan( |
| 3721 | + &self, |
| 3722 | + _ctx: &TaskContext, |
| 3723 | + _codec: &dyn PhysicalExtensionCodec, |
| 3724 | + _proto: &protobuf::PhysicalPlanNode, |
| 3725 | + ) -> Result<Arc<dyn ExecutionPlan>> { |
| 3726 | + internal_err!("DeduplicatingSerializer cannot deserialize execution plans") |
| 3727 | + } |
| 3728 | + |
| 3729 | + fn execution_plan_to_proto( |
| 3730 | + &self, |
| 3731 | + plan: &Arc<dyn ExecutionPlan>, |
| 3732 | + codec: &dyn PhysicalExtensionCodec, |
| 3733 | + ) -> Result<protobuf::PhysicalPlanNode> |
| 3734 | + where |
| 3735 | + Self: Sized, |
| 3736 | + { |
| 3737 | + protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( |
| 3738 | + Arc::clone(plan), |
| 3739 | + codec, |
| 3740 | + self, |
| 3741 | + ) |
| 3742 | + } |
| 3743 | + |
| 3744 | + fn proto_to_physical_expr( |
| 3745 | + &self, |
| 3746 | + _proto: &protobuf::PhysicalExprNode, |
| 3747 | + _ctx: &TaskContext, |
| 3748 | + _input_schema: &Schema, |
| 3749 | + _codec: &dyn PhysicalExtensionCodec, |
| 3750 | + ) -> Result<Arc<dyn PhysicalExpr>> |
| 3751 | + where |
| 3752 | + Self: Sized, |
| 3753 | + { |
| 3754 | + internal_err!("DeduplicatingSerializer cannot deserialize physical expressions") |
| 3755 | + } |
| 3756 | + |
| 3757 | + fn physical_expr_to_proto( |
| 3758 | + &self, |
| 3759 | + expr: &Arc<dyn PhysicalExpr>, |
| 3760 | + codec: &dyn PhysicalExtensionCodec, |
| 3761 | + ) -> Result<protobuf::PhysicalExprNode> { |
| 3762 | + let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?; |
| 3763 | + |
| 3764 | + // Hash session_id, pointer address, and process ID together to create expr_id. |
| 3765 | + // - session_id: random per serializer, prevents collisions when merging serializations |
| 3766 | + // - ptr: unique address per Arc within a process |
| 3767 | + // - pid: prevents collisions if serializer is shared across processes |
| 3768 | + let mut hasher = DefaultHasher::new(); |
| 3769 | + self.session_id.hash(&mut hasher); |
| 3770 | + (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher); |
| 3771 | + std::process::id().hash(&mut hasher); |
| 3772 | + proto.expr_id = Some(hasher.finish()); |
| 3773 | + |
| 3774 | + Ok(proto) |
| 3775 | + } |
| 3776 | +} |
| 3777 | + |
| 3778 | +/// Internal deserializer that caches expressions by expr_id. |
| 3779 | +/// Created fresh for each deserialization operation. |
| 3780 | +#[derive(Default)] |
| 3781 | +struct DeduplicatingDeserializer { |
| 3782 | + /// Cache mapping expr_id to deserialized expressions. |
| 3783 | + cache: RefCell<HashMap<u64, Arc<dyn PhysicalExpr>>>, |
| 3784 | +} |
| 3785 | + |
| 3786 | +impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { |
| 3787 | + fn proto_to_execution_plan( |
| 3788 | + &self, |
| 3789 | + ctx: &TaskContext, |
| 3790 | + codec: &dyn PhysicalExtensionCodec, |
| 3791 | + proto: &protobuf::PhysicalPlanNode, |
| 3792 | + ) -> Result<Arc<dyn ExecutionPlan>> { |
| 3793 | + proto.try_into_physical_plan_with_converter(ctx, codec, self) |
| 3794 | + } |
| 3795 | + |
| 3796 | + fn execution_plan_to_proto( |
| 3797 | + &self, |
| 3798 | + _plan: &Arc<dyn ExecutionPlan>, |
| 3799 | + _codec: &dyn PhysicalExtensionCodec, |
| 3800 | + ) -> Result<protobuf::PhysicalPlanNode> |
| 3801 | + where |
| 3802 | + Self: Sized, |
| 3803 | + { |
| 3804 | + internal_err!("DeduplicatingDeserializer cannot serialize execution plans") |
| 3805 | + } |
| 3806 | + |
| 3807 | + fn proto_to_physical_expr( |
| 3808 | + &self, |
| 3809 | + proto: &protobuf::PhysicalExprNode, |
| 3810 | + ctx: &TaskContext, |
| 3811 | + input_schema: &Schema, |
| 3812 | + codec: &dyn PhysicalExtensionCodec, |
| 3813 | + ) -> Result<Arc<dyn PhysicalExpr>> |
| 3814 | + where |
| 3815 | + Self: Sized, |
| 3816 | + { |
| 3817 | + if let Some(expr_id) = proto.expr_id { |
| 3818 | + // Check cache first |
| 3819 | + if let Some(cached) = self.cache.borrow().get(&expr_id) { |
| 3820 | + return Ok(Arc::clone(cached)); |
| 3821 | + } |
| 3822 | + // Deserialize and cache |
| 3823 | + let expr = parse_physical_expr_with_converter( |
| 3824 | + proto, |
| 3825 | + ctx, |
| 3826 | + input_schema, |
| 3827 | + codec, |
| 3828 | + self, |
| 3829 | + )?; |
| 3830 | + self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr)); |
| 3831 | + Ok(expr) |
| 3832 | + } else { |
| 3833 | + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self) |
| 3834 | + } |
| 3835 | + } |
| 3836 | + |
| 3837 | + fn physical_expr_to_proto( |
| 3838 | + &self, |
| 3839 | + _expr: &Arc<dyn PhysicalExpr>, |
| 3840 | + _codec: &dyn PhysicalExtensionCodec, |
| 3841 | + ) -> Result<protobuf::PhysicalExprNode> { |
| 3842 | + internal_err!("DeduplicatingDeserializer cannot serialize physical expressions") |
| 3843 | + } |
| 3844 | +} |
| 3845 | + |
| 3846 | +/// A proto converter that adds expression deduplication during serialization |
| 3847 | +/// and deserialization. |
| 3848 | +/// |
| 3849 | +/// During serialization, each expression's Arc pointer address is XORed with a |
| 3850 | +/// random session_id to create a salted `expr_id`. This prevents cross-process |
| 3851 | +/// collisions when serialized plans are merged. |
| 3852 | +/// |
| 3853 | +/// During deserialization, expressions with the same `expr_id` share the same |
| 3854 | +/// Arc, reducing memory usage for plans with duplicate expressions (e.g., large |
| 3855 | +/// IN lists) and supporting correctly linking [`DynamicFilterPhysicalExpr`] instances. |
| 3856 | +/// |
| 3857 | +/// This converter is stateless - it creates internal serializers/deserializers |
| 3858 | +/// on demand for each operation. |
| 3859 | +/// |
| 3860 | +/// [`DynamicFilterPhysicalExpr`]: https://docs.rs/datafusion-physical-expr/latest/datafusion_physical_expr/expressions/struct.DynamicFilterPhysicalExpr.html |
| 3861 | +#[derive(Debug, Default, Clone, Copy)] |
| 3862 | +pub struct DeduplicatingProtoConverter {} |
| 3863 | + |
| 3864 | +impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter { |
| 3865 | + fn proto_to_execution_plan( |
| 3866 | + &self, |
| 3867 | + ctx: &TaskContext, |
| 3868 | + codec: &dyn PhysicalExtensionCodec, |
| 3869 | + proto: &protobuf::PhysicalPlanNode, |
| 3870 | + ) -> Result<Arc<dyn ExecutionPlan>> { |
| 3871 | + let deserializer = DeduplicatingDeserializer::default(); |
| 3872 | + proto.try_into_physical_plan_with_converter(ctx, codec, &deserializer) |
| 3873 | + } |
| 3874 | + |
| 3875 | + fn execution_plan_to_proto( |
| 3876 | + &self, |
| 3877 | + plan: &Arc<dyn ExecutionPlan>, |
| 3878 | + codec: &dyn PhysicalExtensionCodec, |
| 3879 | + ) -> Result<protobuf::PhysicalPlanNode> |
| 3880 | + where |
| 3881 | + Self: Sized, |
| 3882 | + { |
| 3883 | + let serializer = DeduplicatingSerializer::new(); |
| 3884 | + protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( |
| 3885 | + Arc::clone(plan), |
| 3886 | + codec, |
| 3887 | + &serializer, |
| 3888 | + ) |
| 3889 | + } |
| 3890 | + |
| 3891 | + fn proto_to_physical_expr( |
| 3892 | + &self, |
| 3893 | + proto: &protobuf::PhysicalExprNode, |
| 3894 | + ctx: &TaskContext, |
| 3895 | + input_schema: &Schema, |
| 3896 | + codec: &dyn PhysicalExtensionCodec, |
| 3897 | + ) -> Result<Arc<dyn PhysicalExpr>> |
| 3898 | + where |
| 3899 | + Self: Sized, |
| 3900 | + { |
| 3901 | + let deserializer = DeduplicatingDeserializer::default(); |
| 3902 | + deserializer.proto_to_physical_expr(proto, ctx, input_schema, codec) |
| 3903 | + } |
| 3904 | + |
| 3905 | + fn physical_expr_to_proto( |
| 3906 | + &self, |
| 3907 | + expr: &Arc<dyn PhysicalExpr>, |
| 3908 | + codec: &dyn PhysicalExtensionCodec, |
| 3909 | + ) -> Result<protobuf::PhysicalExprNode> { |
| 3910 | + let serializer = DeduplicatingSerializer::new(); |
| 3911 | + serializer.physical_expr_to_proto(expr, codec) |
| 3912 | + } |
| 3913 | +} |
| 3914 | + |
3699 | 3915 | /// A PhysicalExtensionCodec that tries one of multiple inner codecs |
3700 | 3916 | /// until one works |
3701 | 3917 | #[derive(Debug)] |
|
0 commit comments