Commit 18c7d84

Recreate group_values after spill merge to fix duplicate group keys (#20724)
When switching to streaming merge after a spill, group_ordering is set to Full but group_values is not recreated. The existing GroupValuesColumn<false> uses vectorized_intern, which can produce non-monotonic group indices. This violates GroupOrderingFull's assumption and causes duplicate groups in the output.

Fix: recreate group_values with the correct streaming mode after updating group_ordering in update_merged_stream().
1 parent a960247 commit 18c7d84

1 file changed

Lines changed: 8 additions & 0 deletions

datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -1267,6 +1267,14 @@ impl GroupedHashAggregateStream {
         // on the grouping columns.
         self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());
 
+        // Recreate group_values to use streaming mode (scalarized_intern) which preserves input
+        // row order, as required by GroupOrderingFull.
+        let group_schema = self
+            .spill_state
+            .merging_group_by
+            .group_schema(&self.spill_state.spill_schema)?;
+        self.group_values = new_group_values(group_schema, &self.group_ordering)?;
+
         // Use `OutOfMemoryMode::ReportError` from this point on
         // to ensure we don't spill the spilled data to disk again.
         self.oom_mode = OutOfMemoryMode::ReportError;
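
The failure mode described in the commit message can be illustrated with a small toy model. This is a sketch, not DataFusion's code: every name below (`Groups`, `intern_in_row_order`, `intern_out_of_order`, `emit_all_but_last`, `run`) is hypothetical, and traversing rows in reverse is only a stand-in for any interner whose index assignment does not follow row order. It is not a description of how `vectorized_intern` works internally. The emitter assumes, as a fully ordered stream would, that the group with the highest index is the one still in progress, so every group before it can be emitted.

```rust
/// Toy group table: (key, row count), where the position in the Vec is the group index.
type Groups = Vec<(u32, u64)>;

/// Adds one row for `key`, creating a new group at the end if the key is unknown.
fn bump(groups: &mut Groups, key: u32) {
    if let Some(i) = groups.iter().position(|(g, _)| *g == key) {
        groups[i].1 += 1;
    } else {
        groups.push((key, 1));
    }
}

/// Assigns group indices in input row order, so indices are monotonic for sorted input.
fn intern_in_row_order(groups: &mut Groups, keys: &[u32]) {
    for &k in keys {
        bump(groups, k);
    }
}

/// Hypothetical interner that visits rows in reverse, so new group indices
/// do not follow input row order.
fn intern_out_of_order(groups: &mut Groups, keys: &[u32]) {
    for &k in keys.iter().rev() {
        bump(groups, k);
    }
}

/// Emits every group except the one with the highest index, assuming that
/// group is the only one that can still receive rows.
fn emit_all_but_last(groups: &mut Groups) -> Groups {
    let n = groups.len().saturating_sub(1);
    groups.drain(..n).collect()
}

/// Feeds sorted batches through `intern`, emitting after each batch and
/// flushing whatever remains at the end.
fn run(batches: &[&[u32]], intern: fn(&mut Groups, &[u32])) -> Groups {
    let mut groups = Groups::new();
    let mut out = Groups::new();
    for batch in batches {
        intern(&mut groups, batch);
        out.extend(emit_all_but_last(&mut groups));
    }
    out.extend(groups.drain(..));
    out
}
```

With the sorted batches `[1, 1, 2]` and `[2, 3]`, `run` with `intern_in_row_order` yields one group per key. With `intern_out_of_order`, key `2` receives a lower index than key `1` in the first batch, is emitted while still in progress, and is then created again when the second batch arrives, so it appears twice in the output. Swapping in an interner that matches the ordering mode, as the commit does by recreating `group_values`, removes the mismatch.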