Skip to content

Commit 39ee59a

Browse files
alamb, nvartolomei, and tustvold
authored
Fix panic propagation in CoalescePartitions, consolidates panic propagation into RecordBatchReceiverStream (#6507)
* Propagate panics Another try for fixing #3104. RepartitionExec might need a similar fix. * avoid allocation by pinning on the stack instead * Consolidate panic propagation into RecordBatchReceiverStream * Update docs / cleanup * Apply suggestions from code review Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> * rename to be consistent and not deal with English peculiarities * Add a test and comments * write test for drop cancel * Add test for not driving to completion * Do not drive all streams to error * terminate early on panic * tweak comments * tweak comments * use futures::stream --------- Co-authored-by: Nicolae Vartolomei <nv@nvartolomei.com> Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
1 parent 9cb6161 commit 39ee59a

8 files changed

Lines changed: 610 additions & 231 deletions

File tree

datafusion/core/src/physical_plan/analyze.rs

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,9 @@ use crate::{
2929
};
3030
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
3131
use futures::StreamExt;
32-
use tokio::task::JoinSet;
3332

3433
use super::expressions::PhysicalSortExpr;
35-
use super::stream::RecordBatchStreamAdapter;
34+
use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
3635
use super::{Distribution, SendableRecordBatchStream};
3736
use datafusion_execution::TaskContext;
3837

@@ -121,23 +120,15 @@ impl ExecutionPlan for AnalyzeExec {
121120
// Gather futures that will run each input partition in
122121
// parallel (on a separate tokio task) using a JoinSet to
123122
// cancel outstanding futures on drop
124-
let mut set = JoinSet::new();
125123
let num_input_partitions = self.input.output_partitioning().partition_count();
124+
let mut builder =
125+
RecordBatchReceiverStream::builder(self.schema(), num_input_partitions);
126126

127127
for input_partition in 0..num_input_partitions {
128-
let input_stream = self.input.execute(input_partition, context.clone());
129-
130-
set.spawn(async move {
131-
let mut total_rows = 0;
132-
let mut input_stream = input_stream?;
133-
while let Some(batch) = input_stream.next().await {
134-
let batch = batch?;
135-
total_rows += batch.num_rows();
136-
}
137-
Ok(total_rows) as Result<usize>
138-
});
128+
builder.run_input(self.input.clone(), input_partition, context.clone());
139129
}
140130

131+
// Create future that computes the final output
141132
let start = Instant::now();
142133
let captured_input = self.input.clone();
143134
let captured_schema = self.schema.clone();
@@ -146,18 +137,11 @@ impl ExecutionPlan for AnalyzeExec {
146137
// future that gathers the results from all the tasks in the
147138
// JoinSet that computes the overall row count and final
148139
// record batch
140+
let mut input_stream = builder.build();
149141
let output = async move {
150142
let mut total_rows = 0;
151-
while let Some(res) = set.join_next().await {
152-
// translate join errors (aka task panic's) into ExecutionErrors
153-
match res {
154-
Ok(row_count) => total_rows += row_count?,
155-
Err(e) => {
156-
return Err(DataFusionError::Execution(format!(
157-
"Join error in AnalyzeExec: {e}"
158-
)))
159-
}
160-
}
143+
while let Some(batch) = input_stream.next().await.transpose()? {
144+
total_rows += batch.num_rows();
161145
}
162146

163147
let duration = Instant::now() - start;

datafusion/core/src/physical_plan/coalesce_partitions.rs

Lines changed: 25 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,19 @@
2020
2121
use std::any::Any;
2222
use std::sync::Arc;
23-
use std::task::Poll;
24-
25-
use futures::Stream;
26-
use tokio::sync::mpsc;
2723

2824
use arrow::datatypes::SchemaRef;
29-
use arrow::record_batch::RecordBatch;
3025

31-
use super::common::AbortOnDropMany;
3226
use super::expressions::PhysicalSortExpr;
3327
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
34-
use super::{RecordBatchStream, Statistics};
28+
use super::stream::{ObservedStream, RecordBatchReceiverStream};
29+
use super::Statistics;
3530
use crate::error::{DataFusionError, Result};
3631
use crate::physical_plan::{
3732
DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
3833
};
3934

4035
use super::SendableRecordBatchStream;
41-
use crate::physical_plan::common::spawn_execution;
4236
use datafusion_execution::TaskContext;
4337

4438
/// Merge execution plan executes partitions in parallel and combines them into a single
@@ -137,27 +131,17 @@ impl ExecutionPlan for CoalescePartitionsExec {
137131
// use a stream that allows each sender to put in at
138132
// least one result in an attempt to maximize
139133
// parallelism.
140-
let (sender, receiver) =
141-
mpsc::channel::<Result<RecordBatch>>(input_partitions);
134+
let mut builder =
135+
RecordBatchReceiverStream::builder(self.schema(), input_partitions);
142136

143137
// spawn independent tasks whose resulting streams (of batches)
144138
// are sent to the channel for consumption.
145-
let mut join_handles = Vec::with_capacity(input_partitions);
146139
for part_i in 0..input_partitions {
147-
join_handles.push(spawn_execution(
148-
self.input.clone(),
149-
sender.clone(),
150-
part_i,
151-
context.clone(),
152-
));
140+
builder.run_input(self.input.clone(), part_i, context.clone());
153141
}
154142

155-
Ok(Box::pin(MergeStream {
156-
input: receiver,
157-
schema: self.schema(),
158-
baseline_metrics,
159-
drop_helper: AbortOnDropMany(join_handles),
160-
}))
143+
let stream = builder.build();
144+
Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)))
161145
}
162146
}
163147
}
@@ -183,32 +167,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
183167
}
184168
}
185169

186-
struct MergeStream {
187-
schema: SchemaRef,
188-
input: mpsc::Receiver<Result<RecordBatch>>,
189-
baseline_metrics: BaselineMetrics,
190-
#[allow(unused)]
191-
drop_helper: AbortOnDropMany<()>,
192-
}
193-
194-
impl Stream for MergeStream {
195-
type Item = Result<RecordBatch>;
196-
197-
fn poll_next(
198-
mut self: std::pin::Pin<&mut Self>,
199-
cx: &mut std::task::Context<'_>,
200-
) -> Poll<Option<Self::Item>> {
201-
let poll = self.input.poll_recv(cx);
202-
self.baseline_metrics.record_poll(poll)
203-
}
204-
}
205-
206-
impl RecordBatchStream for MergeStream {
207-
fn schema(&self) -> SchemaRef {
208-
self.schema.clone()
209-
}
210-
}
211-
212170
#[cfg(test)]
213171
mod tests {
214172

@@ -218,7 +176,9 @@ mod tests {
218176
use super::*;
219177
use crate::physical_plan::{collect, common};
220178
use crate::prelude::SessionContext;
221-
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
179+
use crate::test::exec::{
180+
assert_strong_count_converges_to_zero, BlockingExec, PanicExec,
181+
};
222182
use crate::test::{self, assert_is_pending};
223183

224184
#[tokio::test]
@@ -270,4 +230,19 @@ mod tests {
270230

271231
Ok(())
272232
}
233+
234+
#[tokio::test]
235+
#[should_panic(expected = "PanickingStream did panic")]
236+
async fn test_panic() {
237+
let session_ctx = SessionContext::new();
238+
let task_ctx = session_ctx.task_ctx();
239+
let schema =
240+
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
241+
242+
let panicking_exec = Arc::new(PanicExec::new(Arc::clone(&schema), 2));
243+
let coalesce_partitions_exec =
244+
Arc::new(CoalescePartitionsExec::new(panicking_exec));
245+
246+
collect(coalesce_partitions_exec, task_ctx).await.unwrap();
247+
}
273248
}

datafusion/core/src/physical_plan/common.rs

Lines changed: 9 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,20 @@ use super::SendableRecordBatchStream;
2121
use crate::error::{DataFusionError, Result};
2222
use crate::execution::memory_pool::MemoryReservation;
2323
use crate::physical_plan::stream::RecordBatchReceiverStream;
24-
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
24+
use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
2525
use arrow::datatypes::Schema;
2626
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
2727
use arrow::record_batch::RecordBatch;
28-
use datafusion_execution::TaskContext;
2928
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
3029
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
3130
use futures::{Future, StreamExt, TryStreamExt};
32-
use log::debug;
3331
use parking_lot::Mutex;
3432
use pin_project_lite::pin_project;
3533
use std::fs;
3634
use std::fs::{metadata, File};
3735
use std::path::{Path, PathBuf};
3836
use std::sync::Arc;
3937
use std::task::{Context, Poll};
40-
use tokio::sync::mpsc;
4138
use tokio::task::JoinHandle;
4239

4340
/// [`MemoryReservation`] used across query execution streams
@@ -96,65 +93,30 @@ fn build_file_list_recurse(
9693
Ok(())
9794
}
9895

99-
/// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender
100-
pub(crate) fn spawn_execution(
101-
input: Arc<dyn ExecutionPlan>,
102-
output: mpsc::Sender<Result<RecordBatch>>,
103-
partition: usize,
104-
context: Arc<TaskContext>,
105-
) -> JoinHandle<()> {
106-
tokio::spawn(async move {
107-
let mut stream = match input.execute(partition, context) {
108-
Err(e) => {
109-
// If send fails, plan being torn down,
110-
// there is no place to send the error.
111-
output.send(Err(e)).await.ok();
112-
debug!(
113-
"Stopping execution: error executing input: {}",
114-
displayable(input.as_ref()).one_line()
115-
);
116-
return;
117-
}
118-
Ok(stream) => stream,
119-
};
120-
121-
while let Some(item) = stream.next().await {
122-
// If send fails, plan being torn down,
123-
// there is no place to send the error.
124-
if output.send(item).await.is_err() {
125-
debug!(
126-
"Stopping execution: output is gone, plan cancelling: {}",
127-
displayable(input.as_ref()).one_line()
128-
);
129-
return;
130-
}
131-
}
132-
})
133-
}
134-
13596
/// If running in a tokio context spawns the execution of `stream` to a separate task
13697
/// allowing it to execute in parallel with an intermediate buffer of size `buffer`
13798
pub(crate) fn spawn_buffered(
13899
mut input: SendableRecordBatchStream,
139100
buffer: usize,
140101
) -> SendableRecordBatchStream {
141102
// Use tokio only if running from a tokio context (#2201)
142-
let handle = match tokio::runtime::Handle::try_current() {
143-
Ok(handle) => handle,
144-
Err(_) => return input,
103+
if tokio::runtime::Handle::try_current().is_err() {
104+
return input;
145105
};
146106

147-
let schema = input.schema();
148-
let (sender, receiver) = mpsc::channel(buffer);
149-
let join = handle.spawn(async move {
107+
let mut builder = RecordBatchReceiverStream::builder(input.schema(), buffer);
108+
109+
let sender = builder.tx();
110+
111+
builder.spawn(async move {
150112
while let Some(item) = input.next().await {
151113
if sender.send(item).await.is_err() {
152114
return;
153115
}
154116
}
155117
});
156118

157-
RecordBatchReceiverStream::create(&schema, receiver, join)
119+
builder.build()
158120
}
159121

160122
/// Computes the statistics for an in-memory RecordBatch

datafusion/core/src/physical_plan/sorts/sort.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use std::io::BufReader;
5252
use std::path::{Path, PathBuf};
5353
use std::sync::Arc;
5454
use tempfile::NamedTempFile;
55-
use tokio::sync::mpsc::{Receiver, Sender};
55+
use tokio::sync::mpsc::Sender;
5656
use tokio::task;
5757

5858
struct ExternalSorterMetrics {
@@ -373,18 +373,16 @@ fn read_spill_as_stream(
373373
path: NamedTempFile,
374374
schema: SchemaRef,
375375
) -> Result<SendableRecordBatchStream> {
376-
let (sender, receiver): (Sender<Result<RecordBatch>>, Receiver<Result<RecordBatch>>) =
377-
tokio::sync::mpsc::channel(2);
378-
let join_handle = task::spawn_blocking(move || {
376+
let mut builder = RecordBatchReceiverStream::builder(schema, 2);
377+
let sender = builder.tx();
378+
379+
builder.spawn_blocking(move || {
379380
if let Err(e) = read_spill(sender, path.path()) {
380381
error!("Failure while reading spill file: {:?}. Error: {}", path, e);
381382
}
382383
});
383-
Ok(RecordBatchReceiverStream::create(
384-
&schema,
385-
receiver,
386-
join_handle,
387-
))
384+
385+
Ok(builder.build())
388386
}
389387

390388
fn write_sorted(

datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -792,21 +792,20 @@ mod tests {
792792
let mut streams = Vec::with_capacity(partition_count);
793793

794794
for partition in 0..partition_count {
795-
let (sender, receiver) = tokio::sync::mpsc::channel(1);
795+
let mut builder = RecordBatchReceiverStream::builder(schema.clone(), 1);
796+
797+
let sender = builder.tx();
798+
796799
let mut stream = batches.execute(partition, task_ctx.clone()).unwrap();
797-
let join_handle = tokio::spawn(async move {
800+
builder.spawn(async move {
798801
while let Some(batch) = stream.next().await {
799802
sender.send(batch).await.unwrap();
800803
// This causes the MergeStream to wait for more input
801804
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
802805
}
803806
});
804807

805-
streams.push(RecordBatchReceiverStream::create(
806-
&schema,
807-
receiver,
808-
join_handle,
809-
));
808+
streams.push(builder.build());
810809
}
811810

812811
let metrics = ExecutionPlanMetricsSet::new();

0 commit comments

Comments
 (0)