Skip to content

Commit 07cf68c

Browse files
adriangbclaude
andcommitted
proto: round-trip SortExec's TopK dynamic filter
Add optional PhysicalExprNode dynamic_filter field to SortExecNode. Emit from to_proto via SortExec::dynamic_filter(); on deserialize parse it and install via SortExec::with_dynamic_filter after the usual with_fetch / with_preserve_partitioning chain. The with_fetch step may auto-create a TopK filter when fetch is set; with_dynamic_filter then replaces it with the one from the sender so the id matches the pushed-down scan's copy (shared via the id cache). The accompanying SQL test uses an ORDER BY ... LIMIT query over a parquet file, round-trips via DeduplicatingProtoConverter, executes, and verifies: - SortExec and the scan's pushed predicate share the same expression_id - generation == 1 before execution (filter not yet updated) - generation > 1 after execution (TopK's update took effect) AND the scan-side wrapper observes the same generation — proving that updates propagate across the round-trip via shared `inner`. Scan row-count pruning isn't asserted because the single row-group parquet file decodes all rows before the filter is updated (a TopK scan-pruning optimization concern, orthogonal to proto correctness). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5af73db commit 07cf68c

5 files changed

Lines changed: 158 additions & 1 deletion

File tree

datafusion/proto/proto/datafusion.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,6 +1352,11 @@ message SortExecNode {
13521352
// Maximum number of highest/lowest rows to fetch; negative means no limit
13531353
int64 fetch = 3;
13541354
bool preserve_partitioning = 4;
1355+
// TopK dynamic filter built by the sort and observed by pushed-down scans.
1356+
// Only present when `fetch` is set. Carried here so receiver's SortExec
1357+
// shares mutable state with the scan sites via the DeduplicatingProtoConverter's
1358+
// expr_id cache.
1359+
optional PhysicalExprNode dynamic_filter = 5;
13551360
}
13561361

13571362
message SortPreservingMergeExecNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1680,10 +1680,28 @@ impl protobuf::PhysicalPlanNode {
16801680
return internal_err!("SortExec requires an ordering");
16811681
};
16821682
let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1683-
let new_sort = SortExec::new(ordering, input)
1683+
let input_schema = input.schema();
1684+
let mut new_sort = SortExec::new(ordering, input)
16841685
.with_fetch(fetch)
16851686
.with_preserve_partitioning(sort.preserve_partitioning);
16861687

1688+
if let Some(proto_filter) = &sort.dynamic_filter {
1689+
let filter_expr = proto_converter.proto_to_physical_expr(
1690+
proto_filter,
1691+
ctx,
1692+
&input_schema,
1693+
codec,
1694+
)?;
1695+
let df = Arc::downcast::<DynamicFilterPhysicalExpr>(filter_expr).map_err(
1696+
|_| {
1697+
proto_error(
1698+
"SortExec.dynamic_filter was not a DynamicFilterPhysicalExpr",
1699+
)
1700+
},
1701+
)?;
1702+
new_sort = new_sort.with_dynamic_filter(df);
1703+
}
1704+
16871705
Ok(Arc::new(new_sort))
16881706
}
16891707

@@ -3125,6 +3143,13 @@ impl protobuf::PhysicalPlanNode {
31253143
})
31263144
})
31273145
.collect::<Result<Vec<_>>>()?;
3146+
let dynamic_filter = exec
3147+
.dynamic_filter()
3148+
.map(|df| {
3149+
let df_expr: Arc<dyn PhysicalExpr> = df as _;
3150+
proto_converter.physical_expr_to_proto(&df_expr, codec)
3151+
})
3152+
.transpose()?;
31283153
Ok(protobuf::PhysicalPlanNode {
31293154
physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
31303155
protobuf::SortExecNode {
@@ -3135,6 +3160,7 @@ impl protobuf::PhysicalPlanNode {
31353160
_ => -1,
31363161
},
31373162
preserve_partitioning: exec.preserve_partitioning(),
3163+
dynamic_filter,
31383164
},
31393165
))),
31403166
})

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3178,6 +3178,18 @@ fn find_hash_join(plan: &Arc<dyn ExecutionPlan>) -> Option<&HashJoinExec> {
31783178
None
31793179
}
31803180

3181+
fn find_sort(plan: &Arc<dyn ExecutionPlan>) -> Option<&SortExec> {
3182+
if let Some(s) = plan.downcast_ref::<SortExec>() {
3183+
return Some(s);
3184+
}
3185+
for child in plan.children() {
3186+
if let Some(s) = find_sort(child) {
3187+
return Some(s);
3188+
}
3189+
}
3190+
None
3191+
}
3192+
31813193
fn find_data_source(plan: &Arc<dyn ExecutionPlan>) -> Option<&DataSourceExec> {
31823194
if let Some(dse) = plan.downcast_ref::<DataSourceExec>() {
31833195
return Some(dse);
@@ -3190,6 +3202,96 @@ fn find_data_source(plan: &Arc<dyn ExecutionPlan>) -> Option<&DataSourceExec> {
31903202
None
31913203
}
31923204

3205+
/// End-to-end: a SQL `ORDER BY ... LIMIT` produces a TopK dynamic filter on
3206+
/// the `SortExec` that's pushed into the underlying `ParquetSource`. After
3207+
/// proto round-trip we verify:
3208+
/// 1. The `SortExec`'s dynamic filter survived (wired through the builder).
3209+
/// 2. The `ParquetSource`'s pushed predicate is the same shared expression.
3210+
/// 3. Executing the plan drives TopK's `update()`, and that update is
3211+
/// observable via the pushed predicate — proving inner-state sharing
3212+
/// across proto round-trip for the TopK path.
3213+
///
3214+
/// We don't assert scan row-count pruning here: with a single row-group
3215+
/// parquet file the DataSource has already decoded all rows by the time the
3216+
/// TopK updates its filter, so the scan metric isn't a reliable pruning
3217+
/// signal. The filter-update observation is the invariant this PR adds.
3218+
#[tokio::test]
3219+
async fn roundtrip_topk_dynamic_filter_survives_via_sql() -> Result<()> {
3220+
use datafusion::physical_expr::expressions::DynamicFilterPhysicalExpr;
3221+
use datafusion::physical_plan::collect;
3222+
use datafusion::prelude::SessionConfig;
3223+
use datafusion_datasource::file_scan_config::FileScanConfig;
3224+
3225+
let config = SessionConfig::new()
3226+
.set_bool("datafusion.execution.parquet.pushdown_filters", true)
3227+
.with_batch_size(4);
3228+
let ctx = SessionContext::new_with_config(config);
3229+
let parquet_path = concat!(
3230+
env!("CARGO_MANIFEST_DIR"),
3231+
"/../core/tests/data/tpch_nation_small.parquet"
3232+
);
3233+
ctx.register_parquet("nation", parquet_path, ParquetReadOptions::default())
3234+
.await?;
3235+
3236+
let sql = "SELECT n_name FROM nation ORDER BY n_nationkey ASC LIMIT 3";
3237+
let plan = ctx.sql(sql).await?.create_physical_plan().await?;
3238+
3239+
let codec = DefaultPhysicalExtensionCodec {};
3240+
let proto_converter = DeduplicatingProtoConverter {};
3241+
let bytes = physical_plan_to_bytes_with_proto_converter(
3242+
Arc::clone(&plan),
3243+
&codec,
3244+
&proto_converter,
3245+
)?;
3246+
let result_plan = physical_plan_from_bytes_with_proto_converter(
3247+
bytes.as_ref(),
3248+
ctx.task_ctx().as_ref(),
3249+
&codec,
3250+
&proto_converter,
3251+
)?;
3252+
3253+
let sort =
3254+
find_sort(&result_plan).expect("deserialized plan should contain a SortExec");
3255+
let sort_filter = sort
3256+
.dynamic_filter()
3257+
.expect("deserialized SortExec should carry a TopK dynamic filter");
3258+
3259+
let scan = find_data_source(&result_plan)
3260+
.expect("deserialized plan should contain a DataSourceExec");
3261+
let scan_filter = scan
3262+
.data_source()
3263+
.downcast_ref::<FileScanConfig>()
3264+
.expect("DataSourceExec should have a FileScanConfig")
3265+
.file_source()
3266+
.filter()
3267+
.expect("ParquetSource should have a pushed-down dynamic filter");
3268+
let scan_df = scan_filter
3269+
.downcast_ref::<DynamicFilterPhysicalExpr>()
3270+
.expect("pushed filter should be a DynamicFilterPhysicalExpr");
3271+
3272+
// Identity is shared: same id on both sites.
3273+
assert_eq!(sort_filter.expression_id(), scan_df.expression_id());
3274+
3275+
// Before execution the filter hasn't been updated (generation == 1).
3276+
assert_eq!(sort_filter.snapshot_generation(), 1);
3277+
3278+
// Execute the deserialized plan.
3279+
let _batches = collect(Arc::clone(&result_plan), ctx.task_ctx()).await?;
3280+
3281+
// TopK updated its filter during execution; the scan-side wrapper sees
3282+
// the update because they share `inner`.
3283+
assert!(
3284+
sort_filter.snapshot_generation() > 1,
3285+
"SortExec's dynamic filter should have been updated during execution"
3286+
);
3287+
assert_eq!(
3288+
scan_df.snapshot_generation(),
3289+
sort_filter.snapshot_generation()
3290+
);
3291+
3292+
Ok(())
3293+
}
3294+
31933295
/// Tests that `lead` window function with offset and default value args
31943296
/// survives a protobuf round-trip. This is a regression test for a bug
31953297
/// where `expressions()` (used during serialization) returns only the

0 commit comments

Comments
 (0)