Skip to content

Commit d6c3879

Browse files
xudong963 and claude authored and committed
refactor: skip RowFilter in DataFusion via per-run decoders (no arrow-rs changes)
Instead of adding a `with_fully_matched_row_groups` API to arrow-rs, implement the optimization entirely in DataFusion by creating separate ParquetPushDecoders for row groups that need filtering vs those that are fully matched.

Key changes:
- Split row groups into consecutive runs of the same filter requirement via `split_decoder_runs()`, preserving original row group ordering for ordered scans.
- Each filtered run gets its own RowFilter; fully matched runs skip it.
- Use `VecDeque<ParquetPushDecoder>` in `PushDecoderStreamState` to chain decoders sequentially.
- Remove the `[patch.crates-io]` arrow-rs fork dependency.

This aligns with the direction of per-row-group morsels: each decoder run can naturally become a morsel when that infrastructure lands.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d54f53f commit d6c3879

4 files changed

Lines changed: 282 additions & 105 deletions

File tree

Cargo.lock

Lines changed: 36 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -284,22 +284,3 @@ debug = false
284284
debug-assertions = false
285285
strip = "debuginfo"
286286
incremental = false
287-
288-
[patch.crates-io]
289-
parquet = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
290-
arrow = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
291-
arrow-array = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
292-
arrow-buffer = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
293-
arrow-cast = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
294-
arrow-ipc = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
295-
arrow-ord = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
296-
arrow-schema = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
297-
arrow-string = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
298-
arrow-select = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
299-
arrow-data = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
300-
arrow-arith = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
301-
arrow-csv = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
302-
arrow-json = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
303-
arrow-avro = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
304-
arrow-flight = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }
305-
arrow-row = { git = "https://github.com/xudong963/arrow-rs.git", branch = "skip-filter-fully-matched-row-groups" }

datafusion/datasource-parquet/benches/parquet_fully_matched_filter.rs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,23 +118,40 @@ fn bench_fully_matched_filter(c: &mut Criterion) {
118118
});
119119

120120
// Scenario 2: with pushdown, WITH fully_matched optimization
121-
// (RowFilter skipped for all row groups)
121+
// Uses two decoders: an empty filtered decoder (all RGs are fully matched)
122+
// plus a second decoder without RowFilter for the fully matched RGs.
123+
// This mirrors how DataFusion splits decoders in the opener.
122124
group.bench_function("all_fully_matched/pushdown_with_skip", |b| {
123125
b.iter(|| {
124126
let rf = rebuild_row_filter(dataset, &file_metrics);
127+
// First decoder: with filter but no row groups (all are fully matched)
125128
let reader_metadata = ArrowReaderMetadata::try_new(
126129
Arc::clone(&dataset.metadata),
127130
Default::default(),
128131
)
129132
.unwrap();
130-
let decoder = ParquetPushDecoderBuilder::new_with_metadata(reader_metadata)
131-
.with_batch_size(8192)
132-
.with_row_filter(rf)
133-
.with_row_groups(all_rg_indices.clone())
134-
.with_fully_matched_row_groups(all_rg_indices.clone())
135-
.build()
136-
.unwrap();
137-
let rows = run_push_decoder(decoder, &dataset.file_bytes);
133+
let filtered_decoder =
134+
ParquetPushDecoderBuilder::new_with_metadata(reader_metadata)
135+
.with_batch_size(8192)
136+
.with_row_filter(rf)
137+
.with_row_groups(vec![]) // empty: no non-fully-matched RGs
138+
.build()
139+
.unwrap();
140+
// Second decoder: no filter for fully matched row groups
141+
let reader_metadata = ArrowReaderMetadata::try_new(
142+
Arc::clone(&dataset.metadata),
143+
Default::default(),
144+
)
145+
.unwrap();
146+
let matched_decoder =
147+
ParquetPushDecoderBuilder::new_with_metadata(reader_metadata)
148+
.with_batch_size(8192)
149+
.with_row_groups(all_rg_indices.clone())
150+
.build()
151+
.unwrap();
152+
// Run filtered decoder first (finishes immediately), then matched
153+
let rows = run_push_decoder(filtered_decoder, &dataset.file_bytes);
154+
let rows = rows + run_push_decoder(matched_decoder, &dataset.file_bytes);
138155
assert_eq!(rows, TOTAL_ROWS);
139156
});
140157
});

0 commit comments

Comments (0)