|
15 | 15 | // specific language governing permissions and limitations |
16 | 16 | // under the License. |
17 | 17 |
|
| 18 | +use std::cell::RefCell; |
| 19 | +use std::collections::HashMap; |
18 | 20 | use std::fmt::Debug; |
| 21 | +use std::hash::{DefaultHasher, Hash, Hasher}; |
19 | 22 | use std::sync::Arc; |
20 | 23 |
|
21 | 24 | use arrow::compute::SortOptions; |
@@ -2993,6 +2996,7 @@ impl protobuf::PhysicalPlanNode { |
2993 | 2996 | nulls_first: expr.options.nulls_first, |
2994 | 2997 | }); |
2995 | 2998 | Ok(protobuf::PhysicalExprNode { |
| 2999 | + expr_id: None, |
2996 | 3000 | expr_type: Some(ExprType::Sort(sort_expr)), |
2997 | 3001 | }) |
2998 | 3002 | }) |
@@ -3078,6 +3082,7 @@ impl protobuf::PhysicalPlanNode { |
3078 | 3082 | nulls_first: expr.options.nulls_first, |
3079 | 3083 | }); |
3080 | 3084 | Ok(protobuf::PhysicalExprNode { |
| 3085 | + expr_id: None, |
3081 | 3086 | expr_type: Some(ExprType::Sort(sort_expr)), |
3082 | 3087 | }) |
3083 | 3088 | }) |
@@ -3712,6 +3717,217 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { |
3712 | 3717 | } |
3713 | 3718 | } |
3714 | 3719 |
|
| 3720 | +/// Internal serializer that adds expr_id to expressions. |
| 3721 | +/// Created fresh for each serialization operation. |
| 3722 | +struct DeduplicatingSerializer { |
| 3723 | + /// Random salt combined with pointer addresses and process ID to create globally unique expr_ids. |
| 3724 | + session_id: u64, |
| 3725 | +} |
| 3726 | + |
| 3727 | +impl DeduplicatingSerializer { |
| 3728 | + fn new() -> Self { |
| 3729 | + Self { |
| 3730 | + session_id: rand::random(), |
| 3731 | + } |
| 3732 | + } |
| 3733 | +} |
| 3734 | + |
| 3735 | +impl PhysicalProtoConverterExtension for DeduplicatingSerializer { |
| 3736 | + fn proto_to_execution_plan( |
| 3737 | + &self, |
| 3738 | + _ctx: &TaskContext, |
| 3739 | + _codec: &dyn PhysicalExtensionCodec, |
| 3740 | + _proto: &protobuf::PhysicalPlanNode, |
| 3741 | + ) -> Result<Arc<dyn ExecutionPlan>> { |
| 3742 | + internal_err!("DeduplicatingSerializer cannot deserialize execution plans") |
| 3743 | + } |
| 3744 | + |
| 3745 | + fn execution_plan_to_proto( |
| 3746 | + &self, |
| 3747 | + plan: &Arc<dyn ExecutionPlan>, |
| 3748 | + codec: &dyn PhysicalExtensionCodec, |
| 3749 | + ) -> Result<protobuf::PhysicalPlanNode> |
| 3750 | + where |
| 3751 | + Self: Sized, |
| 3752 | + { |
| 3753 | + protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( |
| 3754 | + Arc::clone(plan), |
| 3755 | + codec, |
| 3756 | + self, |
| 3757 | + ) |
| 3758 | + } |
| 3759 | + |
| 3760 | + fn proto_to_physical_expr( |
| 3761 | + &self, |
| 3762 | + _proto: &protobuf::PhysicalExprNode, |
| 3763 | + _ctx: &TaskContext, |
| 3764 | + _input_schema: &Schema, |
| 3765 | + _codec: &dyn PhysicalExtensionCodec, |
| 3766 | + ) -> Result<Arc<dyn PhysicalExpr>> |
| 3767 | + where |
| 3768 | + Self: Sized, |
| 3769 | + { |
| 3770 | + internal_err!("DeduplicatingSerializer cannot deserialize physical expressions") |
| 3771 | + } |
| 3772 | + |
| 3773 | + fn physical_expr_to_proto( |
| 3774 | + &self, |
| 3775 | + expr: &Arc<dyn PhysicalExpr>, |
| 3776 | + codec: &dyn PhysicalExtensionCodec, |
| 3777 | + ) -> Result<protobuf::PhysicalExprNode> { |
| 3778 | + let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?; |
| 3779 | + |
| 3780 | + // Hash session_id, pointer address, and process ID together to create expr_id. |
| 3781 | + // - session_id: random per serializer, prevents collisions when merging serializations |
| 3782 | + // - ptr: unique address per Arc within a process |
| 3783 | + // - pid: prevents collisions if serializer is shared across processes |
| 3784 | + let mut hasher = DefaultHasher::new(); |
| 3785 | + self.session_id.hash(&mut hasher); |
| 3786 | + (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher); |
| 3787 | + std::process::id().hash(&mut hasher); |
| 3788 | + proto.expr_id = Some(hasher.finish()); |
| 3789 | + |
| 3790 | + Ok(proto) |
| 3791 | + } |
| 3792 | +} |
| 3793 | + |
| 3794 | +/// Internal deserializer that caches expressions by expr_id. |
| 3795 | +/// Created fresh for each deserialization operation. |
| 3796 | +#[derive(Default)] |
| 3797 | +struct DeduplicatingDeserializer { |
| 3798 | + /// Cache mapping expr_id to deserialized expressions. |
| 3799 | + cache: RefCell<HashMap<u64, Arc<dyn PhysicalExpr>>>, |
| 3800 | +} |
| 3801 | + |
| 3802 | +impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { |
| 3803 | + fn proto_to_execution_plan( |
| 3804 | + &self, |
| 3805 | + ctx: &TaskContext, |
| 3806 | + codec: &dyn PhysicalExtensionCodec, |
| 3807 | + proto: &protobuf::PhysicalPlanNode, |
| 3808 | + ) -> Result<Arc<dyn ExecutionPlan>> { |
| 3809 | + proto.try_into_physical_plan_with_converter(ctx, codec, self) |
| 3810 | + } |
| 3811 | + |
| 3812 | + fn execution_plan_to_proto( |
| 3813 | + &self, |
| 3814 | + _plan: &Arc<dyn ExecutionPlan>, |
| 3815 | + _codec: &dyn PhysicalExtensionCodec, |
| 3816 | + ) -> Result<protobuf::PhysicalPlanNode> |
| 3817 | + where |
| 3818 | + Self: Sized, |
| 3819 | + { |
| 3820 | + internal_err!("DeduplicatingDeserializer cannot serialize execution plans") |
| 3821 | + } |
| 3822 | + |
| 3823 | + fn proto_to_physical_expr( |
| 3824 | + &self, |
| 3825 | + proto: &protobuf::PhysicalExprNode, |
| 3826 | + ctx: &TaskContext, |
| 3827 | + input_schema: &Schema, |
| 3828 | + codec: &dyn PhysicalExtensionCodec, |
| 3829 | + ) -> Result<Arc<dyn PhysicalExpr>> |
| 3830 | + where |
| 3831 | + Self: Sized, |
| 3832 | + { |
| 3833 | + if let Some(expr_id) = proto.expr_id { |
| 3834 | + // Check cache first |
| 3835 | + if let Some(cached) = self.cache.borrow().get(&expr_id) { |
| 3836 | + return Ok(Arc::clone(cached)); |
| 3837 | + } |
| 3838 | + // Deserialize and cache |
| 3839 | + let expr = parse_physical_expr_with_converter( |
| 3840 | + proto, |
| 3841 | + ctx, |
| 3842 | + input_schema, |
| 3843 | + codec, |
| 3844 | + self, |
| 3845 | + )?; |
| 3846 | + self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr)); |
| 3847 | + Ok(expr) |
| 3848 | + } else { |
| 3849 | + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self) |
| 3850 | + } |
| 3851 | + } |
| 3852 | + |
| 3853 | + fn physical_expr_to_proto( |
| 3854 | + &self, |
| 3855 | + _expr: &Arc<dyn PhysicalExpr>, |
| 3856 | + _codec: &dyn PhysicalExtensionCodec, |
| 3857 | + ) -> Result<protobuf::PhysicalExprNode> { |
| 3858 | + internal_err!("DeduplicatingDeserializer cannot serialize physical expressions") |
| 3859 | + } |
| 3860 | +} |
| 3861 | + |
| 3862 | +/// A proto converter that adds expression deduplication during serialization |
| 3863 | +/// and deserialization. |
| 3864 | +/// |
| 3865 | +/// During serialization, each expression's Arc pointer address is XORed with a |
| 3866 | +/// random session_id to create a salted `expr_id`. This prevents cross-process |
| 3867 | +/// collisions when serialized plans are merged. |
| 3868 | +/// |
| 3869 | +/// During deserialization, expressions with the same `expr_id` share the same |
| 3870 | +/// Arc, reducing memory usage for plans with duplicate expressions (e.g., large |
| 3871 | +/// IN lists) and supporting correctly linking [`DynamicFilterPhysicalExpr`] instances. |
| 3872 | +/// |
| 3873 | +/// This converter is stateless - it creates internal serializers/deserializers |
| 3874 | +/// on demand for each operation. |
| 3875 | +/// |
| 3876 | +/// [`DynamicFilterPhysicalExpr`]: https://docs.rs/datafusion-physical-expr/latest/datafusion_physical_expr/expressions/struct.DynamicFilterPhysicalExpr.html |
| 3877 | +#[derive(Debug, Default, Clone, Copy)] |
| 3878 | +pub struct DeduplicatingProtoConverter {} |
| 3879 | + |
| 3880 | +impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter { |
| 3881 | + fn proto_to_execution_plan( |
| 3882 | + &self, |
| 3883 | + ctx: &TaskContext, |
| 3884 | + codec: &dyn PhysicalExtensionCodec, |
| 3885 | + proto: &protobuf::PhysicalPlanNode, |
| 3886 | + ) -> Result<Arc<dyn ExecutionPlan>> { |
| 3887 | + let deserializer = DeduplicatingDeserializer::default(); |
| 3888 | + proto.try_into_physical_plan_with_converter(ctx, codec, &deserializer) |
| 3889 | + } |
| 3890 | + |
| 3891 | + fn execution_plan_to_proto( |
| 3892 | + &self, |
| 3893 | + plan: &Arc<dyn ExecutionPlan>, |
| 3894 | + codec: &dyn PhysicalExtensionCodec, |
| 3895 | + ) -> Result<protobuf::PhysicalPlanNode> |
| 3896 | + where |
| 3897 | + Self: Sized, |
| 3898 | + { |
| 3899 | + let serializer = DeduplicatingSerializer::new(); |
| 3900 | + protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( |
| 3901 | + Arc::clone(plan), |
| 3902 | + codec, |
| 3903 | + &serializer, |
| 3904 | + ) |
| 3905 | + } |
| 3906 | + |
| 3907 | + fn proto_to_physical_expr( |
| 3908 | + &self, |
| 3909 | + proto: &protobuf::PhysicalExprNode, |
| 3910 | + ctx: &TaskContext, |
| 3911 | + input_schema: &Schema, |
| 3912 | + codec: &dyn PhysicalExtensionCodec, |
| 3913 | + ) -> Result<Arc<dyn PhysicalExpr>> |
| 3914 | + where |
| 3915 | + Self: Sized, |
| 3916 | + { |
| 3917 | + let deserializer = DeduplicatingDeserializer::default(); |
| 3918 | + deserializer.proto_to_physical_expr(proto, ctx, input_schema, codec) |
| 3919 | + } |
| 3920 | + |
| 3921 | + fn physical_expr_to_proto( |
| 3922 | + &self, |
| 3923 | + expr: &Arc<dyn PhysicalExpr>, |
| 3924 | + codec: &dyn PhysicalExtensionCodec, |
| 3925 | + ) -> Result<protobuf::PhysicalExprNode> { |
| 3926 | + let serializer = DeduplicatingSerializer::new(); |
| 3927 | + serializer.physical_expr_to_proto(expr, codec) |
| 3928 | + } |
| 3929 | +} |
| 3930 | + |
3715 | 3931 | /// A PhysicalExtensionCodec that tries one of multiple inner codecs |
3716 | 3932 | /// until one works |
3717 | 3933 | #[derive(Debug)] |
|
0 commit comments