Skip to content

Commit 9a1c5e2

Browse files
committed
Add conversion from physical expr to proto in converter. Doing some renaming
1 parent 3df1728 commit 9a1c5e2

7 files changed

Lines changed: 736 additions & 726 deletions

File tree

datafusion-examples/examples/custom_data_source/adapter_serialization.rs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ use std::sync::Arc;
3737
use arrow::array::record_batch;
3838
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
3939
use datafusion::assert_batches_eq;
40-
use datafusion::common::Result;
41-
use datafusion::common::not_impl_err;
40+
use datafusion::common::{Result, not_impl_err};
4241
use datafusion::datasource::listing::{
4342
ListingTable, ListingTableConfig, ListingTableConfigExt, ListingTableUrl,
4443
};
@@ -55,15 +54,17 @@ use datafusion_physical_expr_adapter::{
5554
DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
5655
};
5756
use datafusion_proto::bytes::{
58-
physical_plan_from_bytes_with_extension_codec,
59-
physical_plan_to_bytes_with_extension_codec,
57+
physical_plan_from_bytes_with_proto_converter,
58+
physical_plan_to_bytes_with_proto_converter,
6059
};
60+
use datafusion_proto::physical_plan::from_proto::proto_to_physical_expr;
61+
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
6162
use datafusion_proto::physical_plan::{
62-
AsExecutionPlan, PhysicalExtensionCodec, PhysicalExtensionProtoCodec,
63+
AsExecutionPlan, PhysicalExtensionCodec, PhysicalProtoConverterExtension,
6364
};
65+
use datafusion_proto::protobuf::physical_plan_node::PhysicalPlanType;
6466
use datafusion_proto::protobuf::{
6567
PhysicalExprNode, PhysicalExtensionNode, PhysicalPlanNode,
66-
physical_plan_node::PhysicalPlanType,
6768
};
6869
use object_store::memory::InMemory;
6970
use object_store::path::Path;
@@ -126,7 +127,7 @@ pub async fn adapter_serialization() -> Result<()> {
126127
// Step 4: Serialize with our custom codec
127128
println!("\nStep 4: Serializing plan with AdapterPreservingCodec...");
128129
let codec = AdapterPreservingCodec;
129-
let bytes = physical_plan_to_bytes_with_extension_codec(
130+
let bytes = physical_plan_to_bytes_with_proto_converter(
130131
Arc::clone(&original_plan),
131132
&codec,
132133
&codec,
@@ -138,7 +139,7 @@ pub async fn adapter_serialization() -> Result<()> {
138139
println!("\nStep 5: Deserializing plan with AdapterPreservingCodec...");
139140
let task_ctx = ctx.task_ctx();
140141
let restored_plan =
141-
physical_plan_from_bytes_with_extension_codec(&bytes, &task_ctx, &codec, &codec)?;
142+
physical_plan_from_bytes_with_proto_converter(&bytes, &task_ctx, &codec, &codec)?;
142143

143144
// Verify adapter is restored
144145
let has_adapter_after = verify_adapter_in_plan(&restored_plan, "restored");
@@ -314,10 +315,10 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec {
314315
}
315316
}
316317

317-
impl PhysicalExtensionProtoCodec for AdapterPreservingCodec {
318+
impl PhysicalProtoConverterExtension for AdapterPreservingCodec {
318319
fn execution_plan_to_proto(
319320
&self,
320-
plan: Arc<dyn ExecutionPlan>,
321+
plan: &Arc<dyn ExecutionPlan>,
321322
extension_codec: &dyn PhysicalExtensionCodec,
322323
) -> Result<PhysicalPlanNode> {
323324
// Check if this is a DataSourceExec with adapter
@@ -378,7 +379,7 @@ impl PhysicalExtensionProtoCodec for AdapterPreservingCodec {
378379
}
379380

380381
// No adapter found - use default serialization
381-
PhysicalPlanNode::try_from_physical_plan(plan, extension_codec, self)
382+
PhysicalPlanNode::try_from_physical_plan(Arc::clone(plan), extension_codec, self)
382383
}
383384

384385
// Interception point: override deserialization to unwrap adapters
@@ -424,12 +425,20 @@ impl PhysicalExtensionProtoCodec for AdapterPreservingCodec {
424425

425426
fn proto_to_physical_expr(
426427
&self,
427-
_proto: &PhysicalExprNode,
428-
_ctx: &TaskContext,
429-
_input_schema: &Schema,
430-
_codec: &dyn PhysicalExtensionCodec,
428+
proto: &PhysicalExprNode,
429+
ctx: &TaskContext,
430+
input_schema: &Schema,
431+
codec: &dyn PhysicalExtensionCodec,
431432
) -> Result<Arc<dyn PhysicalExpr>> {
432-
todo!()
433+
proto_to_physical_expr(proto, ctx, input_schema, codec, self)
434+
}
435+
436+
fn physical_expr_to_proto(
437+
&self,
438+
expr: &Arc<dyn PhysicalExpr>,
439+
codec: &dyn PhysicalExtensionCodec,
440+
) -> Result<PhysicalExprNode> {
441+
serialize_physical_expr(expr, codec, self)
433442
}
434443
}
435444

datafusion-examples/examples/proto/expression_deduplication.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,10 @@ use datafusion::physical_plan::filter::FilterExec;
4747
use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
4848
use datafusion::prelude::SessionContext;
4949
use datafusion_proto::physical_plan::from_proto::proto_to_physical_expr;
50+
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
5051
use datafusion_proto::physical_plan::{
5152
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
52-
PhysicalExtensionProtoCodec,
53+
PhysicalProtoConverterExtension,
5354
};
5455
use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode};
5556
use prost::Message;
@@ -103,13 +104,7 @@ pub async fn expression_deduplication() -> Result<()> {
103104

104105
let extension_codec = DefaultPhysicalExtensionCodec {};
105106
let caching_codec = CachingCodec::new();
106-
// let proto = PhysicalPlanNode::try_from_physical_plan(
107-
// filter_plan.clone(),
108-
// &extension_codec,
109-
// &caching_codec,
110-
// )?;
111-
let proto =
112-
caching_codec.execution_plan_to_proto(filter_plan.clone(), &extension_codec)?;
107+
let proto = caching_codec.execution_plan_to_proto(&filter_plan, &extension_codec)?;
113108

114109
// Serialize to bytes
115110
let mut bytes = Vec::new();
@@ -205,7 +200,7 @@ impl PhysicalExtensionCodec for CachingCodec {
205200
}
206201
}
207202

208-
impl PhysicalExtensionProtoCodec for CachingCodec {
203+
impl PhysicalProtoConverterExtension for CachingCodec {
209204
fn proto_to_execution_plan(
210205
&self,
211206
_ctx: &TaskContext,
@@ -217,10 +212,10 @@ impl PhysicalExtensionProtoCodec for CachingCodec {
217212

218213
fn execution_plan_to_proto(
219214
&self,
220-
plan: Arc<dyn ExecutionPlan>,
215+
plan: &Arc<dyn ExecutionPlan>,
221216
extension_codec: &dyn PhysicalExtensionCodec,
222217
) -> Result<PhysicalPlanNode> {
223-
PhysicalPlanNode::try_from_physical_plan(plan, extension_codec, self)
218+
PhysicalPlanNode::try_from_physical_plan(Arc::clone(plan), extension_codec, self)
224219
}
225220

226221
// CACHING IMPLEMENTATION: Intercept expression deserialization
@@ -263,4 +258,12 @@ impl PhysicalExtensionProtoCodec for CachingCodec {
263258

264259
Ok(expr)
265260
}
261+
262+
fn physical_expr_to_proto(
263+
&self,
264+
expr: &Arc<dyn PhysicalExpr>,
265+
codec: &dyn PhysicalExtensionCodec,
266+
) -> Result<PhysicalExprNode> {
267+
serialize_physical_expr(expr, codec, self)
268+
}
266269
}

datafusion/proto/src/bytes/mod.rs

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::logical_plan::{
2222
};
2323
use crate::physical_plan::{
2424
AsExecutionPlan, DefaultPhysicalExtensionCodec, DefaultPhysicalExtensionProtoCodec,
25-
PhysicalExtensionCodec, PhysicalExtensionProtoCodec,
25+
PhysicalExtensionCodec, PhysicalProtoConverterExtension,
2626
};
2727
use crate::protobuf;
2828
use datafusion_common::{Result, plan_datafusion_err};
@@ -90,8 +90,8 @@ pub trait Serializeable: Sized {
9090
impl Serializeable for Expr {
9191
fn to_bytes(&self) -> Result<Bytes> {
9292
let mut buffer = BytesMut::new();
93-
let extension_codec = DefaultLogicalExtensionCodec {};
94-
let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec)
93+
let codec = DefaultLogicalExtensionCodec {};
94+
let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &codec)
9595
.map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
9696

9797
protobuf
@@ -192,8 +192,8 @@ impl Serializeable for Expr {
192192
let protobuf = protobuf::LogicalExprNode::decode(bytes)
193193
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
194194

195-
let extension_codec = DefaultLogicalExtensionCodec {};
196-
logical_plan::from_proto::parse_expr(&protobuf, registry, &extension_codec)
195+
let codec = DefaultLogicalExtensionCodec {};
196+
logical_plan::from_proto::parse_expr(&protobuf, registry, &codec)
197197
.map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
198198
}
199199
}
@@ -277,17 +277,20 @@ pub fn logical_plan_from_json_with_extension_codec(
277277
/// Serialize a PhysicalPlan as bytes
278278
pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
279279
let extension_codec = DefaultPhysicalExtensionCodec {};
280-
let proto_codec = DefaultPhysicalExtensionProtoCodec {};
281-
physical_plan_to_bytes_with_extension_codec(plan, &extension_codec, &proto_codec)
280+
physical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
282281
}
283282

284283
/// Serialize a PhysicalPlan as JSON
285284
#[cfg(feature = "json")]
286285
pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
287286
let extension_codec = DefaultPhysicalExtensionCodec {};
288-
let protobuf =
289-
protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &extension_codec)
290-
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
287+
let proto_converter = DefaultPhysicalExtensionProtoCodec {};
288+
let protobuf = protobuf::PhysicalPlanNode::try_from_physical_plan(
289+
plan,
290+
&extension_codec,
291+
&proto_converter,
292+
)
293+
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
291294
serde_json::to_string(&protobuf)
292295
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
293296
}
@@ -296,9 +299,19 @@ pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
296299
pub fn physical_plan_to_bytes_with_extension_codec(
297300
plan: Arc<dyn ExecutionPlan>,
298301
extension_codec: &dyn PhysicalExtensionCodec,
299-
proto_codec: &dyn PhysicalExtensionProtoCodec,
300302
) -> Result<Bytes> {
301-
let protobuf = proto_codec.execution_plan_to_proto(plan, extension_codec)?;
303+
let proto_conveter = DefaultPhysicalExtensionProtoCodec {};
304+
physical_plan_to_bytes_with_proto_converter(plan, extension_codec, &proto_conveter)
305+
}
306+
307+
/// Serialize a PhysicalPlan as bytes, using the provided extension codec
308+
/// and protobuf converter.
309+
pub fn physical_plan_to_bytes_with_proto_converter(
310+
plan: Arc<dyn ExecutionPlan>,
311+
extension_codec: &dyn PhysicalExtensionCodec,
312+
proto_converter: &dyn PhysicalProtoConverterExtension,
313+
) -> Result<Bytes> {
314+
let protobuf = proto_converter.execution_plan_to_proto(&plan, extension_codec)?;
302315
let mut buffer = BytesMut::new();
303316
protobuf
304317
.encode(&mut buffer)
@@ -315,7 +328,8 @@ pub fn physical_plan_from_json(
315328
let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
316329
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
317330
let extension_codec = DefaultPhysicalExtensionCodec {};
318-
back.try_into_physical_plan(ctx, &extension_codec)
331+
let proto_converter = DefaultPhysicalExtensionProtoCodec {};
332+
back.try_into_physical_plan(ctx, &extension_codec, &proto_converter)
319333
}
320334

321335
/// Deserialize a PhysicalPlan from bytes
@@ -324,12 +338,12 @@ pub fn physical_plan_from_bytes(
324338
ctx: &TaskContext,
325339
) -> Result<Arc<dyn ExecutionPlan>> {
326340
let extension_codec = DefaultPhysicalExtensionCodec {};
327-
let proto_codec = DefaultPhysicalExtensionProtoCodec {};
328-
physical_plan_from_bytes_with_extension_codec(
341+
let proto_converter = DefaultPhysicalExtensionProtoCodec {};
342+
physical_plan_from_bytes_with_proto_converter(
329343
bytes,
330344
ctx,
331345
&extension_codec,
332-
&proto_codec,
346+
&proto_converter,
333347
)
334348
}
335349

@@ -338,9 +352,24 @@ pub fn physical_plan_from_bytes_with_extension_codec(
338352
bytes: &[u8],
339353
ctx: &TaskContext,
340354
extension_codec: &dyn PhysicalExtensionCodec,
341-
proto_codec: &dyn PhysicalExtensionProtoCodec,
355+
) -> Result<Arc<dyn ExecutionPlan>> {
356+
let proto_converter = DefaultPhysicalExtensionProtoCodec {};
357+
physical_plan_from_bytes_with_proto_converter(
358+
bytes,
359+
ctx,
360+
extension_codec,
361+
&proto_converter,
362+
)
363+
}
364+
365+
/// Deserialize a PhysicalPlan from bytes
366+
pub fn physical_plan_from_bytes_with_proto_converter(
367+
bytes: &[u8],
368+
ctx: &TaskContext,
369+
extension_codec: &dyn PhysicalExtensionCodec,
370+
proto_converter: &dyn PhysicalProtoConverterExtension,
342371
) -> Result<Arc<dyn ExecutionPlan>> {
343372
let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
344373
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
345-
protobuf.try_into_physical_plan(ctx, extension_codec, proto_codec)
374+
protobuf.try_into_physical_plan(ctx, extension_codec, proto_converter)
346375
}

0 commit comments

Comments
 (0)