|
| 1 | +// Licensed to the Apache Software Foundation (ASF) under one |
| 2 | +// or more contributor license agreements. See the NOTICE file |
| 3 | +// distributed with this work for additional information |
| 4 | +// regarding copyright ownership. The ASF licenses this file |
| 5 | +// to you under the Apache License, Version 2.0 (the |
| 6 | +// "License"); you may not use this file except in compliance |
| 7 | +// with the License. You may obtain a copy of the License at |
| 8 | +// |
| 9 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +// |
| 11 | +// Unless required by applicable law or agreed to in writing, |
| 12 | +// software distributed under the License is distributed on an |
| 13 | +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | +// KIND, either express or implied. See the License for the |
| 15 | +// specific language governing permissions and limitations |
| 16 | +// under the License. |
| 17 | + |
| 18 | +//! End-to-end tests asserting that `datafusion-proto` round-trip preserves |
| 19 | +//! the identity + shared mutable state of `DynamicFilterPhysicalExpr` |
| 20 | +//! instances attached to operators that produce them (`HashJoinExec` and |
| 21 | +//! `SortExec`'s TopK). Running the deserialized plan drives updates on the |
| 22 | +//! shared filter, and the pushed predicate on the scan side prunes rows |
| 23 | +//! accordingly. |
| 24 | +//! |
| 25 | +//! These tests live in a standalone crate (`datafusion-tests`) because they |
| 26 | +//! need dev-dependencies on both `datafusion` and `datafusion-proto`; putting |
| 27 | +//! them in either of those crates' own `tests/` directory would close a |
| 28 | +//! dev-dependency cycle caught by the workspace's circular-dependency check. |
| 29 | +
|
| 30 | +use std::sync::Arc; |
| 31 | + |
| 32 | +use arrow::array::{Int64Array, StringArray}; |
| 33 | +use arrow::datatypes::{DataType, Field, Schema}; |
| 34 | +use arrow::record_batch::RecordBatch; |
| 35 | +use datafusion::datasource::source::DataSourceExec; |
| 36 | +use datafusion::physical_plan::collect; |
| 37 | +use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; |
| 38 | +use datafusion_physical_plan::ExecutionPlan; |
| 39 | +use datafusion_physical_plan::joins::HashJoinExec; |
| 40 | +use datafusion_physical_plan::sorts::sort::SortExec; |
| 41 | +use datafusion_proto::bytes::{ |
| 42 | + physical_plan_from_bytes_with_proto_converter, |
| 43 | + physical_plan_to_bytes_with_proto_converter, |
| 44 | +}; |
| 45 | +use datafusion_proto::physical_plan::{ |
| 46 | + DeduplicatingProtoConverter, DefaultPhysicalExtensionCodec, |
| 47 | +}; |
| 48 | +use parquet::arrow::ArrowWriter; |
| 49 | + |
| 50 | +/// End-to-end: a SQL hash join with a selective WHERE on the build side |
| 51 | +/// produces a dynamic filter that's pushed into the probe-side |
| 52 | +/// `ParquetSource`. After proto round-trip, the `HashJoinExec`'s dynamic |
| 53 | +/// filter Arc and the pushed predicate still share mutable state, so |
| 54 | +/// build-side `update()` during execution is visible to the scan and the |
| 55 | +/// scan actually prunes rows. |
| 56 | +#[tokio::test] |
| 57 | +async fn hash_join_dynamic_filter_prunes_via_sql() -> datafusion_common::Result<()> { |
| 58 | + let config = SessionConfig::new() |
| 59 | + .set_bool("datafusion.execution.parquet.pushdown_filters", true); |
| 60 | + let ctx = SessionContext::new_with_config(config); |
| 61 | + let parquet_path = concat!( |
| 62 | + env!("CARGO_MANIFEST_DIR"), |
| 63 | + "/../core/tests/data/tpch_nation_small.parquet" |
| 64 | + ); |
| 65 | + ctx.register_parquet("build", parquet_path, ParquetReadOptions::default()) |
| 66 | + .await?; |
| 67 | + ctx.register_parquet("probe", parquet_path, ParquetReadOptions::default()) |
| 68 | + .await?; |
| 69 | + |
| 70 | + // Self-join with a selective WHERE on the build side. The build side |
| 71 | + // reduces to a single row; the dynamic filter derived from it is pushed |
| 72 | + // into the probe-side scan. |
| 73 | + let sql = "SELECT p.n_name FROM probe p \ |
| 74 | + INNER JOIN build b ON p.n_nationkey = b.n_nationkey \ |
| 75 | + WHERE b.n_nationkey = 5"; |
| 76 | + let plan = ctx.sql(sql).await?.create_physical_plan().await?; |
| 77 | + |
| 78 | + let codec = DefaultPhysicalExtensionCodec {}; |
| 79 | + let proto_converter = DeduplicatingProtoConverter {}; |
| 80 | + let bytes = physical_plan_to_bytes_with_proto_converter( |
| 81 | + Arc::clone(&plan), |
| 82 | + &codec, |
| 83 | + &proto_converter, |
| 84 | + )?; |
| 85 | + let result_plan = physical_plan_from_bytes_with_proto_converter( |
| 86 | + bytes.as_ref(), |
| 87 | + ctx.task_ctx().as_ref(), |
| 88 | + &codec, |
| 89 | + &proto_converter, |
| 90 | + )?; |
| 91 | + |
| 92 | + let hj = find_hash_join(&result_plan) |
| 93 | + .expect("deserialized plan should contain a HashJoinExec"); |
| 94 | + assert!( |
| 95 | + hj.dynamic_filter().is_some(), |
| 96 | + "deserialized HashJoinExec should carry a dynamic filter" |
| 97 | + ); |
| 98 | + |
| 99 | + let _batches = collect(Arc::clone(&result_plan), ctx.task_ctx()).await?; |
| 100 | + |
| 101 | + // The probe side is HashJoinExec.right; walk down to its DataSourceExec. |
| 102 | + let probe_scan = find_data_source(hj.children()[1]) |
| 103 | + .expect("probe-side should terminate in a DataSourceExec"); |
| 104 | + let rows = probe_scan |
| 105 | + .metrics() |
| 106 | + .and_then(|m| m.output_rows()) |
| 107 | + .expect("DataSourceExec should record output_rows"); |
| 108 | + |
| 109 | + // Full probe table is 20 rows; the dynamic filter must prune below that. |
| 110 | + assert!( |
| 111 | + rows < 20, |
| 112 | + "probe-side scan emitted {rows} rows; dynamic filter did not prune \ |
| 113 | + (full table is 20 rows)" |
| 114 | + ); |
| 115 | + |
| 116 | + Ok(()) |
| 117 | +} |
| 118 | + |
| 119 | +/// End-to-end: an `ORDER BY ... LIMIT 1` over two single-row parquet files |
| 120 | +/// (`a.parquet` with key=1, `b.parquet` with key=2). |
| 121 | +/// |
| 122 | +/// 1. `SortExec` with `TopK(fetch=1)` creates a dynamic filter pushed into |
| 123 | +/// the underlying `ParquetSource`. |
| 124 | +/// 2. After proto round-trip, the `SortExec`'s filter and the scan's pushed |
| 125 | +/// predicate share the same inner state (cached by `expression_id`). |
| 126 | +/// 3. With `target_partitions=1`, both files are read sequentially. After |
| 127 | +/// `a.parquet` is read, TopK's single best row has key=1 and the filter |
| 128 | +/// tightens. |
| 129 | +/// 4. When the scan opens `b.parquet` (min=max=2), row-group statistics |
| 130 | +/// prune it — the scan never yields b's row. |
| 131 | +/// |
| 132 | +/// Observable proof: `DataSourceExec::metrics().output_rows() == 1`. Without |
| 133 | +/// the round-trip wiring, TopK would update a disconnected filter and the |
| 134 | +/// scan would emit both rows. |
| 135 | +#[tokio::test] |
| 136 | +async fn topk_dynamic_filter_prunes_files_via_sql() -> datafusion_common::Result<()> { |
| 137 | + let tmp = tempfile::TempDir::new()?; |
| 138 | + let schema = Arc::new(Schema::new(vec![ |
| 139 | + Field::new("n_nationkey", DataType::Int64, false), |
| 140 | + Field::new("n_name", DataType::Utf8, false), |
| 141 | + ])); |
| 142 | + let write_file = |
| 143 | + |name: &str, key: i64, value: &str| -> datafusion_common::Result<()> { |
| 144 | + let path = tmp.path().join(name); |
| 145 | + let batch = RecordBatch::try_new( |
| 146 | + Arc::clone(&schema), |
| 147 | + vec![ |
| 148 | + Arc::new(Int64Array::from(vec![key])), |
| 149 | + Arc::new(StringArray::from(vec![value])), |
| 150 | + ], |
| 151 | + )?; |
| 152 | + let file = std::fs::File::create(&path)?; |
| 153 | + let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?; |
| 154 | + writer.write(&batch)?; |
| 155 | + writer.close()?; |
| 156 | + Ok(()) |
| 157 | + }; |
| 158 | + write_file("a.parquet", 1, "foo")?; |
| 159 | + write_file("b.parquet", 2, "bar")?; |
| 160 | + |
| 161 | + let config = SessionConfig::new() |
| 162 | + .set_bool("datafusion.execution.parquet.pushdown_filters", true) |
| 163 | + .with_target_partitions(1); |
| 164 | + let ctx = SessionContext::new_with_config(config); |
| 165 | + ctx.register_parquet( |
| 166 | + "t", |
| 167 | + tmp.path().to_str().unwrap(), |
| 168 | + ParquetReadOptions::default(), |
| 169 | + ) |
| 170 | + .await?; |
| 171 | + |
| 172 | + let sql = "SELECT n_name FROM t ORDER BY n_nationkey ASC LIMIT 1"; |
| 173 | + let plan = ctx.sql(sql).await?.create_physical_plan().await?; |
| 174 | + |
| 175 | + let codec = DefaultPhysicalExtensionCodec {}; |
| 176 | + let proto_converter = DeduplicatingProtoConverter {}; |
| 177 | + let bytes = physical_plan_to_bytes_with_proto_converter( |
| 178 | + Arc::clone(&plan), |
| 179 | + &codec, |
| 180 | + &proto_converter, |
| 181 | + )?; |
| 182 | + let result_plan = physical_plan_from_bytes_with_proto_converter( |
| 183 | + bytes.as_ref(), |
| 184 | + ctx.task_ctx().as_ref(), |
| 185 | + &codec, |
| 186 | + &proto_converter, |
| 187 | + )?; |
| 188 | + |
| 189 | + let sort = |
| 190 | + find_sort(&result_plan).expect("deserialized plan should contain a SortExec"); |
| 191 | + assert!( |
| 192 | + sort.dynamic_filter().is_some(), |
| 193 | + "deserialized SortExec should carry a TopK dynamic filter" |
| 194 | + ); |
| 195 | + |
| 196 | + let batches = collect(Arc::clone(&result_plan), ctx.task_ctx()).await?; |
| 197 | + let total_out: usize = batches.iter().map(|b| b.num_rows()).sum(); |
| 198 | + assert_eq!(total_out, 1, "ORDER BY LIMIT 1 should emit one row"); |
| 199 | + |
| 200 | + let scan = find_data_source(&result_plan) |
| 201 | + .expect("deserialized plan should contain a DataSourceExec"); |
| 202 | + let rows = scan |
| 203 | + .metrics() |
| 204 | + .and_then(|m| m.output_rows()) |
| 205 | + .expect("DataSourceExec should record output_rows"); |
| 206 | + assert_eq!( |
| 207 | + rows, 1, |
| 208 | + "scan emitted {rows} rows; expected exactly 1 (only a.parquet), \ |
| 209 | + because the TopK dynamic filter should prune b.parquet once TopK \ |
| 210 | + has seen a's row" |
| 211 | + ); |
| 212 | + |
| 213 | + Ok(()) |
| 214 | +} |
| 215 | + |
| 216 | +fn find_hash_join(plan: &Arc<dyn ExecutionPlan>) -> Option<&HashJoinExec> { |
| 217 | + if let Some(hj) = plan.downcast_ref::<HashJoinExec>() { |
| 218 | + return Some(hj); |
| 219 | + } |
| 220 | + for child in plan.children() { |
| 221 | + if let Some(hj) = find_hash_join(child) { |
| 222 | + return Some(hj); |
| 223 | + } |
| 224 | + } |
| 225 | + None |
| 226 | +} |
| 227 | + |
| 228 | +fn find_sort(plan: &Arc<dyn ExecutionPlan>) -> Option<&SortExec> { |
| 229 | + if let Some(s) = plan.downcast_ref::<SortExec>() { |
| 230 | + return Some(s); |
| 231 | + } |
| 232 | + for child in plan.children() { |
| 233 | + if let Some(s) = find_sort(child) { |
| 234 | + return Some(s); |
| 235 | + } |
| 236 | + } |
| 237 | + None |
| 238 | +} |
| 239 | + |
| 240 | +fn find_data_source(plan: &Arc<dyn ExecutionPlan>) -> Option<&DataSourceExec> { |
| 241 | + if let Some(dse) = plan.downcast_ref::<DataSourceExec>() { |
| 242 | + return Some(dse); |
| 243 | + } |
| 244 | + for child in plan.children() { |
| 245 | + if let Some(dse) = find_data_source(child) { |
| 246 | + return Some(dse); |
| 247 | + } |
| 248 | + } |
| 249 | + None |
| 250 | +} |
0 commit comments