Skip to content

Commit 1a87e76

Browse files
adriangbclaude
and committed
proto: round-trip SortExec's TopK dynamic filter
Add optional PhysicalExprNode dynamic_filter field to SortExecNode. Emit from to_proto via SortExec::dynamic_filter(); on deserialize parse it and install via SortExec::with_dynamic_filter after the usual with_fetch / with_preserve_partitioning chain. The with_fetch step may auto-create a TopK filter when fetch is set; with_dynamic_filter then replaces it with the one from the sender so the id matches the pushed-down scan's copy (shared via the id cache). Add an end-to-end SQL test in core/tests/physical_optimizer/filter_pushdown.rs that writes two single-row parquet files (`a.parquet` with key=1, `b.parquet` with key=2), runs `ORDER BY n_nationkey ASC LIMIT 1` with `target_partitions=1`, round-trips via DeduplicatingProtoConverter, executes, and asserts the scan emitted exactly 1 row — b.parquet was pruned by row-group statistics after TopK saw a's row and tightened the shared dynamic filter. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d766b8d commit 1a87e76

5 files changed

Lines changed: 171 additions & 1 deletion

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3383,18 +3383,23 @@ fn test_filter_pushdown_through_sort_with_projection() {
33833383
mod proto_roundtrip {
33843384
use std::sync::Arc;
33853385

3386+
use arrow::array::{Int64Array, StringArray};
3387+
use arrow::datatypes::{DataType, Field, Schema};
3388+
use arrow::record_batch::RecordBatch;
33863389
use datafusion::datasource::source::DataSourceExec;
33873390
use datafusion::physical_plan::collect;
33883391
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
33893392
use datafusion_physical_plan::ExecutionPlan;
33903393
use datafusion_physical_plan::joins::HashJoinExec;
3394+
use datafusion_physical_plan::sorts::sort::SortExec;
33913395
use datafusion_proto::bytes::{
33923396
physical_plan_from_bytes_with_proto_converter,
33933397
physical_plan_to_bytes_with_proto_converter,
33943398
};
33953399
use datafusion_proto::physical_plan::{
33963400
DeduplicatingProtoConverter, DefaultPhysicalExtensionCodec,
33973401
};
3402+
use parquet::arrow::ArrowWriter;
33983403

33993404
/// End-to-end: a SQL hash join with a selective WHERE on the build side
34003405
/// produces a dynamic filter that's pushed into the probe-side
@@ -3465,6 +3470,104 @@ mod proto_roundtrip {
34653470
Ok(())
34663471
}
34673472

3473+
/// End-to-end: an `ORDER BY ... LIMIT 1` over two single-row parquet
3474+
/// files (`a.parquet` with key=1, `b.parquet` with key=2).
3475+
///
3476+
/// 1. `SortExec` with `TopK(fetch=1)` creates a dynamic filter pushed
3477+
/// into the underlying `ParquetSource`.
3478+
/// 2. After proto round-trip, the `SortExec`'s filter and the scan's
3479+
/// pushed predicate share the same inner state (cached by
3480+
/// `expression_id`).
3481+
/// 3. With `target_partitions=1`, both files are read sequentially. After
3482+
/// `a.parquet` is read, TopK's single best row has key=1 and the
3483+
/// filter tightens.
3484+
/// 4. When the scan opens `b.parquet` (min=max=2), row-group statistics
3485+
/// prune it — the scan never yields b's row.
3486+
///
3487+
/// Observable proof: `DataSourceExec::metrics().output_rows() == 1`.
3488+
/// Without the round-trip wiring, TopK would update a disconnected
3489+
/// filter and the scan would emit both rows.
3490+
#[tokio::test]
3491+
async fn topk_dynamic_filter_prunes_files_via_sql() -> datafusion_common::Result<()> {
3492+
let tmp = tempfile::TempDir::new()?;
3493+
let schema = Arc::new(Schema::new(vec![
3494+
Field::new("n_nationkey", DataType::Int64, false),
3495+
Field::new("n_name", DataType::Utf8, false),
3496+
]));
3497+
let write_file =
3498+
|name: &str, key: i64, value: &str| -> datafusion_common::Result<()> {
3499+
let path = tmp.path().join(name);
3500+
let batch = RecordBatch::try_new(
3501+
Arc::clone(&schema),
3502+
vec![
3503+
Arc::new(Int64Array::from(vec![key])),
3504+
Arc::new(StringArray::from(vec![value])),
3505+
],
3506+
)?;
3507+
let file = std::fs::File::create(&path)?;
3508+
let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;
3509+
writer.write(&batch)?;
3510+
writer.close()?;
3511+
Ok(())
3512+
};
3513+
write_file("a.parquet", 1, "foo")?;
3514+
write_file("b.parquet", 2, "bar")?;
3515+
3516+
let config = SessionConfig::new()
3517+
.set_bool("datafusion.execution.parquet.pushdown_filters", true)
3518+
.with_target_partitions(1);
3519+
let ctx = SessionContext::new_with_config(config);
3520+
ctx.register_parquet(
3521+
"t",
3522+
tmp.path().to_str().unwrap(),
3523+
ParquetReadOptions::default(),
3524+
)
3525+
.await?;
3526+
3527+
let sql = "SELECT n_name FROM t ORDER BY n_nationkey ASC LIMIT 1";
3528+
let plan = ctx.sql(sql).await?.create_physical_plan().await?;
3529+
3530+
let codec = DefaultPhysicalExtensionCodec {};
3531+
let proto_converter = DeduplicatingProtoConverter {};
3532+
let bytes = physical_plan_to_bytes_with_proto_converter(
3533+
Arc::clone(&plan),
3534+
&codec,
3535+
&proto_converter,
3536+
)?;
3537+
let result_plan = physical_plan_from_bytes_with_proto_converter(
3538+
bytes.as_ref(),
3539+
ctx.task_ctx().as_ref(),
3540+
&codec,
3541+
&proto_converter,
3542+
)?;
3543+
3544+
let sort =
3545+
find_sort(&result_plan).expect("deserialized plan should contain a SortExec");
3546+
assert!(
3547+
sort.dynamic_filter().is_some(),
3548+
"deserialized SortExec should carry a TopK dynamic filter"
3549+
);
3550+
3551+
let batches = collect(Arc::clone(&result_plan), ctx.task_ctx()).await?;
3552+
let total_out: usize = batches.iter().map(|b| b.num_rows()).sum();
3553+
assert_eq!(total_out, 1, "ORDER BY LIMIT 1 should emit one row");
3554+
3555+
let scan = find_data_source(&result_plan)
3556+
.expect("deserialized plan should contain a DataSourceExec");
3557+
let rows = scan
3558+
.metrics()
3559+
.and_then(|m| m.output_rows())
3560+
.expect("DataSourceExec should record output_rows");
3561+
assert_eq!(
3562+
rows, 1,
3563+
"scan emitted {rows} rows; expected exactly 1 (only a.parquet), \
3564+
because the TopK dynamic filter should prune b.parquet once TopK \
3565+
has seen a's row"
3566+
);
3567+
3568+
Ok(())
3569+
}
3570+
34683571
fn find_hash_join(plan: &Arc<dyn ExecutionPlan>) -> Option<&HashJoinExec> {
34693572
if let Some(hj) = plan.downcast_ref::<HashJoinExec>() {
34703573
return Some(hj);
@@ -3477,6 +3580,18 @@ mod proto_roundtrip {
34773580
None
34783581
}
34793582

3583+
/// Depth-first search returning the first `SortExec` found in `plan`'s
/// subtree (the plan itself, then its children in order), or `None`.
fn find_sort(plan: &Arc<dyn ExecutionPlan>) -> Option<&SortExec> {
    plan.downcast_ref::<SortExec>()
        .or_else(|| plan.children().into_iter().find_map(find_sort))
}
3594+
34803595
fn find_data_source(plan: &Arc<dyn ExecutionPlan>) -> Option<&DataSourceExec> {
34813596
if let Some(dse) = plan.downcast_ref::<DataSourceExec>() {
34823597
return Some(dse);

datafusion/proto/proto/datafusion.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,6 +1352,11 @@ message SortExecNode {
13521352
// Maximum number of highest/lowest rows to fetch; negative means no limit
13531353
int64 fetch = 3;
13541354
bool preserve_partitioning = 4;
1355+
// TopK dynamic filter built by the sort and observed by pushed-down scans.
1356+
// Only present when `fetch` is set. Carried here so receiver's SortExec
1357+
// shares mutable state with the scan sites via the DeduplicatingProtoConverter's
1358+
// expr_id cache.
1359+
optional PhysicalExprNode dynamic_filter = 5;
13551360
}
13561361

13571362
message SortPreservingMergeExecNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1680,10 +1680,28 @@ impl protobuf::PhysicalPlanNode {
16801680
return internal_err!("SortExec requires an ordering");
16811681
};
16821682
let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1683-
let new_sort = SortExec::new(ordering, input)
1683+
let input_schema = input.schema();
1684+
let mut new_sort = SortExec::new(ordering, input)
16841685
.with_fetch(fetch)
16851686
.with_preserve_partitioning(sort.preserve_partitioning);
16861687

1688+
if let Some(proto_filter) = &sort.dynamic_filter {
1689+
let filter_expr = proto_converter.proto_to_physical_expr(
1690+
proto_filter,
1691+
ctx,
1692+
&input_schema,
1693+
codec,
1694+
)?;
1695+
let df = Arc::downcast::<DynamicFilterPhysicalExpr>(filter_expr).map_err(
1696+
|_| {
1697+
proto_error(
1698+
"SortExec.dynamic_filter was not a DynamicFilterPhysicalExpr",
1699+
)
1700+
},
1701+
)?;
1702+
new_sort = new_sort.with_dynamic_filter(df);
1703+
}
1704+
16871705
Ok(Arc::new(new_sort))
16881706
}
16891707

@@ -3125,6 +3143,13 @@ impl protobuf::PhysicalPlanNode {
31253143
})
31263144
})
31273145
.collect::<Result<Vec<_>>>()?;
3146+
let dynamic_filter = exec
3147+
.dynamic_filter()
3148+
.map(|df| {
3149+
let df_expr: Arc<dyn PhysicalExpr> = df as _;
3150+
proto_converter.physical_expr_to_proto(&df_expr, codec)
3151+
})
3152+
.transpose()?;
31283153
Ok(protobuf::PhysicalPlanNode {
31293154
physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
31303155
protobuf::SortExecNode {
@@ -3135,6 +3160,7 @@ impl protobuf::PhysicalPlanNode {
31353160
_ => -1,
31363161
},
31373162
preserve_partitioning: exec.preserve_partitioning(),
3163+
dynamic_filter,
31383164
},
31393165
))),
31403166
})

0 commit comments

Comments
 (0)