1616// under the License.
1717
1818use ahash:: RandomState ;
19+ use arrow:: array:: NullArray ;
20+ use arrow:: buffer:: { OffsetBuffer , ScalarBuffer } ;
1921use datafusion_common:: stats:: Precision ;
2022use datafusion_expr:: expr:: WindowFunction ;
2123use datafusion_functions_aggregate_common:: aggregate:: count_distinct:: BytesViewDistinctCountAccumulator ;
24+ use datafusion_functions_aggregate_common:: aggregate:: groups_accumulator:: nulls:: filtered_null_mask;
2225use datafusion_macros:: user_doc;
2326use datafusion_physical_expr:: expressions;
2427use std:: collections:: HashSet ;
@@ -64,7 +67,6 @@ use datafusion_physical_expr_common::binary_map::OutputType;
6467
6568use datafusion_common:: cast:: as_list_array;
6669use datafusion_common:: utils:: expr:: COUNT_STAR_EXPANSION ;
67- use datafusion_common:: utils:: SingleRowListArrayBuilder ;
6870
6971make_udaf_expr_and_func ! (
7072 Count ,
@@ -774,49 +776,6 @@ impl DistinctCountGroupsAccumulator {
774776 . resize_with ( total_num_groups, HashSet :: default) ;
775777 }
776778 }
777-
778- // Helper method to encode sets of distinct values into Arrow arrays
779- fn encode_sets_to_arrays ( & self , start : usize , end : usize ) -> Result < ArrayRef > {
780- let mut arrays: Vec < ArrayRef > = Vec :: with_capacity ( end - start) ;
781-
782- // Create a list array for each group
783- for group_idx in start..end {
784- if group_idx < self . distinct_sets . len ( ) {
785- // Convert the set's values to an array
786- let values: Vec < _ > =
787- self . distinct_sets [ group_idx] . iter ( ) . cloned ( ) . collect ( ) ;
788- let values_array = ScalarValue :: iter_to_array ( values) ?;
789-
790- // Wrap as a list array
791- let list_array =
792- SingleRowListArrayBuilder :: new ( values_array) . build_list_array ( ) ;
793- arrays. push ( Arc :: new ( list_array) as _ ) ;
794- } else {
795- // Group doesn't exist, create empty list
796- let empty_array = ScalarValue :: iter_to_array ( Vec :: < ScalarValue > :: new ( ) ) ?;
797- let list_array =
798- SingleRowListArrayBuilder :: new ( empty_array) . build_list_array ( ) ;
799- arrays. push ( Arc :: new ( list_array) as _ ) ;
800- }
801- }
802-
803- // If we have no arrays, create empty list
804- if arrays. is_empty ( ) {
805- let empty_array = ScalarValue :: iter_to_array ( Vec :: < ScalarValue > :: new ( ) ) ?;
806- let list_array =
807- SingleRowListArrayBuilder :: new ( empty_array) . build_list_array ( ) ;
808- return Ok ( Arc :: new ( list_array) ) ;
809- }
810-
811- // If we only have one array, return it
812- if arrays. len ( ) == 1 {
813- return Ok ( arrays[ 0 ] . clone ( ) ) ;
814- }
815-
816- // Otherwise concatenate all arrays - use explicit casting to ensure we have dyn Arrays
817- let array_refs: Vec < & dyn Array > = arrays. iter ( ) . map ( |a| a. as_ref ( ) ) . collect ( ) ;
818- Ok ( compute:: concat ( & array_refs) ?)
819- }
820779}
821780
822781impl GroupsAccumulator for DistinctCountGroupsAccumulator {
@@ -952,21 +911,38 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator {
952911 }
953912
954913 fn state ( & mut self , emit_to : EmitTo ) -> Result < Vec < ArrayRef > > {
955- // Create arrays that hold the distinct values for each group
956- let arrays = match emit_to {
957- EmitTo :: All => {
958- let arrays = self . encode_sets_to_arrays ( 0 , self . distinct_sets . len ( ) ) ?;
959- self . distinct_sets . clear ( ) ;
960- arrays
961- }
962- EmitTo :: First ( n) => {
963- let arrays = self . encode_sets_to_arrays ( 0 , n) ?;
964- self . distinct_sets = self . distinct_sets . split_off ( n) ;
965- arrays
966- }
914+ let distinct_sets: Vec < HashSet < ScalarValue , RandomState > > =
915+ emit_to. take_needed ( & mut self . distinct_sets ) ;
916+
917+ let mut offsets = Vec :: with_capacity ( distinct_sets. len ( ) + 1 ) ;
918+ offsets. push ( 0 ) ;
919+ let mut curr_len = 0i32 ;
920+
921+ let mut value_iter = distinct_sets
922+ . into_iter ( )
923+ . flat_map ( |set| {
924+ // build offset
925+ curr_len += set. len ( ) as i32 ;
926+ offsets. push ( curr_len) ;
927+ // convert into iter
928+ set. into_iter ( )
929+ } )
930+ . peekable ( ) ;
931+ let data_array: ArrayRef = if value_iter. peek ( ) . is_none ( ) {
932+ Arc :: new ( NullArray :: new ( 0 ) ) as _
933+ } else {
934+ Arc :: new ( ScalarValue :: iter_to_array ( value_iter) ?) as _
967935 } ;
936+ let offset_buffer = OffsetBuffer :: new ( ScalarBuffer :: from ( offsets) ) ;
937+
938+ let list_array = ListArray :: new (
939+ Arc :: new ( Field :: new_list_field ( data_array. data_type ( ) . clone ( ) , true ) ) ,
940+ offset_buffer,
941+ data_array,
942+ None ,
943+ ) ;
968944
969- Ok ( vec ! [ arrays ] )
945+ Ok ( vec ! [ Arc :: new ( list_array ) as _ ] )
970946 }
971947
972948 fn convert_to_state (
@@ -976,50 +952,19 @@ impl GroupsAccumulator for DistinctCountGroupsAccumulator {
976952 ) -> Result < Vec < ArrayRef > > {
977953 // For a single distinct value per row, create a list array with that value
978954 assert_eq ! ( values. len( ) , 1 , "COUNT DISTINCT expects a single argument" ) ;
979- let array = & values[ 0 ] ;
980- let num_rows = array. len ( ) ;
981-
982- // Create list entries for all rows in a batch efficiently
983- let mut values_vec = Vec :: with_capacity ( num_rows) ;
984- let mut offsets = Vec :: with_capacity ( num_rows + 1 ) ;
985- offsets. push ( 0 ) ;
986-
987- // Track which rows will have values (non-null, passes filter)
988- let mut has_value = vec ! [ false ; num_rows] ;
989- let mut total_values = 0 ;
990-
991- // First pass: identify valid rows and calculate space needed
992- for i in 0 ..num_rows {
993- if !array. is_null ( i)
994- && opt_filter. map_or ( true , |f| !f. is_null ( i) && f. value ( i) )
995- {
996- has_value[ i] = true ;
997- total_values += 1 ;
998- }
999- offsets. push ( total_values) ;
1000- }
1001-
1002- // Second pass: build values array
1003- for i in 0 ..num_rows {
1004- if has_value[ i] {
1005- values_vec. push ( ScalarValue :: try_from_array ( array, i) ?) ;
1006- }
1007- }
1008-
1009- // Build the values array once for all rows
1010- let values_array = ScalarValue :: iter_to_array ( values_vec) ?;
1011-
1012- // Create the list array with the calculated offsets
1013- let offsets_buffer = arrow:: buffer:: Buffer :: from_slice_ref ( & offsets) ;
1014- let list_data = arrow:: array:: ArrayData :: builder ( DataType :: List ( Arc :: new (
1015- Field :: new ( "item" , values_array. data_type ( ) . clone ( ) , true ) ,
1016- ) ) )
1017- . len ( num_rows)
1018- . add_buffer ( offsets_buffer)
1019- . add_child_data ( values_array. into_data ( ) )
1020- . build ( ) ?;
955+ let values = values[ 0 ] . clone ( ) ;
956+
957+ let offsets =
958+ OffsetBuffer :: new ( ScalarBuffer :: from_iter ( 0 ..values. len ( ) as i32 + 1 ) ) ;
959+ let nulls = filtered_null_mask ( opt_filter, & values) ;
960+ let list_array = ListArray :: new (
961+ Arc :: new ( Field :: new_list_field ( values. data_type ( ) . clone ( ) , true ) ) ,
962+ offsets,
963+ values,
964+ nulls,
965+ ) ;
1021966
1022- Ok ( vec ! [ Arc :: new( ListArray :: from ( list_data ) ) ] )
967+ Ok ( vec ! [ Arc :: new( list_array ) ] )
1023968 }
1024969
1025970 fn supports_convert_to_state ( & self ) -> bool {
@@ -1076,19 +1021,13 @@ mod tests {
10761021
10771022 #[ test]
10781023 fn test_distinct_count_groups_basic ( ) -> Result < ( ) > {
1079- // Create a simple accumulator for Int32 values
10801024 let mut accumulator = DistinctCountGroupsAccumulator :: new ( ) ;
1081-
1082- // Create some test data
10831025 let values = vec ! [ Arc :: new( Int32Array :: from( vec![ 1 , 2 , 1 , 3 , 2 , 1 ] ) ) as ArrayRef ] ;
10841026
1085- // Group indices: we have 3 groups
1027+ // 3 groups
10861028 let group_indices = vec ! [ 0 , 1 , 0 , 2 , 1 , 0 ] ;
1087-
1088- // Update the accumulator
10891029 accumulator. update_batch ( & values, & group_indices, None , 3 ) ?;
10901030
1091- // Evaluate
10921031 let result = accumulator. evaluate ( EmitTo :: All ) ?;
10931032 let counts = result. as_primitive :: < Int64Type > ( ) ;
10941033
@@ -1104,24 +1043,15 @@ mod tests {
11041043
11051044 #[ test]
11061045 fn test_distinct_count_groups_with_filter ( ) -> Result < ( ) > {
1107- // Create a simple accumulator for string values
11081046 let mut accumulator = DistinctCountGroupsAccumulator :: new ( ) ;
1109-
1110- // Create some test data
11111047 let values = vec ! [
11121048 Arc :: new( StringArray :: from( vec![ "a" , "b" , "a" , "c" , "b" , "d" ] ) ) as ArrayRef ,
11131049 ] ;
1114-
1115- // Group indices: we have 2 groups
1050+ // 2 groups
11161051 let group_indices = vec ! [ 0 , 0 , 0 , 1 , 1 , 1 ] ;
1117-
1118- // Filter: include only some rows
11191052 let filter = BooleanArray :: from ( vec ! [ true , true , false , true , false , true ] ) ;
1120-
1121- // Update the accumulator
11221053 accumulator. update_batch ( & values, & group_indices, Some ( & filter) , 2 ) ?;
11231054
1124- // Evaluate
11251055 let result = accumulator. evaluate ( EmitTo :: All ) ?;
11261056 let counts = result. as_primitive :: < Int64Type > ( ) ;
11271057
0 commit comments