@@ -24,7 +24,9 @@ use std::sync::{Arc, LazyLock};
2424#[ cfg( feature = "extended_tests" ) ]
2525mod memory_limit_validation;
2626mod repartition_mem_limit;
27- use arrow:: array:: { ArrayRef , DictionaryArray , Int32Array , RecordBatch , StringViewArray } ;
27+ use arrow:: array:: {
28+ ArrayRef , DictionaryArray , Int32Array , Int64Array , RecordBatch , StringViewArray ,
29+ } ;
2830use arrow:: compute:: SortOptions ;
2931use arrow:: datatypes:: { Int32Type , SchemaRef } ;
3032use arrow_schema:: { DataType , Field , Schema } ;
@@ -56,6 +58,7 @@ use datafusion_physical_plan::collect as collect_batches;
5658use datafusion_physical_plan:: common:: collect;
5759use datafusion_physical_plan:: spill:: get_record_batch_memory_size;
5860use rand:: Rng ;
61+ use std:: collections:: HashSet ;
5962use test_utils:: AccessLogGenerator ;
6063
6164use async_trait:: async_trait;
@@ -1172,3 +1175,123 @@ impl TableProvider for SortedTableProvider {
11721175 Ok ( DataSourceExec :: from_data_source ( mem_conf) )
11731176 }
11741177}
1178+
1179+ // ============================================================================
// Regression tests for https://github.com/apache/datafusion/issues/20724
1181+ //
1182+ // When hash aggregation spills and switches to streaming merge,
1183+ // `group_values` must be recreated with the streaming variant.
1184+ // Otherwise `vectorized_intern` can produce non-monotonic group indices
1185+ // under hash collisions, causing `GroupOrderingFull` to prematurely
1186+ // emit groups → duplicate keys in output.
1187+ // ============================================================================
1188+
1189+ /// Helper: set up a session that forces spilling during aggregation.
1190+ async fn setup_spill_agg_context (
1191+ memory_limit : usize ,
1192+ batch_size : usize ,
1193+ ) -> Result < SessionContext > {
1194+ let runtime = RuntimeEnvBuilder :: new ( )
1195+ . with_memory_pool ( Arc :: new ( FairSpillPool :: new ( memory_limit) ) )
1196+ . with_disk_manager_builder (
1197+ DiskManagerBuilder :: default ( ) . with_mode ( DiskManagerMode :: OsTmpDirectory ) ,
1198+ )
1199+ . build_arc ( )
1200+ . unwrap ( ) ;
1201+
1202+ let config = SessionConfig :: new ( )
1203+ . with_sort_spill_reservation_bytes ( 64 * 1024 )
1204+ . with_sort_in_place_threshold_bytes ( 0 )
1205+ . with_spill_compression ( SpillCompression :: Uncompressed )
1206+ . with_batch_size ( batch_size)
1207+ . with_target_partitions ( 1 ) ;
1208+
1209+ Ok ( SessionContext :: new_with_config_rt ( config, runtime) )
1210+ }
1211+
1212+ /// Regression test for https://github.com/apache/datafusion/issues/20724
1213+ ///
1214+ /// When hash aggregation spills and switches to streaming merge,
1215+ /// `group_values` (GroupValuesColumn<false>) is not recreated with the
1216+ /// streaming variant (<true>). This means `vectorized_intern` is used
1217+ /// post-spill, which can produce non-monotonic group indices under hash
1218+ /// collisions, causing `GroupOrderingFull` to prematurely emit groups
1219+ /// and create duplicate keys in the output.
1220+ ///
1221+ /// Requirements to trigger:
1222+ /// - Two-column GROUP BY → forces `GroupValuesColumn` (not `GroupValuesPrimitive`)
1223+ /// - `force_hash_partial_collisions` feature → truncated hashes create the mix
1224+ /// of colliding/non-colliding keys needed for non-monotonic indices
1225+ /// - `batch_size=50` → not a multiple of rows-per-group in the merged stream,
1226+ /// so groups span batch boundaries and premature emission causes duplicates
1227+ #[ tokio:: test]
1228+ async fn test_no_duplicate_groups_after_spill ( ) -> Result < ( ) > {
1229+ let num_keys: i64 = 5000 ;
1230+ let rows_per_key: i64 = 4 ;
1231+ let total_rows = ( num_keys * rows_per_key) as usize ;
1232+
1233+ let schema = Arc :: new ( Schema :: new ( vec ! [
1234+ Field :: new( "key_a" , DataType :: Int64 , false ) ,
1235+ Field :: new( "key_b" , DataType :: Int64 , false ) ,
1236+ Field :: new( "value" , DataType :: Int64 , false ) ,
1237+ ] ) ) ;
1238+
1239+ let mut keys_a = Vec :: with_capacity ( total_rows) ;
1240+ let mut keys_b = Vec :: with_capacity ( total_rows) ;
1241+ let mut vals = Vec :: with_capacity ( total_rows) ;
1242+ for r in 0 ..rows_per_key {
1243+ for k in 0 ..num_keys {
1244+ keys_a. push ( k / 100 ) ;
1245+ keys_b. push ( k % 100 ) ;
1246+ vals. push ( r * num_keys + k) ;
1247+ }
1248+ }
1249+
1250+ let mut batches = Vec :: new ( ) ;
1251+ for start in ( 0 ..total_rows) . step_by ( 500 ) {
1252+ let end = ( start + 500 ) . min ( total_rows) ;
1253+ batches. push ( RecordBatch :: try_new (
1254+ Arc :: clone ( & schema) ,
1255+ vec ! [
1256+ Arc :: new( Int64Array :: from( keys_a[ start..end] . to_vec( ) ) ) ,
1257+ Arc :: new( Int64Array :: from( keys_b[ start..end] . to_vec( ) ) ) ,
1258+ Arc :: new( Int64Array :: from( vals[ start..end] . to_vec( ) ) ) ,
1259+ ] ,
1260+ ) ?) ;
1261+ }
1262+
1263+ let ctx = setup_spill_agg_context ( 128 * 1024 , 50 ) . await ?;
1264+ let table = MemTable :: try_new ( schema, vec ! [ batches] ) ?;
1265+ ctx. register_table ( "t" , Arc :: new ( table) ) ?;
1266+
1267+ let df = ctx
1268+ . sql ( "SELECT key_a, key_b, COUNT(*) as cnt FROM t GROUP BY key_a, key_b" )
1269+ . await ?;
1270+ let results =
1271+ collect_batches ( df. create_physical_plan ( ) . await ?, ctx. task_ctx ( ) ) . await ?;
1272+
1273+ let mut seen = HashSet :: new ( ) ;
1274+ for batch in & results {
1275+ let ka = batch
1276+ . column ( 0 )
1277+ . as_any ( )
1278+ . downcast_ref :: < Int64Array > ( )
1279+ . unwrap ( ) ;
1280+ let kb = batch
1281+ . column ( 1 )
1282+ . as_any ( )
1283+ . downcast_ref :: < Int64Array > ( )
1284+ . unwrap ( ) ;
1285+ for i in 0 ..batch. num_rows ( ) {
1286+ assert ! (
1287+ seen. insert( ( ka. value( i) , kb. value( i) ) ) ,
1288+ "DUPLICATE group key ({}, {}). \
1289+ Bug #20724: group_values not recreated for streaming merge.",
1290+ ka. value( i) ,
1291+ kb. value( i) ,
1292+ ) ;
1293+ }
1294+ }
1295+ assert_eq ! ( seen. len( ) , num_keys as usize ) ;
1296+ Ok ( ( ) )
1297+ }
0 commit comments