Skip to content

Commit b5d2072

Browse files
committed
augment test with stats
1 parent 149c8ba commit b5d2072

1 file changed

Lines changed: 145 additions & 52 deletions

File tree

  • datafusion/physical-plan/src/aggregates

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 145 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3295,62 +3295,133 @@ mod tests {
32953295

32963296
println!("Testing with {num_groups} groups (UInt32 + String + LargeList keys)");
32973297

3298-
// Helper to run the aggregation with a specific batch size
3299-
// Returns (time_to_first_emission, total_batch_count)
3300-
let run_scenario = |batch_size: usize| {
3301-
let schema = Arc::clone(&schema);
3302-
let batch = batch.clone();
3303-
let group_by = group_by.clone();
3304-
let aggregates = aggregates.clone();
3305-
3306-
async move {
3307-
let input = TestMemoryExec::try_new_exec(
3308-
&[vec![batch]],
3309-
Arc::clone(&schema),
3310-
None,
3311-
)?;
3298+
// Case 1: Chunked emission with detailed timing
3299+
println!("\n=== Chunked emission (batch_size=8192) ===");
3300+
let input_chunked = TestMemoryExec::try_new_exec(
3301+
&[vec![batch.clone()]],
3302+
Arc::clone(&schema),
3303+
None,
3304+
)?;
3305+
let session_config_chunked = SessionConfig::new().with_batch_size(8192);
3306+
let task_ctx_chunked =
3307+
Arc::new(TaskContext::default().with_session_config(session_config_chunked));
3308+
let aggregate_chunked = Arc::new(AggregateExec::try_new(
3309+
AggregateMode::Single,
3310+
group_by.clone(),
3311+
aggregates.clone(),
3312+
vec![None],
3313+
input_chunked,
3314+
Arc::clone(&schema),
3315+
)?);
33123316

3313-
let session_config = SessionConfig::new().with_batch_size(batch_size);
3314-
let task_ctx =
3315-
Arc::new(TaskContext::default().with_session_config(session_config));
3317+
let mut stream_chunked = aggregate_chunked.execute(0, task_ctx_chunked)?;
3318+
let start = Instant::now();
3319+
let mut first_emission = None;
3320+
let mut batch_count = 0;
3321+
let mut prev_batch_time = start;
3322+
let mut poll_times_chunked = Vec::new();
33163323

3317-
let aggregate = Arc::new(AggregateExec::try_new(
3318-
AggregateMode::Single,
3319-
group_by,
3320-
aggregates,
3321-
vec![None],
3322-
input,
3323-
schema,
3324-
)?);
3324+
while let Some(result) = stream_chunked.next().await {
3325+
let batch = result?;
3326+
batch_count += 1;
3327+
let batch_time = prev_batch_time.elapsed();
3328+
poll_times_chunked.push(batch_time);
3329+
3330+
if first_emission.is_none() {
3331+
first_emission = Some(start.elapsed());
3332+
println!(
3333+
"First batch arrived at: {:?} ({} rows)",
3334+
start.elapsed(),
3335+
batch.num_rows()
3336+
);
3337+
}
33253338

3326-
let mut stream = aggregate.execute(0, task_ctx)?;
3327-
let start = Instant::now();
3328-
let mut first_emission = None;
3329-
let mut batch_count = 0;
3339+
prev_batch_time = Instant::now();
3340+
}
33303341

3331-
while let Some(result) = stream.next().await {
3332-
if first_emission.is_none() {
3333-
first_emission = Some(start.elapsed());
3334-
}
3335-
result?;
3336-
batch_count += 1;
3337-
}
3342+
let count_chunked = batch_count;
3343+
let total_chunked = start.elapsed();
3344+
let min_poll_chunked =
3345+
poll_times_chunked.iter().min().copied().unwrap_or_default();
3346+
let max_poll_chunked =
3347+
poll_times_chunked.iter().max().copied().unwrap_or_default();
3348+
let avg_poll_chunked: Duration =
3349+
poll_times_chunked.iter().sum::<Duration>() / poll_times_chunked.len() as u32;
3350+
3351+
println!("Total batches: {count_chunked}");
3352+
println!("Total time: {total_chunked:?}");
3353+
println!("Poll times: min={min_poll_chunked:?}, max={max_poll_chunked:?}, avg={avg_poll_chunked:?}");
3354+
3355+
// Case 2: Blocking emission with detailed timing
3356+
println!("\n=== Blocking emission (batch_size > num_groups) ===");
3357+
let input_blocking = TestMemoryExec::try_new_exec(
3358+
&[vec![batch.clone()]],
3359+
Arc::clone(&schema),
3360+
None,
3361+
)?;
3362+
let session_config_blocking =
3363+
SessionConfig::new().with_batch_size(num_groups as usize + 1000);
3364+
let task_ctx_blocking =
3365+
Arc::new(TaskContext::default().with_session_config(session_config_blocking));
3366+
let aggregate_blocking = Arc::new(AggregateExec::try_new(
3367+
AggregateMode::Single,
3368+
group_by,
3369+
aggregates,
3370+
vec![None],
3371+
input_blocking,
3372+
Arc::clone(&schema),
3373+
)?);
33383374

3339-
Ok::<(Duration, usize), DataFusionError>((
3340-
first_emission.unwrap_or_default(),
3341-
batch_count,
3342-
))
3343-
}
3344-
};
3375+
let mut stream_blocking = aggregate_blocking.execute(0, task_ctx_blocking)?;
3376+
let start = Instant::now();
3377+
let mut count_blocking = 0;
3378+
let mut prev_batch_time = start;
3379+
let mut poll_times_blocking = Vec::new();
3380+
3381+
while let Some(result) = stream_blocking.next().await {
3382+
let batch = result?;
3383+
count_blocking += 1;
3384+
let batch_time = prev_batch_time.elapsed();
3385+
poll_times_blocking.push(batch_time);
3386+
println!(" Batch {count_blocking} arrived at: {:?} ({} rows, batch creation took {batch_time:?})", start.elapsed(), batch.num_rows());
3387+
prev_batch_time = Instant::now();
3388+
}
3389+
3390+
let time_blocking = start.elapsed();
3391+
let min_poll_blocking = poll_times_blocking
3392+
.iter()
3393+
.min()
3394+
.copied()
3395+
.unwrap_or_default();
3396+
let max_poll_blocking = poll_times_blocking
3397+
.iter()
3398+
.max()
3399+
.copied()
3400+
.unwrap_or_default();
3401+
let avg_poll_blocking: Duration = poll_times_blocking.iter().sum::<Duration>()
3402+
/ poll_times_blocking.len() as u32;
3403+
3404+
println!("Total time: {time_blocking:?}");
3405+
println!("Poll times: min={min_poll_blocking:?}, max={max_poll_blocking:?}, avg={avg_poll_blocking:?}");
3406+
3407+
println!("\n=== Summary ===");
3408+
println!("Total execution time:");
3409+
println!(" Chunked: {total_chunked:?}");
3410+
println!(" Blocking: {time_blocking:?}");
3411+
println!(
3412+
" Overhead: {:.2}x with chunked",
3413+
total_chunked.as_secs_f64() / time_blocking.as_secs_f64()
3414+
);
3415+
3416+
println!("\nPoll duration (time between batches):");
3417+
println!(" Chunked: min={min_poll_chunked:?}, max={max_poll_chunked:?}, avg={avg_poll_chunked:?}");
3418+
println!(" Blocking: min={min_poll_blocking:?}, max={max_poll_blocking:?}, avg={avg_poll_blocking:?}");
33453419

3346-
// Case 1: Chunked emission (small batch size)
3347-
let (time_chunked, count_chunked) = run_scenario(1024).await?;
3348-
println!("Chunked emission (1024): {time_chunked:?} ({count_chunked} batches)");
3420+
println!("\nYield behavior:");
3421+
println!(" Chunked: {count_chunked} batches (yields between each)");
3422+
println!(" Blocking: {count_blocking} batch (single long stall)");
33493423

3350-
// Case 2: Blocking emission (large batch size)
3351-
let (time_blocking, count_blocking) =
3352-
run_scenario(num_groups as usize + 1000).await?;
3353-
println!("Blocking emission (all): {time_blocking:?} ({count_blocking} batches)");
3424+
println!("Benefit: max poll reduced from {max_poll_blocking:?} to {max_poll_chunked:?}.");
33543425

33553426
assert!(
33563427
count_chunked > 1,
@@ -3362,9 +3433,31 @@ mod tests {
33623433
);
33633434

33643435
// Example output:
3365-
// Testing with 1000000 groups (UInt32 + String + LargeList keys)
3366-
// Chunked emission (1024): 2.1316265s (977 batches)
3367-
// Blocking emission (all): 2.815402s (1 batches)
3436+
// === Chunked emission (batch_size=8192) ===
3437+
// First batch arrived at: 2.210163709s (8192 rows)
3438+
// Total batches: 123
3439+
// Total time: 2.869591125s
3440+
// Poll times: min=369.209µs, max=2.210162417s, avg=23.324541ms
3441+
3442+
// === Blocking emission (batch_size > num_groups) ===
3443+
// Batch 1 arrived at: 2.877907958s (1000000 rows, batch creation took 2.877906208s)
3444+
// Total time: 2.8790405s
3445+
// Poll times: min=2.877906208s, max=2.877906208s, avg=2.877906208s
3446+
3447+
// === Summary ===
3448+
// Total execution time:
3449+
// Chunked: 2.869591125s
3450+
// Blocking: 2.8790405s
3451+
// Overhead: 1.00x with chunked
3452+
3453+
// Poll duration (time between batches):
3454+
// Chunked: min=369.209µs, max=2.210162417s, avg=23.324541ms
3455+
// Blocking: min=2.877906208s, max=2.877906208s, avg=2.877906208s
3456+
3457+
// Yield behavior:
3458+
// Chunked: 123 batches (yields between each)
3459+
// Blocking: 1 batch (single long stall)
3460+
// Benefit: max poll reduced from 2.877906208s to 2.210162417s.
33683461
Ok(())
33693462
}
33703463
}

0 commit comments

Comments (0)