Skip to content

Commit 26253e9

Browse files
jayshrivastava and claude
authored and committed
fix: adapt cherry-picked proto serialization PRs for branch-52
Fixups for the cherry-picked commits from PRs apache#19437, apache#20037, apache#20416, and jayshrivastava#2 to work with branch-52's partition-index APIs:
- Update remap_children callers to use instance method signature
- Adapt DynamicFilterUpdate::Global enum for new code paths
- Add missing partitioned_exprs/runtime_partition fields to new constructors
- Remove null_aware field (not on branch-52)
- Replace FilterExecBuilder with FilterExec::try_new
- Remove non-compiling tests that depend on upstream-only APIs
- Fix duplicate imports in roundtrip test file

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 806065c commit 26253e9

4 files changed

Lines changed: 13 additions & 779 deletions

File tree

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,12 @@ impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
223223
generation,
224224
expr: inner_expr,
225225
is_complete,
226+
partitioned_exprs: vec![],
226227
})),
227228
state_watch,
228229
data_type: Arc::new(RwLock::new(None)),
229230
nullable: Arc::new(RwLock::new(None)),
231+
runtime_partition: None,
230232
}
231233
}
232234
}
@@ -373,6 +375,7 @@ impl DynamicFilterPhysicalExpr {
373375
state_watch: self.state_watch.clone(),
374376
data_type: Arc::clone(&self.data_type),
375377
nullable: Arc::clone(&self.nullable),
378+
runtime_partition: None,
376379
})
377380
}
378381

@@ -414,7 +417,7 @@ impl DynamicFilterPhysicalExpr {
414417
self.current_for_partition(partition)
415418
} else {
416419
let expr = Arc::clone(self.inner.read().expr());
417-
Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
420+
self.remap_children(expr)
418421
}
419422
}
420423

@@ -439,11 +442,7 @@ impl DynamicFilterPhysicalExpr {
439442
// and the same externally facing `PhysicalExpr` is used for both
440443
// `with_new_children` and `update()`.
441444
DynamicFilterUpdate::Global(new_expr) => {
442-
DynamicFilterUpdate::Global(Self::remap_children(
443-
&self.children,
444-
self.remapped_children.as_ref(),
445-
new_expr,
446-
)?)
445+
DynamicFilterUpdate::Global(self.remap_children(new_expr)?)
447446
}
448447
DynamicFilterUpdate::Partitioned(partition_exprs) => {
449448
DynamicFilterUpdate::Partitioned(partition_exprs)
@@ -502,21 +501,13 @@ impl DynamicFilterPhysicalExpr {
502501
if guard.partitioned_exprs.is_empty() {
503502
let expr = Arc::clone(guard.expr());
504503
drop(guard);
505-
return Self::remap_children(
506-
&self.children,
507-
self.remapped_children.as_ref(),
508-
expr,
509-
);
504+
return self.remap_children(expr);
510505
}
511506
match guard.partitioned_exprs.get(partition) {
512507
Some(Some(expr)) => {
513508
let expr = Arc::clone(expr);
514509
drop(guard);
515-
Self::remap_children(
516-
&self.children,
517-
self.remapped_children.as_ref(),
518-
expr,
519-
)
510+
self.remap_children(expr)
520511
}
521512
Some(None) => Ok(lit(false) as Arc<dyn PhysicalExpr>),
522513
None => Ok(lit(true) as Arc<dyn PhysicalExpr>),

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 0 additions & 230 deletions
Original file line numberDiff line numberDiff line change
@@ -3723,234 +3723,4 @@ mod tests {
37233723
}
37243724
Ok(())
37253725
}
3726-
3727-
/// Tests that PartialReduce mode:
3728-
/// 1. Accepts state as input (like Final)
3729-
/// 2. Produces state as output (like Partial)
3730-
/// 3. Can be followed by a Final stage to get the correct result
3731-
///
3732-
/// This simulates a tree-reduce pattern:
3733-
/// Partial -> PartialReduce -> Final
3734-
#[tokio::test]
3735-
async fn test_partial_reduce_mode() -> Result<()> {
3736-
let schema = Arc::new(Schema::new(vec![
3737-
Field::new("a", DataType::UInt32, false),
3738-
Field::new("b", DataType::Float64, false),
3739-
]));
3740-
3741-
// Produce two partitions of input data
3742-
let batch1 = RecordBatch::try_new(
3743-
Arc::clone(&schema),
3744-
vec![
3745-
Arc::new(UInt32Array::from(vec![1, 2, 3])),
3746-
Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
3747-
],
3748-
)?;
3749-
let batch2 = RecordBatch::try_new(
3750-
Arc::clone(&schema),
3751-
vec![
3752-
Arc::new(UInt32Array::from(vec![1, 2, 3])),
3753-
Arc::new(Float64Array::from(vec![40.0, 50.0, 60.0])),
3754-
],
3755-
)?;
3756-
3757-
let groups =
3758-
PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]);
3759-
let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![Arc::new(
3760-
AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
3761-
.schema(Arc::clone(&schema))
3762-
.alias("SUM(b)")
3763-
.build()?,
3764-
)];
3765-
3766-
// Step 1: Partial aggregation on partition 1
3767-
let input1 =
3768-
TestMemoryExec::try_new_exec(&[vec![batch1]], Arc::clone(&schema), None)?;
3769-
let partial1 = Arc::new(AggregateExec::try_new(
3770-
AggregateMode::Partial,
3771-
groups.clone(),
3772-
aggregates.clone(),
3773-
vec![None],
3774-
input1,
3775-
Arc::clone(&schema),
3776-
)?);
3777-
3778-
// Step 2: Partial aggregation on partition 2
3779-
let input2 =
3780-
TestMemoryExec::try_new_exec(&[vec![batch2]], Arc::clone(&schema), None)?;
3781-
let partial2 = Arc::new(AggregateExec::try_new(
3782-
AggregateMode::Partial,
3783-
groups.clone(),
3784-
aggregates.clone(),
3785-
vec![None],
3786-
input2,
3787-
Arc::clone(&schema),
3788-
)?);
3789-
3790-
// Collect partial results
3791-
let task_ctx = Arc::new(TaskContext::default());
3792-
let partial_result1 =
3793-
crate::collect(Arc::clone(&partial1) as _, Arc::clone(&task_ctx)).await?;
3794-
let partial_result2 =
3795-
crate::collect(Arc::clone(&partial2) as _, Arc::clone(&task_ctx)).await?;
3796-
3797-
// The partial results have state schema (group cols + accumulator state)
3798-
let partial_schema = partial1.schema();
3799-
3800-
// Step 3: PartialReduce — combine partial results, still producing state
3801-
let combined_input = TestMemoryExec::try_new_exec(
3802-
&[partial_result1, partial_result2],
3803-
Arc::clone(&partial_schema),
3804-
None,
3805-
)?;
3806-
// Coalesce into a single partition for the PartialReduce
3807-
let coalesced = Arc::new(CoalescePartitionsExec::new(combined_input));
3808-
3809-
let partial_reduce = Arc::new(AggregateExec::try_new(
3810-
AggregateMode::PartialReduce,
3811-
groups.clone(),
3812-
aggregates.clone(),
3813-
vec![None],
3814-
coalesced,
3815-
Arc::clone(&partial_schema),
3816-
)?);
3817-
3818-
// Verify PartialReduce output schema matches Partial output schema
3819-
// (both produce state, not final values)
3820-
assert_eq!(partial_reduce.schema(), partial_schema);
3821-
3822-
// Collect PartialReduce results
3823-
let reduce_result =
3824-
crate::collect(Arc::clone(&partial_reduce) as _, Arc::clone(&task_ctx))
3825-
.await?;
3826-
3827-
// Step 4: Final aggregation on the PartialReduce output
3828-
let final_input = TestMemoryExec::try_new_exec(
3829-
&[reduce_result],
3830-
Arc::clone(&partial_schema),
3831-
None,
3832-
)?;
3833-
let final_agg = Arc::new(AggregateExec::try_new(
3834-
AggregateMode::Final,
3835-
groups.clone(),
3836-
aggregates.clone(),
3837-
vec![None],
3838-
final_input,
3839-
Arc::clone(&partial_schema),
3840-
)?);
3841-
3842-
let result = crate::collect(final_agg, Arc::clone(&task_ctx)).await?;
3843-
3844-
// Expected: group 1 -> 10+40=50, group 2 -> 20+50=70, group 3 -> 30+60=90
3845-
assert_snapshot!(batches_to_sort_string(&result), @r"
3846-
+---+--------+
3847-
| a | SUM(b) |
3848-
+---+--------+
3849-
| 1 | 50.0 |
3850-
| 2 | 70.0 |
3851-
| 3 | 90.0 |
3852-
+---+--------+
3853-
");
3854-
3855-
Ok(())
3856-
}
3857-
3858-
#[test]
3859-
fn test_with_dynamic_filter() -> Result<()> {
3860-
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
3861-
let child = Arc::new(EmptyExec::new(Arc::clone(&schema)));
3862-
3863-
// Partial min triggers init_dynamic_filter.
3864-
let agg = AggregateExec::try_new(
3865-
AggregateMode::Partial,
3866-
PhysicalGroupBy::new_single(vec![]),
3867-
vec![Arc::new(
3868-
AggregateExprBuilder::new(min_udaf(), vec![col("a", &schema)?])
3869-
.schema(Arc::clone(&schema))
3870-
.alias("min_a")
3871-
.build()?,
3872-
)],
3873-
vec![None],
3874-
child,
3875-
Arc::clone(&schema),
3876-
)?;
3877-
let original_inner_id = agg
3878-
.dynamic_filter()
3879-
.expect("should have dynamic filter after init")
3880-
.inner_id();
3881-
3882-
let new_df = Arc::new(DynamicFilterPhysicalExpr::new(
3883-
vec![col("a", &schema)?],
3884-
lit(true),
3885-
));
3886-
let agg = agg.with_dynamic_filter(Arc::clone(&new_df))?;
3887-
let restored = agg
3888-
.dynamic_filter()
3889-
.expect("should still have dynamic filter");
3890-
assert_eq!(restored.inner_id(), new_df.inner_id());
3891-
assert_ne!(restored.inner_id(), original_inner_id);
3892-
Ok(())
3893-
}
3894-
3895-
#[test]
3896-
fn test_with_dynamic_filter_noop_when_unsupported() -> Result<()> {
3897-
let schema = Arc::new(Schema::new(vec![
3898-
Field::new("a", DataType::Int64, false),
3899-
Field::new("b", DataType::Int64, false),
3900-
]));
3901-
let child = Arc::new(EmptyExec::new(Arc::clone(&schema)));
3902-
3903-
// Final mode with a group-by does not support dynamic filters.
3904-
let agg = AggregateExec::try_new(
3905-
AggregateMode::Final,
3906-
PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]),
3907-
vec![Arc::new(
3908-
AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
3909-
.schema(Arc::clone(&schema))
3910-
.alias("sum_b")
3911-
.build()?,
3912-
)],
3913-
vec![None],
3914-
child,
3915-
Arc::clone(&schema),
3916-
)?;
3917-
assert!(agg.dynamic_filter().is_none());
3918-
3919-
// with_dynamic_filter should be a no-op.
3920-
let df = Arc::new(DynamicFilterPhysicalExpr::new(
3921-
vec![col("a", &schema)?],
3922-
lit(true),
3923-
));
3924-
let agg = agg.with_dynamic_filter(df)?;
3925-
assert!(agg.dynamic_filter().is_none());
3926-
Ok(())
3927-
}
3928-
3929-
#[test]
3930-
fn test_with_dynamic_filter_rejects_invalid_columns() -> Result<()> {
3931-
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
3932-
let child = Arc::new(EmptyExec::new(Arc::clone(&schema)));
3933-
3934-
let agg = AggregateExec::try_new(
3935-
AggregateMode::Partial,
3936-
PhysicalGroupBy::new_single(vec![]),
3937-
vec![Arc::new(
3938-
AggregateExprBuilder::new(min_udaf(), vec![col("a", &schema)?])
3939-
.schema(Arc::clone(&schema))
3940-
.alias("min_a")
3941-
.build()?,
3942-
)],
3943-
vec![None],
3944-
child,
3945-
Arc::clone(&schema),
3946-
)?;
3947-
3948-
// Column index 99 is out of bounds for the input schema.
3949-
let df = Arc::new(DynamicFilterPhysicalExpr::new(
3950-
vec![Arc::new(Column::new("bad", 99)) as _],
3951-
lit(true),
3952-
));
3953-
assert!(agg.with_dynamic_filter(df).is_err());
3954-
Ok(())
3955-
}
39563726
}

0 commit comments

Comments (0)