perf: avoid redundant columnar shuffle when both parent and child are non-Comet#4010
Conversation
| * nor the child is a Comet plan that can consume columnar output, that conversion is pure | ||
| * overhead (row->arrow->shuffle->arrow->row vs. row->shuffle->row). | ||
| */ | ||
| private def revertRedundantColumnarShuffle(plan: SparkPlan): SparkPlan = { |
There was a problem hiding this comment.
Thanks for the opttimzation @andygrove
I guess this is the first time where CometShuffleExchangeExec is reverted back to a plain ShuffleExchangeExec.
The two shuffle paths use different memory systems:
- Comet columnar shuffle uses Comet's own memory pool. (off-heap)
- Spark vanilla shuffle uses the JVM execution memory pool , with spills managed by ExternalSorter.
Users who have tuned their clusters for Comet (smaller JVM heap) could see unexpected spills after this chang, shifting shuffle memory pressure back to theJVM.
Additionally, Comet's Arrow IPC columnar format typically compresses better than Spark's row-based UnsafeRowSerializer path, so shuffle I/O mayalso increase.
It would be good to document or log when a shuffle is reverted so users can correlate any unexpected behavior with this optimization.
There was a problem hiding this comment.
Thanks for the feedback @karuppayya. I'm assuming the cost of doing two transitions (r2c then c2r) would outweigh the benefits of using Comet shuffle? I agree that it would be worth adding documentation.
There was a problem hiding this comment.
I agree that this optimization will improve performance and compute efficiency.
My main concern is determining the best recommendation for users to tune memory, particularly since they cannot explicitly disable it.
Also can it be a seperate rule in itself and have it only in org.apache.comet.CometSparkSessionExtensions.CometExecColumnar#postColumnarTransitions?
There was a problem hiding this comment.
@karuppayya I added a section to the tuning guide and also added a config to enable/disable this optimization. Could you take another look?
I'm open to refactoring to create different rules, but would prefer to wait for some current DPP work to finish first, and also some work for fixing planning issues with mixed partial/final aggregates.
When a CometShuffleExchangeExec with CometColumnarShuffle has a non-Comet child and a non-Comet parent, the columnar shuffle only adds row->arrow->shuffle->arrow->row conversion overhead with no Comet operator on either side to consume columnar output. Revert such shuffles to the original Spark ShuffleExchangeExec after the main transform pass. Closes apache#4004
The broader match that checked any non-Comet parent broke object-mode Dataset plans in CometIcebergNativeSuite (DeserializeToObject around a CometColumnarExchange over encoder nodes). A CometNativeColumnarToRowExec elsewhere in the plan had its assertion child.supportsColumnar violated when transform bubbled up the new row-based Exchange. Restrict the match to the exact reported pattern: HashAggregateExec or ObjectHashAggregateExec on both sides of the shuffle. Golden TPC-DS plans are unchanged by this narrowing.
…ce Comet columnar shuffle Without the tag, AQE re-plans each stage in isolation, and the isolated subplan (which no longer shows the parent aggregate) converts the reverted ShuffleExchangeExec back into a CometShuffleExchangeExec. Subsequent plan canonicalization then fails because a ColumnarToRowExec ends up with a non-columnar child. Persist the revert decision via a TreeNodeTag on the ShuffleExchangeExec. Both applyCometShuffle and the main transform now short-circuit when the tag is set, so the decision survives re-entrancy.
c856b23 to
5e1e49d
Compare
| "The maximum number of columns to hash for round robin partitioning must be non-negative.") | ||
| .createWithDefault(0) | ||
|
|
||
| val COMET_EXEC_SHUFFLE_REVERT_SANDWICHED_ENABLED: ConfigEntry[Boolean] = |
There was a problem hiding this comment.
I'm not sure about sandwich name, tbh
There was a problem hiding this comment.
making you hungry? 🥪 🤤
There was a problem hiding this comment.
I have now removed all bread-related terminology from this PR. I suppose you could say it is now Gluten-free?
|
I ran benchmarks TPC-DS @ 1TB and saw 2.4% speedup. Could just be noise. |
parthchandra
left a comment
There was a problem hiding this comment.
Absolutely great PR. Some small suggestions below, otherwise lgtm
| * | ||
| * Also tag the reverted shuffle so AQE stage-isolated re-planning does not convert it back to a | ||
| * Comet shuffle when the outer aggregate context is no longer visible. | ||
| */ |
There was a problem hiding this comment.
Maybe also add that correctness depends on running as preColumnarTransitions (before Spark inserts ColumnarToRowExec), and that the SKIP_COMET_SHUFFLE_TAG guards against re-conversion during both AQE QueryStagePrepRule and ColumnarRule passes.
There was a problem hiding this comment.
Expanded the docstring in 2241ccb to call out the preColumnarTransitions correctness dependency and the SKIP_COMET_SHUFFLE_TAG guarding both the AQE QueryStagePrepRule and ColumnarRule re-entries.
| @@ -229,4 +229,37 @@ class CometExecRuleSuite extends CometTestBase { | |||
| } | |||
| } | |||
|
|
|||
| test("CometExecRule should not wrap shuffle in CometColumnarShuffle when both sides are JVM") { | |||
There was a problem hiding this comment.
Claude recommends: Two additional tests would round out coverage: (a) assert the revert does NOT fire when revertRedundantColumnar.enabled=false, and (b) assert the revert does NOT fire when aggregates are successfully converted to Comet native.
There was a problem hiding this comment.
Added both tests in 2241ccb: one asserts the revert does not fire when COMET_EXEC_SHUFFLE_REVERT_REDUNDANT_COLUMNAR_ENABLED=false (shuffle stays CometColumnarShuffle), and one asserts it does not fire when both aggregates convert to Comet native (shuffle stays CometColumnarShuffle with CometHashAggregate on both sides).
…e tests Document that correctness requires running in preColumnarTransitions and that SKIP_COMET_SHUFFLE_TAG guards against re-entry from both the AQE QueryStagePrepRule pass and the ColumnarRule pass. Add two CometExecRuleSuite cases asserting the revert does not fire when the revert config is disabled and when both aggregates convert to Comet native.
Which issue does this PR close?
Closes #4004.
Rationale for this change
In TPC-DS plans such as q1, when partial/final hash aggregates cannot be converted to Comet (e.g. due to unsupported aggregate expressions or types), the shuffle between them is still wrapped as
CometColumnarExchange(columnar shuffle). With a JVM operator on both sides of the shuffle, this adds a row -> Arrow -> shuffle -> Arrow -> row round trip with no Comet operator on either side able to consume columnar output:The extra conversion is pure overhead compared to a vanilla Spark row-based shuffle.
What changes are included in this PR?
revertRedundantColumnarShuffleinCometExecRulethat detects aCometShuffleExchangeExecwithCometColumnarShufflewhose parent and child are both non-Comet hash aggregates, and reverts it to the original SparkShuffleExchangeExec.spark.comet.exec.shuffle.revertRedundantColumnar.enabled(defaulttrue). Documented the memory/IO tradeoffs in the tuning guide.After the fix:
How are these changes tested?
CometExecRule should not wrap shuffle in CometColumnarShuffle when both sides are JVMinCometExecRuleSuitedisables partial hash aggregate to force both aggregates to stay JVM, then asserts that the shuffle remains a plainShuffleExchangeExec.CometShuffleSuite,CometExecSuite,CometShuffleFallbackStickinessSuite, and both TPC-DS plan-stability suites (v1.4 and v2.7) continue to pass against the regenerated golden files.