feat: precomputed-hash column for Partial→Hash-Repartition shuffle (draft) #21816
Dandandan wants to merge 2 commits into apache:main
Adds an opt-in `datafusion.execution.emit_aggregate_group_hash` config option. When enabled, a Partial `AggregateExec` whose immediate consumer is `RepartitionExec` with `Partitioning::Hash` over the same group columns appends a trailing `__datafusion_precomputed_hash: UInt64` field to its output. The hash is computed once with `REPARTITION_RANDOM_STATE`, and `RepartitionExec` consumes it directly via a new fast path, eliminating a full rehashing pass on the shuffle. The biggest wins are on string/binary group keys (e.g. Clickbench `regexp_replace` keys).

Pieces:
- `AggregateExec::with_emit_group_hash(bool)` rebuilds schema + cache.
- `create_schema` appends the tagged `UInt64` field with source-column indices in field metadata so multi-column groups work.
- `row_hash::emit` / `transform_to_states` hash the group arrays with the repartition seed and append the result as the last column.
- `RepartitionExec::Hash` scans its input schema for the precomputed marker; it matches both "partitioning expr IS the hash column" and "partitioning exprs are the recorded source columns in order".
- New optimizer rule `EmitPartialAggregateHash` flips the flag when the Partial→Hash-Repartition pattern is present; config-gated so default behavior is unchanged.
- The field sits at the end of the Partial's output; Final's indexing into group/state columns is unaffected and its own output schema is clean.

Out of scope for this PR:
- FinalPartitioned `GroupValues` reuse (requires a `GroupValues` trait extension and a second agg-seed hash column; noted as follow-up).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
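To illustrate the idea behind the fast path, here is a minimal standalone sketch, not the PR's code. It assumes routing is done as `hash % num_partitions`, uses `std`'s `DefaultHasher` in place of DataFusion's `create_hashes` with `REPARTITION_RANDOM_STATE`, and models a batch as a slice of string keys. The names `hash_key`, `emit_with_hashes`, `route_by_rehash`, and `route_by_precomputed` are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash one group key. `DefaultHasher::new()` uses fixed keys, so producer and
/// consumer agree on the value (stand-in for a shared repartition seed).
fn hash_key(key: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

/// Producer side (stand-in for the Partial aggregate): emit each key together
/// with a trailing precomputed hash value.
fn emit_with_hashes(keys: &[&str]) -> Vec<(String, u64)> {
    keys.iter().map(|k| (k.to_string(), hash_key(k))).collect()
}

/// Slow path: the shuffle hashes every key again before routing.
fn route_by_rehash(keys: &[&str], num_partitions: u64) -> Vec<u64> {
    keys.iter().map(|k| hash_key(k) % num_partitions).collect()
}

/// Fast path: the shuffle reads the precomputed hash and never touches the key.
fn route_by_precomputed(rows: &[(String, u64)], num_partitions: u64) -> Vec<u64> {
    rows.iter().map(|(_, hash)| hash % num_partitions).collect()
}
```

Both paths route every row to the same partition as long as producer and consumer use the same seed; the fast path simply skips the second pass over the key bytes, which is where long string or binary keys are most expensive.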

run benchmarks

🤖 Benchmark running (GKE): comparing precomputed-group-hash-for-repartition (de92c7a) to 067ba4b (merge-base) using tpch, clickbench_partitioned, and tpcds.

Extends the precomputed-hash pipeline through the Final side:
- Unify `AGGREGATION_HASH_SEED` with `REPARTITION_RANDOM_STATE` (seed 0) so a single hash value is valid for the Partial's hash table, `RepartitionExec` routing, and the Final's hash-table probe. Hashbrown selects buckets from high bits while routing uses low bits, so reuse is safe in practice.
- Add `GroupValues::intern_with_hashes(cols, hashes, groups)` with a default implementation that falls back to `intern`. Override it on `GroupValuesRows` (the row-based general path) to skip the `create_hashes` call; the probe/insert loop is factored into `GroupValuesRows::intern_rows`.
- Wire `GroupedHashAggregateStream`: when the Final-side input schema carries a `UInt64` field tagged with the precomputed-hash metadata, extract the array and feed its values into `intern_with_hashes`.
- Flip the `datafusion.execution.emit_aggregate_group_hash` default to `true`. The rule is still gated by the config, so it can be disabled per session if needed.

Other `GroupValues` specializations (single-column bytes/primitive/boolean, multi-column composite) pick up the fallback behavior; they still work correctly, just without the Final-side saving until each implementation opts in.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
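The default-method fallback described above can be sketched as follows. This is a simplified illustration under stated assumptions, not DataFusion's `GroupValues` trait: the trait `GroupInterner`, both implementing types, and the `hashes_computed` counter are hypothetical, keys are plain strings rather than Arrow arrays, and `DefaultHasher` stands in for `create_hashes`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

fn hash_key(key: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

trait GroupInterner {
    /// Assign a group id to every key, hashing internally.
    fn intern(&mut self, keys: &[&str], groups: &mut Vec<usize>);

    /// Default: ignore the supplied hashes and fall back to `intern`.
    fn intern_with_hashes(&mut self, keys: &[&str], _hashes: &[u64], groups: &mut Vec<usize>) {
        self.intern(keys, groups)
    }
}

/// An implementation that has not opted in: it only provides `intern`.
#[derive(Default)]
struct FallbackInterner {
    ids: HashMap<String, usize>,
}

impl GroupInterner for FallbackInterner {
    fn intern(&mut self, keys: &[&str], groups: &mut Vec<usize>) {
        groups.clear();
        for key in keys {
            let next = self.ids.len();
            groups.push(*self.ids.entry(key.to_string()).or_insert(next));
        }
    }
}

/// An implementation that opts in: buckets are keyed by the caller's hash.
#[derive(Default)]
struct HashReusingInterner {
    buckets: HashMap<u64, Vec<(String, usize)>>,
    num_groups: usize,
    hashes_computed: usize, // counts hashes computed locally, for illustration
}

impl HashReusingInterner {
    /// Shared probe/insert loop used by both entry points.
    fn intern_rows(&mut self, keys: &[&str], hashes: &[u64], groups: &mut Vec<usize>) {
        groups.clear();
        for (key, hash) in keys.iter().zip(hashes) {
            let bucket = self.buckets.entry(*hash).or_default();
            let found = bucket
                .iter()
                .find(|(existing, _)| existing.as_str() == *key)
                .map(|(_, id)| *id);
            let id = match found {
                Some(id) => id,
                None => {
                    let id = self.num_groups;
                    bucket.push((key.to_string(), id));
                    self.num_groups += 1;
                    id
                }
            };
            groups.push(id);
        }
    }
}

impl GroupInterner for HashReusingInterner {
    fn intern(&mut self, keys: &[&str], groups: &mut Vec<usize>) {
        let hashes: Vec<u64> = keys.iter().map(|k| hash_key(k)).collect();
        self.hashes_computed += hashes.len();
        self.intern_rows(keys, &hashes, groups);
    }

    fn intern_with_hashes(&mut self, keys: &[&str], hashes: &[u64], groups: &mut Vec<usize>) {
        // Skip hashing entirely: reuse the values the caller already has.
        self.intern_rows(keys, hashes, groups);
    }
}
```

Callers can always use `intern_with_hashes`; implementations that do not override it produce the same group ids at the old cost, while overriding implementations skip the hashing step.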

run benchmarks

🤖 Benchmark running (GKE): comparing precomputed-group-hash-for-repartition (ac295b2) to 067ba4b (merge-base) using clickbench_partitioned, tpcds, and tpch.

🤖 Benchmark completed (GKE) for both runs: tpch, tpcds, and clickbench_partitioned, each comparing base (merge-base) against branch. Result details and resource-usage tables are not included here.

Summary
- Opt-in config option `datafusion.execution.emit_aggregate_group_hash` (default false). When enabled, a Partial `AggregateExec` feeding a Hash `RepartitionExec` over the same group columns emits a trailing `UInt64` column of precomputed row hashes (seeded with `REPARTITION_RANDOM_STATE`).
- `RepartitionExec::Hash` consumes the column directly via a new fast path, eliminating one full rehashing pass on the shuffle. Biggest wins on string/binary group keys (e.g. Clickbench `regexp_replace` keys).
- New optimizer rule `EmitPartialAggregateHash` flips the flag on matching `Partial → Hash-Repartition` pairs. Config-gated so default behavior is unchanged.

What the rule does
- … `datafusion.precomputed_hash = "repartition_seed_0"` plus a `datafusion.precomputed_hash_cols` CSV of source indices).

Out of scope / follow-ups
- FinalPartitioned `GroupValues` reuse. Would need a `GroupValues` trait extension + a second hash column seeded with `AGGREGATION_HASH_SEED` to preserve the intentional seed difference between the shuffle hash and the agg-side probe hash.

Test plan
- … `create_hashes(&group_arrays, REPARTITION_RANDOM_STATE)`
- `with_emit_group_hash(true)` is a no-op on non-Partial modes
- `detect_precomputed_hash_column` matches (a) hash-column-as-partitioning-expr and (b) multi-column group with source indices, and rejects subset/reordering
- `cargo fmt --all` + `cargo clippy -p datafusion-physical-plan -p datafusion-physical-optimizer -p datafusion-common --all-targets -- -D warnings`

🤖 Generated with Claude Code
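The matching rules listed for `detect_precomputed_hash_column` can be sketched with plain `std` types. This is an illustration only: the metadata keys and the seed tag are taken from the description above, but the `Field` struct, the helper constructors, and the signature (partitioning expressed as column indices rather than physical expressions) are assumptions made to keep the example self-contained.

```rust
use std::collections::HashMap;

const HASH_KEY: &str = "datafusion.precomputed_hash";
const HASH_SEED_TAG: &str = "repartition_seed_0";
const HASH_COLS_KEY: &str = "datafusion.precomputed_hash_cols";

/// Simplified stand-in for a schema field: only the metadata matters here.
struct Field {
    metadata: HashMap<String, String>,
}

/// Hypothetical helper: an ordinary field with no metadata.
fn plain_field() -> Field {
    Field { metadata: HashMap::new() }
}

/// Hypothetical helper: the hash field, tagged with its source column indices.
fn tagged_hash_field(source_cols: &[usize]) -> Field {
    let csv = source_cols
        .iter()
        .map(|i| i.to_string())
        .collect::<Vec<_>>()
        .join(",");
    let mut metadata = HashMap::new();
    metadata.insert(HASH_KEY.to_string(), HASH_SEED_TAG.to_string());
    metadata.insert(HASH_COLS_KEY.to_string(), csv);
    Field { metadata }
}

/// Return the index of a reusable hash column, if the partitioning matches it.
fn detect_precomputed_hash_column(schema: &[Field], partition_cols: &[usize]) -> Option<usize> {
    for (idx, field) in schema.iter().enumerate() {
        // Only consider fields carrying the expected seed tag.
        if field.metadata.get(HASH_KEY).map(String::as_str) != Some(HASH_SEED_TAG) {
            continue;
        }
        // (a) The partitioning expression is the hash column itself.
        if partition_cols.len() == 1 && partition_cols[0] == idx {
            return Some(idx);
        }
        // (b) The partitioning columns equal the recorded sources, in order.
        let recorded: Option<Vec<usize>> = field
            .metadata
            .get(HASH_COLS_KEY)
            .and_then(|csv| csv.split(',').map(|s| s.parse::<usize>().ok()).collect());
        if recorded.as_deref() == Some(partition_cols) {
            return Some(idx);
        }
    }
    None
}
```

Requiring an exact, ordered match is the conservative choice: a hash over columns `(0, 1)` says nothing about the hash of column `0` alone or of `(1, 0)`, so subsets and reorderings have to fall back to rehashing.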