[SPARK-56385][SQL] Track pushed filter expressions on DataSourceV2ScanRelation#55252
yyanyy wants to merge 2 commits into apache:master
Conversation
…nRelation Track fully-pushed filter expressions on DataSourceV2ScanRelation. During pushDownFilters, compute pushed filters as the set-difference of normalized input filters minus post-scan filters. In pruneColumns, remap pushed filters through projectionFunc to ensure attribute references match the pruned scan output. The pushedFilters field is not yet wired into the constraint propagation framework (validConstraints). Doing so would change filter inference behavior and requires plan stability testing first.
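The set-difference bookkeeping described above can be sketched in a Spark-free toy model. `Expr` and its `canonical` string are illustrative stand-ins for Catalyst expressions and `ExpressionSet`'s canonicalization, not Spark's actual types:

```scala
// Toy model of the pushDownFilters bookkeeping: `Expr(canonical)` stands in for a
// Catalyst expression keyed by its canonicalized form (Spark uses ExpressionSet).
final case class Expr(canonical: String)

object PushedFilterSketch {
  // Pushed filters = normalized input filters minus the filters that must still run
  // post-scan, compared by canonical form rather than object identity.
  def pushedFilters(normalized: Seq[Expr], postScan: Seq[Expr]): Seq[Expr] = {
    val postScanSet = postScan.map(_.canonical).toSet
    normalized.filterNot(e => postScanSet.contains(e.canonical))
  }
}
```

In this sketch, a filter that the connector could not accept stays in the post-scan set and is therefore excluded from the pushed list, mirroring the set-difference computed in `pushDownFilters`.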
```scala
    ordering: Option[Seq[SortOrder]] = None,
    pushedFilters: Seq[Expression] = Seq.empty) extends LeafNode with NamedRelation {

  // TODO: Override validConstraints to return ExpressionSet(pushedFilters) so that pushed
```
I don't have a strong opinion. We can do it in this PR and update golden files if needed, or defer it to a followup PR.
Thank you both for the review! I'd like to defer that to a later PR so this one isn't blocked on golden-file adjustments and other changes that might take quite some investigation time.
As discussed offline, let's remove this comment?
```scala
      case projectionOverSchema(newExpr) => newExpr
    }

    // Remap pushed filter attributes to the pruned output schema. Note: if a pushed filter
```
I am not sure I follow. How can we drop the column if a filter references it? That seems like a violation of the contract?
It actually could happen: in V2, a fully pushed filter can be removed from the plan and no longer be referenced, which triggers column pruning to drop the column. Wenchen also pointed out a similar issue later; I'll share more details in that comment.
```scala
    assert(realOutput.length == holder.output.length,
      "The data source returns unexpected number of columns")
    val wrappedScan = getWrappedScan(scan, holder)
    // Note: holder.pushedFilterExpressions is not propagated here because the output schema
```
In theory it's not, but the aggregate path replaces the output schema entirely (table columns -> aggregate columns), so the original filter expressions can't be remapped to the new output. So I'd prefer to defer this and revisit if there's a concrete use case.
cloud-fan left a comment
Summary
Prior state and problem: Once a filter is pushed into a DSv2 scan, the logical plan loses track of it — DataSourceV2ScanRelation contributes no constraints to the optimizer. This prevents constraint propagation (e.g., inferring filters across joins) and identification of redundant post-scan filters.
Design approach: Add a pushedFilters: Seq[Expression] field to DataSourceV2ScanRelation that records which Catalyst filter expressions were fully pushed down (not present in post-scan filters). The field is computed as the set-difference of normalized input filters minus post-scan filters using ExpressionSet for canonical comparison. Not yet wired into validConstraints — intentionally deferred for plan stability testing.
Key design decisions:
- Catalyst expressions (not V1/V2 filter objects): allows direct participation in the optimizer's expression infrastructure (tree transformations, canonicalization).
- Deferred validConstraints: pragmatic; land the field first, wire it up later after plan stability testing.
- Skip in aggregate/join/variant paths: the output schema changes fundamentally in these paths, so the pushed-filter attribute references wouldn't map to the new output.
Implementation: ScanBuilderHolder.pushedFilterExpressions (mutable var) carries the data between pushDownFilters and pruneColumns, consistent with other pushed-down state (pushedPredicates, pushedAggregate, etc.). In pruneColumns, the filters are remapped through ProjectionOverSchema to match the pruned output schema. The field participates in doCanonicalize via normalizeExpressions.
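The `pruneColumns` remapping can be illustrated with a toy model that keys columns by name. The types here (`PushedFilter`, `remap`) are hypothetical simplifications; Spark's actual code remaps through `ProjectionOverSchema` and compares `AttributeSet`s by exprId:

```scala
// Toy model: a pushed filter tracks the column names it references. After column
// pruning, a filter survives only if all of its references are still in the output.
final case class PushedFilter(text: String, references: Set[String])

object PruneColumnsSketch {
  def remap(pushed: Seq[PushedFilter], prunedOutput: Set[String]): Seq[PushedFilter] =
    pushed.filter(_.references.subsetOf(prunedOutput))
}
```

In this model, a filter on a pruned column (e.g. `i > 3` when only `j` survives pruning) is dropped rather than carried with stale references.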
```scala
    // Compute the pushed filter expressions: the normalized filters that were fully pushed
    // down (i.e., not in postScanFilters). These are stored on the scan relation for
    // potential future use in constraint propagation.
```
ExpressionSet.contains only checks its internal baseSet, which excludes non-deterministic expressions (they go to originals only — see ExpressionSet.add at ExpressionSet.scala:97-103). So if a non-deterministic filter like rand() > 0.5 is untranslatable and ends up in postScanFiltersWithoutSubquery, postScanFilterSet.contains will return false for it, and it will be incorrectly included in pushedFilterExpressions.
Trace for `SELECT * FROM t WHERE i > 3 AND rand() > 0.5`:
1. `normalizedFiltersWithoutSubquery = [i > 3, rand() > 0.5]`
2. `PushDownUtils.pushFilters` can't translate `rand() > 0.5` → it lands in `untranslatableExprs` → `postScanFiltersWithoutSubquery`
3. `ExpressionSet(postScanFiltersWithoutSubquery)` → `rand() > 0.5` goes to `originals` only
4. `.filterNot(postScanFilterSet.contains)` → `contains(rand() > 0.5)` → `false` → NOT filtered out
5. Result: `pushedFilterExpressions` incorrectly includes `rand() > 0.5`
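A toy reconstruction of the `ExpressionSet` behavior in question (simplified: the real class canonicalizes expressions and keeps non-deterministic ones in `originals`; the deterministic/baseSet split is the relevant part):

```scala
// Stand-in for a Catalyst expression keyed by canonical form.
final case class E(canonical: String, deterministic: Boolean)

// Mimics ExpressionSet: only deterministic expressions enter baseSet, so `contains`
// never matches a non-deterministic expression, even one that was added.
final class ToyExpressionSet(exprs: Seq[E]) {
  private val baseSet: Set[String] = exprs.filter(_.deterministic).map(_.canonical).toSet
  def contains(e: E): Boolean = baseSet.contains(e.canonical)
}
```

With this model, `new ToyExpressionSet(Seq(E("rand() > 0.5", false))).contains(E("rand() > 0.5", false))` is `false`, so a `filterNot(set.contains)` over the normalized filters would keep the non-deterministic filter, reproducing the bug described above.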
Consider filtering the result:

```scala
  // potential future use in constraint propagation.
  val postScanFilterSet = ExpressionSet(postScanFiltersWithoutSubquery)
  sHolder.pushedFilterExpressions =
    normalizedFiltersWithoutSubquery.filterNot(postScanFilterSet.contains).filter(_.deterministic)
```
thanks for the info, TIL!
```scala
        _.map(o => o.copy(child = QueryPlan.normalizeExpressions(o.child, output)))
      )
    ),
    pushedFilters = pushedFilters.map(QueryPlan.normalizeExpressions(_, output))
```
The comment in pruneColumns (line 809-814 of V2ScanRelationPushDown.scala) says stale references are "acceptable while pushedFilters is informational only." However, this line normalizes pushedFilters against output via normalizeExpressions, which returns ordinal -1 for attributes not in the output, leaving the original exprId intact. Two equivalent plans would have different exprIds in their canonical form, breaking canonicalized == comparison (used by subquery dedup, plan caching, etc.).
Consider filtering out pushed filters with stale references in pruneColumns:
```scala
val remappedPushedFilters = sHolder.pushedFilterExpressions.map(projectionFunc)
  .filter(_.references.subsetOf(AttributeSet(output)))
```

This ensures only filters with valid references participate in canonicalization, and is consistent with the "drop filters with stale references" option already mentioned in the comment.
I debated with myself a bit on this and decided to not include pushed filters whose references are no longer in the pruned output.
The reason I included them before is that I was worried a fully pushed filter could be lost from pushedFilters when the filtered column is not projected:

`SELECT j FROM t WHERE i > 3`, with a connector that pushes both GreaterThan and IsNotNull:
1. After analysis: `Project([j], Filter(i > 3, DataSourceV2Relation([i, j])))`
2. InferFiltersFromConstraints (runs before V2ScanRelationPushDown) derives `IsNotNull(i)` → `Project([j], Filter(i > 3 AND IsNotNull(i), DataSourceV2Relation([i, j])))`
3. pushDownFilters: both `i > 3` and `IsNotNull(i)` fully pushed → Filter node removed → `Project([j], ScanBuilderHolder)`, `pushedFilterExpressions = [i > 3]`
4. pruneColumns: only `j` is referenced → `output = [j]`, column `i` pruned → pushed filter `i > 3` references the pruned `i` → dropped
And that worried me, because part of the intent of this field is to record what guarantees the connector makes, and after pruning we no longer have that complete information.

But for now I think this is acceptable, because (1) pushedFilters is informational only; a potential future improvement could keep fully pushed filters as post-scan Filter nodes, which would prevent the column from being pruned in the first place, and (2) the Spark plan validator would also catch and reject this case when it finds that some expression references are not resolvable.
…ence bugs
- Filter non-deterministic expressions from pushedFilterExpressions (ExpressionSet.contains misses them, causing incorrect inclusion)
- Drop pushed filters whose references are no longer in the pruned output after column pruning
- Add tests for both fixes
What changes were proposed in this pull request?
Add a `pushedFilters: Seq[Expression]` field to `DataSourceV2ScanRelation` that records which Catalyst filter expressions were fully pushed down to the data source and no longer appear as post-scan `Filter` nodes.

The field is computed in `V2ScanRelationPushDown.pushDownFilters` as the set-difference of normalized input filters minus post-scan filters (using `ExpressionSet` for canonical comparison), and remapped through `projectionFunc` in `pruneColumns` so that attribute references (including nested struct types) stay consistent with the pruned scan output.

Other scan-building paths (`buildScanWithPushedAggregate`, `buildScanWithPushedJoin`, `buildScanWithPushedVariants`) use the default empty value since their output schemas differ from table columns.

The field is not yet wired into `validConstraints`. Doing so would change filter inference behavior (e.g., `InferFiltersFromConstraints` adding/removing filters, `PruneFilters` dropping redundant post-scan filters) and requires plan stability testing first.

Why are the changes needed?

Once a filter is pushed into a DSv2 scan, the logical plan loses track of it: `DataSourceV2ScanRelation` contributes no constraints to the optimizer. This prevents constraint propagation (e.g., inferring filters across joins) and identification of redundant post-scan filters.

This change stores the pushed filter information on the scan relation as groundwork for future constraint propagation.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit tests
Was this patch authored or co-authored using generative AI tooling?
claude code opus 4.6