Skip to content

Commit fcb1c93

Browse files
gboucher90alamb
andauthored
Fix duplicate group keys after hash aggregation spill (#20724) (#20858)
## Which issue does this PR close? - Closes #20724 ## Rationale for this change When `update_merged_stream()` switches to streaming merge after spill, it sets `group_ordering = GroupOrdering::Full` but does not recreate `group_values`. The existing `GroupValuesColumn<false>` uses `vectorized_intern`, which can produce non-monotonic group indices under hash collisions, violating `GroupOrderingFull`'s assumption and causing duplicate groups in the output. ### Example with concrete values Consider a batch of 6 rows arriving at `vectorized_intern` after spill merge, with keys that hash to only 2 buckets (due to partial collisions): ``` Row Key Hash (truncated) Bucket ─── ───────── ──────────────── ────── 0 (1, 100) 0x03 3 1 (1, 200) 0x03 3 ← collides with row 0 2 (1, 300) 0x07 7 3 (1, 400) 0x07 7 ← collides with row 2 4 (1, 500) 0x03 3 ← collides with rows 0,1 5 (1, 600) 0x07 7 ← collides with rows 2,3 ``` `vectorized_intern` processes rows in two passes: 1. **Fast path** (empty bucket → new group): rows 0, 2 get group indices 0, 1 2. **Collision path** (bucket occupied → `equal_to` → not equal → new group): rows 1, 3, 4, 5 get indices 2, 3, 4, 5 The resulting group index assignment is: ``` Row Key Group Index ─── ───────── ─────────── 0 (1, 100) 0 ← fast path 1 (1, 200) 2 ← collision path (processed after all fast-path rows) 2 (1, 300) 1 ← fast path 3 (1, 400) 3 ← collision path 4 (1, 500) 4 ← collision path 5 (1, 600) 5 ← collision path ``` The group indices are **non-monotonic**: `[0, 2, 1, 3, 4, 5]`. The input was sorted `(1,100) < (1,200) < (1,300)`, but group 2 (key `(1,200)`) comes after group 1 (key `(1,300)`). `GroupOrderingFull` tracks progress via `current = total_num_groups - 1`. When group index 1 is seen, it thinks group 0 is complete and emits it. But later, group index 2 arrives for key `(1,200)` — a key that sorts *between* already-emitted groups. If this group happens to span a batch boundary, it gets emitted twice: once partially in the current batch and once in the next batch, producing **duplicate group keys** in the final output. ## What changes are included in this PR? **Fix** (`row_hash.rs`): Recreate `group_values` after updating `group_ordering` in `update_merged_stream()`, so the streaming-mode `GroupValuesColumn<true>` (with `scalarized_intern`) is used instead. `scalarized_intern` processes rows one at a time, always producing monotonic group indices. **Regression test** (`memory_limit/mod.rs`): Deterministic test that reproduces the bug using a new `force_hash_partial_collisions` feature flag. **New feature flag** (`force_hash_partial_collisions`): Truncates real hashes to 5 bits (32 distinct values) instead of zeroing them all. Full collisions (`force_hash_collisions`) paradoxically do not trigger the bug because all rows land in the same bucket, so every row takes the collision path in order — producing monotonic indices. Partial collisions create the necessary mix of fast-path and collision-path rows. **CI** (`extended.yml`, `rust.yml`): Added the regression test to extended tests and a compile check for the new feature. ## Are these changes tested? Yes — the new `test_no_duplicate_groups_after_spill` test deterministically reproduces the bug (fails without the fix, passes with it) when compiled with `--features force_hash_partial_collisions`. ## Are there any user-facing changes? No. --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 57b275a commit fcb1c93

1 file changed

Lines changed: 12 additions & 0 deletions

File tree

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,6 +1267,18 @@ impl GroupedHashAggregateStream {
12671267
// on the grouping columns.
12681268
self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());
12691269

1270+
// Recreate `group_values` for streaming merge so group ids are assigned
1271+
// in first-seen order, as required by `GroupOrderingFull`.
1272+
// The pre-spill multi-column collector may use `vectorized_intern`, which
1273+
// can assign new group ids out of input order under hash collisions.
1274+
let group_schema = self
1275+
.spill_state
1276+
.merging_group_by
1277+
.group_schema(&self.spill_state.spill_schema)?;
1278+
if group_schema.fields().len() > 1 {
1279+
self.group_values = new_group_values(group_schema, &self.group_ordering)?;
1280+
}
1281+
12701282
// Use `OutOfMemoryMode::ReportError` from this point on
12711283
// to ensure we don't spill the spilled data to disk again.
12721284
self.oom_mode = OutOfMemoryMode::ReportError;

0 commit comments

Comments
 (0)