perf: Skip RowFilter when all predicate columns are in the projection #20417
darmie wants to merge 8 commits into apache:main
Conversation
When all predicate columns are in the output projection, late materialization provides no I/O benefit. Replace the expensive RowFilter path with a lightweight batch-level filter to avoid CachedArrayReader/ReadPlanBuilder/try_next_batch overhead.
Add a dedicated test verifying that when all predicate columns are in the output projection, the opener skips RowFilter and applies a batch filter instead — and that both the batch filter and RowFilter paths produce correct results. Simplify the 4-way stream branching into two independent steps: first apply the empty-batch filter, then optionally wrap with EarlyStoppingStream.
Skip dynamic filter expressions (TopK, join pushdown) when deciding whether a predicate is single-conjunct. This preserves the batch filter optimization for queries like Q25 (WHERE col <> '' ORDER BY col LIMIT N) where TopK adds runtime conjuncts, while still routing multi-conjunct static predicates through RowFilter for incremental evaluation.
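A minimal sketch of the conjunct-counting rule described above. `Conjunct` and `static_conjunct_count` are illustrative stand-ins, not the PR's actual types: the point is only that runtime-injected filters (TopK, join pushdown) are excluded before testing whether the predicate is single-conjunct.

```rust
// Illustrative model (not DataFusion's API): a predicate split into
// conjuncts, where some conjuncts are dynamic runtime filters.
#[derive(Debug)]
struct Conjunct {
    expr: &'static str, // placeholder for a physical predicate expression
    is_dynamic: bool,   // true for runtime filters (TopK, join pushdown)
}

// Only static conjuncts count toward the "single-conjunct" test, so a
// TopK runtime bound does not disqualify a query from the batch filter.
fn static_conjunct_count(conjuncts: &[Conjunct]) -> usize {
    conjuncts.iter().filter(|c| !c.is_dynamic).count()
}

fn main() {
    // Q25-style predicate: one static conjunct plus a TopK runtime bound.
    let conjuncts = [
        Conjunct { expr: "col <> ''", is_dynamic: false },
        Conjunct { expr: "col <= <topk threshold>", is_dynamic: true },
    ];
    assert_eq!(static_conjunct_count(&conjuncts), 1);
}
```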
run benchmark clickbench_partitioned

🤖: Benchmark completed Details

run benchmark tpch

🤖: Benchmark completed Details

run benchmark tpcds

show benchmark queue

🤖 Hi @Dandandan, you asked to view the benchmark queue (#20417 (comment)).
Change is_subset to strict equality for predicate vs projection column indices. When there are non-predicate projection columns (e.g. SELECT * WHERE col = X), RowFilter provides significant value by skipping their decode for non-matching rows. Only skip RowFilter when every projected column is a predicate column. Also exclude dynamic filter expressions (TopK, join pushdown) when counting conjuncts, so runtime-generated filters don't prevent the batch filter optimization for single static predicates.
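The refined rule above can be sketched as a small predicate. This is an illustrative version with made-up names, not the PR's exact code: RowFilter is skipped only when the predicate columns are *exactly* the projection columns and at most one static conjunct remains; with extra projected columns (e.g. `SELECT * WHERE col = x`), RowFilter still saves decode work on non-matching rows, so it is kept.

```rust
use std::collections::BTreeSet;

// Hypothetical decision helper mirroring the strict-equality rule:
// skip RowFilter only when every projected column is a predicate column
// and the predicate has at most one static conjunct.
fn should_skip_row_filter(
    predicate_cols: &BTreeSet<usize>,
    projection_cols: &BTreeSet<usize>,
    static_conjuncts: usize,
) -> bool {
    static_conjuncts <= 1 && predicate_cols == projection_cols
}

fn main() {
    let pred: BTreeSet<usize> = [0].into_iter().collect();
    let proj_same: BTreeSet<usize> = [0].into_iter().collect();
    let proj_star: BTreeSet<usize> = [0, 1, 2].into_iter().collect();
    // Exact match: batch filter is enough.
    assert!(should_skip_row_filter(&pred, &proj_same, 1));
    // SELECT *-style projection: keep RowFilter to skip extra decodes.
    assert!(!should_skip_row_filter(&pred, &proj_star, 1));
}
```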
run benchmark clickbench_partitioned

show benchmark queue

🤖 Hi @Dandandan, you asked to view the benchmark queue (#20417 (comment)).

@alamb looks like the runner isn't working

run benchmark clickbench_partitioned

🤖 Hi @darmie, thanks for the request (#20417 (comment)).

I'll run the benchmark locally (
Pull request overview
This PR optimizes Parquet filter pushdown by skipping the RowFilter (late materialization) path when it provides no I/O benefit, addressing performance regressions identified in ClickBench queries.
Purpose:
The optimization recognizes that when all predicate columns must be decoded for the output projection anyway (and there's at most one static conjunct), the RowFilter machinery adds CPU overhead without providing I/O savings. In these cases, applying the predicate as a post-decode batch filter is more efficient.
Changes:
- Added logic to detect when predicate columns exactly match projection columns with ≤1 static conjunct
- Implemented batch-level filtering as an alternative to RowFilter in these cases
- Added empty batch filtering to remove batches with no rows after filtering
- Comprehensive test coverage for various predicate/projection combinations including multi-conjunct predicates and dynamic filters
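A toy scalar model of the batch-filter and empty-batch steps listed above. The real code operates on Arrow `RecordBatch` streams; here plain `Vec<i32>` stands in for a batch: evaluate the predicate on already-decoded values, keep matching rows, and drop batches left empty so downstream operators never see zero-row batches.

```rust
// Toy model (plain vectors instead of RecordBatches): apply the predicate
// per batch, then filter out batches with no surviving rows.
fn batch_filter<F: Fn(&i32) -> bool>(batches: Vec<Vec<i32>>, pred: F) -> Vec<Vec<i32>> {
    batches
        .into_iter()
        .map(|batch| batch.into_iter().filter(|v| pred(v)).collect::<Vec<i32>>())
        .filter(|batch| !batch.is_empty()) // the "empty batch filter" step
        .collect()
}

fn main() {
    let filtered = batch_filter(vec![vec![1, 5, 2], vec![0, 1]], |v| *v >= 2);
    // Second batch had no matching rows and is dropped entirely.
    assert_eq!(filtered, vec![vec![5, 2]]);
}
```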
Reviewed changes
Copilot reviewed 1 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| datafusion/datasource-parquet/src/opener.rs | Core optimization logic to skip RowFilter when predicate columns match projection columns, apply batch-level filtering, and filter empty batches. Includes comprehensive test suite (Cases 1-6) validating different predicate/projection scenarios. |
| .gitignore | Added profiling-artifacts/ directory to ignore profiling outputs |
It reports some bigger slowdowns 🤔

🤖: Benchmark completed Details

run benchmark clickbench_partitioned

Looks like it is mostly better. The Incomparable one looks like it runs OOM though; does it currently maybe skip some filters, @darmie?

I'll investigate
When a conjunct references columns not in the output projection (e.g. COUNT(*) WHERE col = X), it cannot be evaluated as a batch filter because those columns are absent from the output schema. Keep such conjuncts in the RowFilter to avoid schema errors.
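The fix above can be sketched as a partitioning step. This is a hypothetical sketch with illustrative names: a conjunct may only be demoted to a post-decode batch filter if every column it references is in the output projection; otherwise it stays in the RowFilter, because the output schema would not contain the columns it needs.

```rust
use std::collections::BTreeSet;

// Illustrative conjunct: a name plus the column indices it references.
struct Conjunct {
    name: &'static str,
    cols: BTreeSet<usize>,
}

// Split conjuncts into (batch-filterable, must stay in RowFilter):
// only conjuncts whose columns all survive into the output projection
// can be evaluated against output batches without schema errors.
fn partition_conjuncts(
    conjuncts: Vec<Conjunct>,
    projection: &BTreeSet<usize>,
) -> (Vec<Conjunct>, Vec<Conjunct>) {
    conjuncts
        .into_iter()
        .partition(|c| c.cols.is_subset(projection))
}

fn main() {
    let projection: BTreeSet<usize> = [0].into_iter().collect();
    let conjuncts = vec![
        Conjunct { name: "c0 = 'x'", cols: [0].into_iter().collect() },
        // References column 5, absent from the output (COUNT(*)-style query).
        Conjunct { name: "c5 > 10", cols: [5].into_iter().collect() },
    ];
    let (batch, row) = partition_conjuncts(conjuncts, &projection);
    assert_eq!(batch[0].name, "c0 = 'x'");
    assert_eq!(row[0].name, "c5 > 10");
}
```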
🤖: Benchmark completed Details

@Dandandan The Q1 and Q20 problem was that the filter referenced columns outside the output projection, so the batch filter could not evaluate it against the output schema. I just pushed a fix: if the filter references columns that aren't in the output projection, it stays in the RowFilter. Let's run the bench again and see.

run benchmark clickbench_partitioned
```rust
let has_extra_cols = projection_col_indices
    .iter()
    .any(|idx| !conjunct_cols.contains(idx));
// 2. The conjunct references columns NOT in the output
```
Hmm, but wouldn't it be better to fix the reader schema to add the demoted columns?
Pull request overview
Copilot reviewed 4 out of 5 changed files in this pull request and generated no new comments.
🤖: Benchmark completed Details

Seems quite beneficial, no regressions (I'll kick it off once more)

run benchmark clickbench_partitioned

🤖: Benchmark completed Details
Would this still be beneficial for highly selective predicates in late materialization? Even if all predicate columns are in the projection, would we not win I/O by incrementally reducing the data fetched per each
For the demoted case (single conjunct, filter cols = projection cols), RowFilter has to decode the same columns to evaluate the filter; there are no extra columns whose decode it can skip for non-matching rows. The savings are zero, but the RowFilter machinery still adds overhead, so the batch filter wins. For multi-conjunct predicates on different columns, RowFilter's incremental evaluation can still skip work, so those keep the RowFilter path. The one scenario where a single conjunct could still benefit despite covering all projection columns is page-index pruning, which skips entire pages. That's captured by
run benchmark clickbench_partitioned

🤖: Benchmark completed Details

Nice, thanks for the explanation @darmie!
To be honest, these results are pretty compelling. However, I am surprised that Q23 doesn't get substantially faster (in this case the dynamic filter from the TopK should be pushed down).
```rust
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file.clone()).unwrap().await.unwrap();
```
I feel like this test should also be asserting something about the predicates more directly (this is asserting the number of rows that come out, rather than the fact that the filter is pushed down).
```rust
// Filter pushdown: evaluate predicates during scan.
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
    let row_filter = row_filter::build_row_filter(
```
If we are deciding which filters to push down based on projection and filter columns, is the ParquetOpener the right place? I wonder if we should move the determination earlier (like maybe don't bother trying to push down filters at all?).
## Which issue does this PR close?

N/A

## Rationale for this change

Some PRs are being omitted from the stale check because they were in a cache, and the workflow appears to lack permission to delete that cache, so they are forever stuck as unprocessed. For example, in this run: https://github.com/apache/datafusion/actions/runs/24756695077/job/72431314533

Seeing this in the logs:

```
[apache#20473] issue skipped due being processed during the previous run
[apache#20460] pull request skipped due being processed during the previous run
[apache#20448] issue skipped due being processed during the previous run
[apache#20443] issue skipped due being processed during the previous run
[apache#20435] issue skipped due being processed during the previous run
[apache#20418] issue skipped due being processed during the previous run
[apache#20417] pull request skipped due being processed during the previous run
[apache#20416] pull request skipped due being processed during the previous run
[apache#20403] pull request skipped due being processed during the previous run
```

And at the end we see this warning:

```
Warning: Error delete _state: [403] Resource not accessible by integration - https://docs.github.com/rest/actions/cache#delete-github-actions-caches-for-a-repository-using-a-cache-key
```

The stale workflow uses a cache in case it hits the `operations-per-run` limit meant to prevent API rate limiting (we have the default of 30), so it seems we previously hit this limit and some issues/PRs were cached, and have never been uncached since, so they are never processed again. See: https://github.com/actions/stale#operations-per-run

## What changes are included in this PR?

Give the stale workflow permission to run GitHub Actions operations (like deleting caches). See recommended permissions: https://github.com/actions/stale#recommended-permissions

## Are these changes tested?

## Are there any user-facing changes?
## Which issue does this PR close?

## Rationale for this change

When `pushdown_filters = true` and all predicate columns are already in the output projection, the arrow-rs `RowFilter` (late materialization) machinery provides zero I/O benefit: those columns must be decoded for the projection anyway. Yet the RowFilter adds substantial CPU overhead from `CachedArrayReader`, `ReadPlanBuilder::with_predicate`, and `ParquetDecoderState::try_next_batch` (~1100 extra CPU samples on the Q10 flamegraph). This causes regressions on 15 of the 43 ClickBench queries.

See profiling details.

## What changes are included in this PR?

In `opener.rs`, before calling `build_row_filter()`, check whether all predicate column indices are a subset of the projection column indices. If so:

- Skip `build_row_filter()` entirely (no RowFilter overhead)
- Apply the predicate as a post-decode `batch_filter()`

If not a subset (i.e., there are non-projected columns whose decode could be skipped), proceed with the RowFilter path as before.

ClickBench results on key regression queries (pushdown ON, fix vs baseline):

## Are these changes tested?

Yes. Added `test_skip_row_filter_when_filter_cols_subset_of_projection`, which validates that the opener skips RowFilter and applies a batch filter when all predicate columns are in the output projection, and that both paths produce correct results.

All existing tests pass (81 tests in `datafusion-datasource-parquet`).

## Are there any user-facing changes?

No. Behavior is identical: queries return the same results. Performance improves for queries where filter columns overlap with projection columns when `pushdown_filters = true`.