Skip to content

Commit d06d086

Browse files
mihailotim-dbdtenedor
authored andcommitted
[SPARK-55983][SQL] New single-pass analyzer functionality and bugfixes
### What changes were proposed in this pull request? This PR implements core single-pass resolver infrastructure and bugfixes. The single-pass resolver is an alternative to the traditional fixed-point (iterative) analyzer that resolves SQL plans in a single bottom-up traversal. Key changes: **New infrastructure:** - `OperatorResolutionContext` and `OperatorResolutionContextStack` for tracking operator-level state during resolution (HAVING tracking, subquery aggregate push-down, grouping analytics) - `NameResolutionParameters` for bundling name resolution flags (LCA access, hidden output, view resolution, extract value keys, etc.) - `ResolverGuardResult` structured result type replacing boolean return from `ResolverGuard` - `NonDeterministicExpressionCheck` as a single-pass-only resolution check - `RetainsOriginalJoinOutput` trait for preserving join output when metadata columns change child projections - `AliasKind` enum for distinguishing alias types during resolution - `TryExtractOrdinal` utility for ordinal extraction from expressions **Core improvements:** - Extended `ExpressionResolutionContext` with window expression tracking (nestedness level, window function/spec flags, parent context) - Enriched `NameScope` with variable resolution, extract value extraction keys, aggregate expression alias lookup, and hidden output improvements - Improved `HavingResolver` to handle aggregate expressions extracted from subqueries and window + HAVING interaction patterns - Extended `ExpressionIdAssigner` with subquery expression ID remapping and outer reference mapping - Expanded `ResolverGuard` to support lambda functions, star-with-target, regex columns, COALESCE with star, and multi-part TVF names - Moved plan rewrite rules (`CleanupAliases`, `PullOutNondeterministic`, `PruneMetadataColumns`) from `ResolverRunner` to `Resolver.lookupMetadataAndResolve` for correct per-view config handling - Improved `HybridAnalyzer` tentative mode with comprehensive fallback handling - Various improvements to aggregate resolution, sort resolution, join resolution, set operation resolution, and subquery expression resolution ### Why are the changes needed? To bring the Apache Spark single-pass analyzer closer to feature parity with the fixed-point analyzer implementation, enabling: 1. Correct resolution of window expressions, HAVING clauses with windows, and complex aggregate patterns 2. Proper operator-level context tracking during resolution 3. Better name resolution with variable support, extract value keys, and hidden output handling 4. Structured guard results for cleaner tentative mode fallback logic 5. Foundation for future features like nested correlated subquery support ### Does this PR introduce _any_ user-facing change? No. The single-pass analyzer is behind feature flags (`spark.sql.analyzer.singlePassResolver.enabled` and `spark.sql.analyzer.singlePassResolver.enabled.tentatively`) and is not the default code path. ### How was this patch tested? - Extended `HybridAnalyzerSuite` with `testDualRun` framework covering dual-run, tentative, and single-pass modes - New `DataFrameAnalyzerTestGapsSuite` for alias resolution edge cases (implicit aliases, nested alias collapsing, autogenerated alias names) - New `SinglePassAnalyzerTestUtils` test utility trait - Updated `ResolverGuardSuite` with structured result assertions and new feature tests (star-with-target, regex columns, COALESCE with star, multi-part TVF names) - Updated `ExplicitlyUnsupportedResolverFeatureSuite` with generator edge case tests - Updated `ExpressionIdAssignerSuite` for new attribute reference mapping semantics and subquery remapping - Updated `NameScopeSuite` with variable resolution and extract value key tests - Updated `AggregateResolverSuite`, `AliasResolverSuite`, `MetadataResolverSuite`, `ViewResolverSuite` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #54729 from mihailotim-db/resolver-pr1-core. Authored-by: Mihailo Timotic <[email protected]> Signed-off-by: Daniel Tenedorio <[email protected]>
1 parent 5acd8e6 commit d06d086

File tree

77 files changed

+4992
-1747
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+4992
-1747
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2476,6 +2476,13 @@
24762476
"Single-pass analyzer output schema:",
24772477
"<singlePassOutputSchema>"
24782478
]
2479+
},
2480+
"SINGLE_PASS_FAILED_FIXED_POINT_SUCCEEDED" : {
2481+
"message" : [
2482+
"Single-pass resolution failed, but fixed-point resolution succeeded.",
2483+
"Fixed-point analyzer output:",
2484+
"<fixedPointOutput>"
2485+
]
24792486
}
24802487
},
24812488
"sqlState" : "XX000"

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala

Lines changed: 120 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,17 @@
1818
package org.apache.spark.sql.catalyst.analysis.resolver
1919

2020
import org.apache.spark.sql.AnalysisException
21-
import org.apache.spark.sql.catalyst.expressions.{Expression, OuterReference, SubExprUtils}
22-
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ListAgg}
21+
import org.apache.spark.sql.catalyst.analysis.TypeCoercionValidation
22+
import org.apache.spark.sql.catalyst.expressions.{
23+
Expression,
24+
OuterReference,
25+
SubExprUtils
26+
}
27+
import org.apache.spark.sql.catalyst.expressions.aggregate.{
28+
AggregateExpression,
29+
AggregateFunction,
30+
ListAgg
31+
}
2332
import org.apache.spark.sql.catalyst.util.toPrettySQL
2433
import org.apache.spark.sql.errors.QueryCompilationErrors
2534

@@ -39,16 +48,18 @@ class AggregateExpressionResolver(
3948
private val expressionResolutionContextStack =
4049
expressionResolver.getExpressionResolutionContextStack
4150
private val subqueryRegistry = operatorResolver.getSubqueryRegistry
42-
private val autoGeneratedAliasProvider = new AutoGeneratedAliasProvider(
43-
expressionResolver.getExpressionIdAssigner
44-
)
51+
private val autoGeneratedAliasProvider = expressionResolver.getAutoGeneratedAliasProvider
4552

4653
/**
4754
* Resolves the given [[AggregateExpression]] originating from [[ExpressionResolver]] by
4855
* resolving its children recursively and validating the resolved expression.
4956
*/
5057
override def resolve(aggregateExpression: AggregateExpression): Expression = {
51-
expressionResolutionContextStack.peek().resolvingTreeUnderAggregateExpression = true
58+
val expressionResolutionContext = expressionResolutionContextStack.peek()
59+
expressionResolutionContext.resolvingTreeUnderAggregateExpression = true
60+
if (expressionResolutionContext.resolvingWindowFunction) {
61+
expressionResolutionContext.windowFunctionNestednessLevel += 1
62+
}
5263
val aggregateExpressionWithChildrenResolved =
5364
withResolvedChildren(aggregateExpression, expressionResolver.resolve _)
5465
.asInstanceOf[AggregateExpression]
@@ -76,25 +87,84 @@ class AggregateExpressionResolver(
7687
* - Resolution:
7788
* 1. Update the [[ExpressionResolver.expressionResolutionContextStack]];
7889
* 2. Handle [[OuterReference]] in [[AggregateExpression]], if there are any (see
79-
* `handleOuterAggregateExpression`);
90+
* `handleOuterAggregateExpression`);
91+
* 3. Confirm that we are resolving an actual aggregate expression, not a window aggregate
92+
* function by assessing the `windowFunctionNestednessLevel`:
93+
*
94+
* - In this query, `windowFunctionNestednessLevel` is `1` when resolving `SUM()`,
95+
* meaning we are resolving a window aggregate function:
96+
*
97+
* {{{ SELECT SUM() OVER() FROM VALUES 1, 2; }}}
98+
*
99+
* - In this query, `windowFunctionNestednessLevel` is `2` when resolving `SUM(col1)`,
100+
* meaning we are resolving an aggregate expression:
101+
*
102+
* {{{
103+
* SELECT
104+
* SUM(SUM(col1)) OVER (PARTITION BY SUM(col1))
105+
* FROM
106+
* VALUES (1, 2)
107+
* GROUP BY col2;
108+
* }}}
109+
*
110+
* - In this query, `windowFunctionNestednessLevel` is `0` when resolving `SUM(col1)`,
111+
* meaning we are resolving an aggregate expression:
112+
*
113+
* {{{ SELECT SUM(col1) FROM VALUES (1, 2); }}}
114+
*
115+
* If the expression is not a window aggregate function, set
116+
* `hasAggregateExpressionsOutsideWindow` to `true` and collect it as window source
117+
* expression.
118+
*
119+
* In this query:
120+
*
121+
* {{{ SELECT SUM(SUM(col1)) OVER () FROM VALUES (1, 2) GROUP BY col2; }}}
122+
*
123+
* `SUM(col1)` is collected, whereas `SUM(SUM(col1))` is not because the latter is a
124+
* window aggregate function. As a result, `SUM(col1)` is handled by [[Aggregate]],
125+
* while `SUM(SUM(col1))` is handled by [[Window]]:
126+
*
127+
* Window [sum(_w0#2) windowspecdefinition...#3]
128+
* +- Aggregate [col2#1], [sum(col1#0) AS _w0#2]
80129
* - Validation:
81-
* 1. [[ListAgg]] is not allowed in DISTINCT aggregates if the order value is ambiguous
130+
* 1. Validate that the type coercion is doen properly for the underlying aggregate function.
131+
* 2. [[ListAgg]] is not allowed in DISTINCT aggregates if the order value is ambiguous
82132
* after deduplication. When [[SQLConf.LISTAGG_ALLOW_DISTINCT_CAST_WITH_ORDER]] is
83133
* enabled, a mismatch is tolerated if the child value uniquely determines the order
84134
* value (see [[ListAgg.hasDistinctOrderAmbiguity]]);
85-
* 2. Nested aggregate functions are not allowed;
86-
* 3. Nondeterministic expressions in the subtree of a related aggregate function are not
135+
* 3. [[PythonUDAF]] is not allowed in [[Pivot.aggregates]];
136+
* 4. Nested aggregate functions are not allowed, unless the outer function represents
137+
* [[WindowExpression.windowFunction]]. For example the following query is considered
138+
* valid, because `SUM(SUM(col1))` is followed by an `OVER` clause, making it a nesting
139+
* of aggregate expression in a window expression:
140+
*
141+
* {{{
142+
* SELECT
143+
* SUM(SUM(col1)) OVER (PARTITION BY col2)
144+
* FROM (VALUES (1, 'a'), (2, 'b'))
145+
* GROUP BY col2
146+
* }}}
147+
*
148+
* On the other hand, this query should be invalid, because `SUM(SUM(col1))` isn't followed
149+
* by an `OVER` clause, therefore it should be treated like aggregate expression nesting:
150+
*
151+
* {{{
152+
* SELECT
153+
* SUM(SUM(SUM(col1)) OVER (PARTITION BY col2)
154+
* FROM (VALUES (1, 'a'), (2, 'b'))
155+
* GROUP BY col2
156+
* }}}
157+
*
158+
* 5. Nondeterministic expressions in the subtree of a related aggregate function are not
87159
* allowed;
88-
* 4. The mix of outer and local references is not allowed;
160+
* 6. The mix of outer and local references is not allowed;
89161
*/
90162
private def handleAggregateExpressionWithChildrenResolved(
91163
aggregateExpressionWithChildrenResolved: AggregateExpression): Expression = {
92164
val expressionResolutionContext = expressionResolutionContextStack.peek()
93165

94166
validateResolvedAggregateExpression(aggregateExpressionWithChildrenResolved)
95167

96-
expressionResolutionContext.hasAggregateExpressions = true
97-
98168
// There are two different cases that we handle regarding the value of the flag:
99169
//
100170
// - We have an attribute under an `AggregateExpression`:
@@ -108,20 +178,31 @@ class AggregateExpressionResolver(
108178
// It would be `true` as described above.
109179
expressionResolutionContext.hasAttributeOutsideOfAggregateExpressions = false
110180

111-
if (expressionResolutionContext.hasOuterReferences) {
181+
val resolvedAggregateExpression = if (expressionResolutionContext.hasOuterReferences) {
112182
handleOuterAggregateExpression(aggregateExpressionWithChildrenResolved)
113183
} else {
114184
aggregateExpressionWithChildrenResolved
115185
}
186+
187+
if (expressionResolutionContext.windowFunctionNestednessLevel != 1) {
188+
expressionResolutionContext.hasAggregateExpressionsOutsideWindow = true
189+
}
190+
191+
resolvedAggregateExpression
116192
}
117193

118194
private def validateResolvedAggregateExpression(aggregateExpression: AggregateExpression): Unit =
119195
aggregateExpression match {
196+
case _ @AggregateExpression(aggregateFunction: AggregateFunction, _, _, _, _)
197+
if aggregateFunction.checkInputDataTypes().isFailure =>
198+
TypeCoercionValidation.failOnTypeCheckResult(aggregateFunction)
120199
case agg @ AggregateExpression(listAgg: ListAgg, _, _, _, _)
121200
if agg.isDistinct && listAgg.hasDistinctOrderAmbiguity =>
122201
listAgg.throwDistinctOrderError()
123202
case _ =>
124-
if (expressionResolutionContextStack.peek().hasAggregateExpressions) {
203+
val expressionResolutionContext = expressionResolutionContextStack.peek()
204+
if (expressionResolutionContext.windowFunctionNestednessLevel != 1 &&
205+
expressionResolutionContext.hasAggregateExpressionsOutsideWindow) {
125206
throwNestedAggregateFunction(aggregateExpression)
126207
}
127208

@@ -145,8 +226,9 @@ class AggregateExpressionResolver(
145226
* - If outer aggregates are allowed, replace the [[AggregateExpression]] with an
146227
* [[OuterReference]] to the auto-generated [[Alias]] that we created in case the subtree
147228
* without [[OuterReference]]s can't be found in the outer
148-
* [[Aggregate.aggregateExpressions]] list. Otherwise, use the [[Alias]] from the outer
149-
* [[Aggregate]]. This alias will later be injected into the outer [[Aggregate]];
229+
* [[Aggregate.aggregateExpressions]] list or a semantically equal expression wasn't already
230+
* added. Otherwise, use the [[Alias]] from the outer [[Aggregate]]. This alias will later be
231+
* injected into the outer [[Aggregate]];
150232
* - Store the name that needs to be used for the [[OuterReference]] in
151233
* [[OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE]] and
152234
* [[OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE]], both computed from the
@@ -202,20 +284,27 @@ class AggregateExpressionResolver(
202284
child = aggregateExpressionWithStrippedOuterReferences
203285
)
204286

205-
val (_, referencedAggregateExpressionAlias) =
206-
aggregateExpressionsExtractor.collectFirstAggregateExpression(
207-
aggregateExpressionWithStrippedOuterReferences
208-
)
209-
210-
referencedAggregateExpressionAlias match {
211-
case Some(alias) =>
212-
subqueryRegistry.currentScope.addAliasForOuterAggregateExpression(alias)
213-
OuterReference(alias.toAttribute)
287+
subqueryRegistry.currentScope.getExtractedAggregateExpression(
288+
aggregateExpressionWithStrippedOuterReferences
289+
) match {
290+
case Some(alreadyExtractedAggregateExpression) =>
291+
OuterReference(alreadyExtractedAggregateExpression.toAttribute)
214292
case None =>
215-
subqueryRegistry.currentScope.addAliasForOuterAggregateExpression(
216-
outerAggregateExpressionAlias
217-
)
218-
OuterReference(outerAggregateExpressionAlias.toAttribute)
293+
val (_, referencedAggregateExpressionAlias) =
294+
aggregateExpressionsExtractor.collectFirstAggregateExpression(
295+
aggregateExpressionWithStrippedOuterReferences
296+
)
297+
298+
referencedAggregateExpressionAlias match {
299+
case Some(alias) =>
300+
subqueryRegistry.currentScope.addExtractedAggregateExpression(alias)
301+
OuterReference(alias.toAttribute)
302+
case None =>
303+
subqueryRegistry.currentScope.addExtractedAggregateExpression(
304+
outerAggregateExpressionAlias
305+
)
306+
OuterReference(outerAggregateExpressionAlias.toAttribute)
307+
}
219308
}
220309
}
221310

@@ -236,4 +325,5 @@ class AggregateExpressionResolver(
236325
origin = nonDeterministicChild.origin
237326
)
238327
}
328+
239329
}

0 commit comments

Comments
 (0)