@@ -3723,234 +3723,4 @@ mod tests {
37233723 }
37243724 Ok ( ( ) )
37253725 }
3726-
3727- /// Tests that PartialReduce mode:
3728- /// 1. Accepts state as input (like Final)
3729- /// 2. Produces state as output (like Partial)
3730- /// 3. Can be followed by a Final stage to get the correct result
3731- ///
3732- /// This simulates a tree-reduce pattern:
3733- /// Partial -> PartialReduce -> Final
3734- #[ tokio:: test]
3735- async fn test_partial_reduce_mode ( ) -> Result < ( ) > {
3736- let schema = Arc :: new ( Schema :: new ( vec ! [
3737- Field :: new( "a" , DataType :: UInt32 , false ) ,
3738- Field :: new( "b" , DataType :: Float64 , false ) ,
3739- ] ) ) ;
3740-
3741- // Produce two partitions of input data
3742- let batch1 = RecordBatch :: try_new (
3743- Arc :: clone ( & schema) ,
3744- vec ! [
3745- Arc :: new( UInt32Array :: from( vec![ 1 , 2 , 3 ] ) ) ,
3746- Arc :: new( Float64Array :: from( vec![ 10.0 , 20.0 , 30.0 ] ) ) ,
3747- ] ,
3748- ) ?;
3749- let batch2 = RecordBatch :: try_new (
3750- Arc :: clone ( & schema) ,
3751- vec ! [
3752- Arc :: new( UInt32Array :: from( vec![ 1 , 2 , 3 ] ) ) ,
3753- Arc :: new( Float64Array :: from( vec![ 40.0 , 50.0 , 60.0 ] ) ) ,
3754- ] ,
3755- ) ?;
3756-
3757- let groups =
3758- PhysicalGroupBy :: new_single ( vec ! [ ( col( "a" , & schema) ?, "a" . to_string( ) ) ] ) ;
3759- let aggregates: Vec < Arc < AggregateFunctionExpr > > = vec ! [ Arc :: new(
3760- AggregateExprBuilder :: new( sum_udaf( ) , vec![ col( "b" , & schema) ?] )
3761- . schema( Arc :: clone( & schema) )
3762- . alias( "SUM(b)" )
3763- . build( ) ?,
3764- ) ] ;
3765-
3766- // Step 1: Partial aggregation on partition 1
3767- let input1 =
3768- TestMemoryExec :: try_new_exec ( & [ vec ! [ batch1] ] , Arc :: clone ( & schema) , None ) ?;
3769- let partial1 = Arc :: new ( AggregateExec :: try_new (
3770- AggregateMode :: Partial ,
3771- groups. clone ( ) ,
3772- aggregates. clone ( ) ,
3773- vec ! [ None ] ,
3774- input1,
3775- Arc :: clone ( & schema) ,
3776- ) ?) ;
3777-
3778- // Step 2: Partial aggregation on partition 2
3779- let input2 =
3780- TestMemoryExec :: try_new_exec ( & [ vec ! [ batch2] ] , Arc :: clone ( & schema) , None ) ?;
3781- let partial2 = Arc :: new ( AggregateExec :: try_new (
3782- AggregateMode :: Partial ,
3783- groups. clone ( ) ,
3784- aggregates. clone ( ) ,
3785- vec ! [ None ] ,
3786- input2,
3787- Arc :: clone ( & schema) ,
3788- ) ?) ;
3789-
3790- // Collect partial results
3791- let task_ctx = Arc :: new ( TaskContext :: default ( ) ) ;
3792- let partial_result1 =
3793- crate :: collect ( Arc :: clone ( & partial1) as _ , Arc :: clone ( & task_ctx) ) . await ?;
3794- let partial_result2 =
3795- crate :: collect ( Arc :: clone ( & partial2) as _ , Arc :: clone ( & task_ctx) ) . await ?;
3796-
3797- // The partial results have state schema (group cols + accumulator state)
3798- let partial_schema = partial1. schema ( ) ;
3799-
3800- // Step 3: PartialReduce — combine partial results, still producing state
3801- let combined_input = TestMemoryExec :: try_new_exec (
3802- & [ partial_result1, partial_result2] ,
3803- Arc :: clone ( & partial_schema) ,
3804- None ,
3805- ) ?;
3806- // Coalesce into a single partition for the PartialReduce
3807- let coalesced = Arc :: new ( CoalescePartitionsExec :: new ( combined_input) ) ;
3808-
3809- let partial_reduce = Arc :: new ( AggregateExec :: try_new (
3810- AggregateMode :: PartialReduce ,
3811- groups. clone ( ) ,
3812- aggregates. clone ( ) ,
3813- vec ! [ None ] ,
3814- coalesced,
3815- Arc :: clone ( & partial_schema) ,
3816- ) ?) ;
3817-
3818- // Verify PartialReduce output schema matches Partial output schema
3819- // (both produce state, not final values)
3820- assert_eq ! ( partial_reduce. schema( ) , partial_schema) ;
3821-
3822- // Collect PartialReduce results
3823- let reduce_result =
3824- crate :: collect ( Arc :: clone ( & partial_reduce) as _ , Arc :: clone ( & task_ctx) )
3825- . await ?;
3826-
3827- // Step 4: Final aggregation on the PartialReduce output
3828- let final_input = TestMemoryExec :: try_new_exec (
3829- & [ reduce_result] ,
3830- Arc :: clone ( & partial_schema) ,
3831- None ,
3832- ) ?;
3833- let final_agg = Arc :: new ( AggregateExec :: try_new (
3834- AggregateMode :: Final ,
3835- groups. clone ( ) ,
3836- aggregates. clone ( ) ,
3837- vec ! [ None ] ,
3838- final_input,
3839- Arc :: clone ( & partial_schema) ,
3840- ) ?) ;
3841-
3842- let result = crate :: collect ( final_agg, Arc :: clone ( & task_ctx) ) . await ?;
3843-
3844- // Expected: group 1 -> 10+40=50, group 2 -> 20+50=70, group 3 -> 30+60=90
3845- assert_snapshot ! ( batches_to_sort_string( & result) , @r"
3846- +---+--------+
3847- | a | SUM(b) |
3848- +---+--------+
3849- | 1 | 50.0 |
3850- | 2 | 70.0 |
3851- | 3 | 90.0 |
3852- +---+--------+
3853- " ) ;
3854-
3855- Ok ( ( ) )
3856- }
3857-
/// Checks that `with_dynamic_filter` replaces the dynamic filter held by an
/// aggregate that already has one.
#[test]
fn test_with_dynamic_filter() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));

    // An ungrouped Partial `min` is the case that gets a dynamic filter at
    // construction time (the `expect` below relies on it).
    let min_a = AggregateExprBuilder::new(min_udaf(), vec![col("a", &schema)?])
        .schema(Arc::clone(&schema))
        .alias("min_a")
        .build()?;
    let agg = AggregateExec::try_new(
        AggregateMode::Partial,
        PhysicalGroupBy::new_single(vec![]),
        vec![Arc::new(min_a)],
        vec![None],
        input,
        Arc::clone(&schema),
    )?;
    let original_inner_id = agg
        .dynamic_filter()
        .expect("should have dynamic filter after init")
        .inner_id();

    // Install a freshly built filter over the same column.
    let replacement = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![col("a", &schema)?],
        lit(true),
    ));
    let agg = agg.with_dynamic_filter(Arc::clone(&replacement))?;

    // The aggregate must now report the replacement's id, not the initial one.
    let restored_id = agg
        .dynamic_filter()
        .expect("should still have dynamic filter")
        .inner_id();
    assert_eq!(restored_id, replacement.inner_id());
    assert_ne!(restored_id, original_inner_id);
    Ok(())
}
3894-
/// Checks that `with_dynamic_filter` leaves an aggregate without a dynamic
/// filter when the aggregate had none to begin with.
#[test]
fn test_with_dynamic_filter_noop_when_unsupported() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));

    // A grouped Final aggregation: no dynamic filter is expected here.
    let group_by =
        PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]);
    let sum_b = AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
        .schema(Arc::clone(&schema))
        .alias("sum_b")
        .build()?;
    let agg = AggregateExec::try_new(
        AggregateMode::Final,
        group_by,
        vec![Arc::new(sum_b)],
        vec![None],
        input,
        Arc::clone(&schema),
    )?;
    assert!(agg.dynamic_filter().is_none());

    // Supplying a filter must succeed yet change nothing.
    let candidate = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![col("a", &schema)?],
        lit(true),
    ));
    let agg = agg.with_dynamic_filter(candidate)?;
    assert!(agg.dynamic_filter().is_none());
    Ok(())
}
3928-
/// Checks that `with_dynamic_filter` returns an error when the supplied
/// filter references a column outside the aggregate's input schema.
#[test]
fn test_with_dynamic_filter_rejects_invalid_columns() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));

    let min_a = AggregateExprBuilder::new(min_udaf(), vec![col("a", &schema)?])
        .schema(Arc::clone(&schema))
        .alias("min_a")
        .build()?;
    let agg = AggregateExec::try_new(
        AggregateMode::Partial,
        PhysicalGroupBy::new_single(vec![]),
        vec![Arc::new(min_a)],
        vec![None],
        input,
        Arc::clone(&schema),
    )?;

    // The input schema has a single field, so index 99 is out of range.
    let out_of_range = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![Arc::new(Column::new("bad", 99)) as _],
        lit(true),
    ));
    let outcome = agg.with_dynamic_filter(out_of_range);
    assert!(outcome.is_err());
    Ok(())
}
39563726}
0 commit comments