Skip to content

Commit 56a26eb

Browse files
committed
Do not drive all streams to error
1 parent a531afe commit 56a26eb

2 files changed

Lines changed: 32 additions & 29 deletions

File tree

datafusion/core/src/physical_plan/stream.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -117,16 +117,26 @@ impl RecordBatchReceiverStreamBuilder {
117117
Ok(stream) => stream,
118118
};
119119

120+
// Transfer batches from inner stream to the output tx
121+
// immediately.
120122
while let Some(item) = stream.next().await {
121-
// If send fails, plan being torn down,
122-
// there is no place to send the error.
123+
let is_err = item.is_err();
124+
125+
// If send fails, plan being torn down, there is no
126+
// place to send the error and no reason to continue.
123127
if output.send(item).await.is_err() {
124128
debug!(
125129
"Stopping execution: output is gone, plan cancelling: {}",
126130
displayable(input.as_ref()).one_line()
127131
);
128132
return;
129133
}
134+
135+
// stop after the first error is encountered (don't
136+
// drive all streams to completion)
137+
if is_err {
138+
return;
139+
}
130140
}
131141
});
132142
}
@@ -332,7 +342,7 @@ mod test {
332342

333343
#[tokio::test]
334344
#[should_panic(expected = "PanickingStream did panic: 1")]
335-
async fn record_batch_receiver_stream_propagates_panics_one() {
345+
async fn record_batch_receiver_stream_propagates_panics_early_shutdown() {
336346
let schema = schema();
337347

338348
// make 2 partitions, second panics before the first
@@ -341,7 +351,12 @@ mod test {
341351
.with_partition_panic(0, 10)
342352
.with_partition_panic(1, 3); // partition 1 should panic first (after 3 )
343353

344-
let max_batches = 5; // expect to read every other batch: (0,1,0,1,0,panic)
354+
// ensure that the panic results in an early shutdown (that
355+
// everything stops after the first panic).
356+
357+
// The stream reads every other batch: (0,1,0,1,0,panic)
358+
// so should not exceed 5 batches prior to the panic
359+
let max_batches = 5;
345360
consume(input, max_batches).await
346361
}
347362

@@ -378,10 +393,6 @@ mod test {
378393
let task_ctx = session_ctx.task_ctx();
379394
let schema = schema();
380395

381-
// Make an input that will not proceed
382-
let blocking_input = BlockingExec::new(schema.clone(), 1);
383-
let refs = blocking_input.refs();
384-
385396
// make an input that will error
386397
let error_stream = MockExec::new(
387398
vec![
@@ -392,28 +403,17 @@ mod test {
392403
)
393404
.with_use_task(false);
394405

395-
// Configure a RecordBatchReceiverStream to consume the
396-
// blocking input (which will never advance) and the stream
397-
// that will error.
398-
399406
let mut builder = RecordBatchReceiverStream::builder(schema, 2);
400-
builder.run_input(Arc::new(blocking_input), 0, task_ctx.clone());
401407
builder.run_input(Arc::new(error_stream), 0, task_ctx.clone());
402408
let mut stream = builder.build();
403409

404-
// first input should be present
405-
assert!(std::sync::Weak::strong_count(&refs) > 0);
406-
407410
// get the first result, which should be an error
408411
let first_batch = stream.next().await.unwrap();
409412
let first_err = first_batch.unwrap_err();
410413
assert_eq!(first_err.to_string(), "Execution error: Test1");
411414

412415
// There should be no more batches produced (should not get the second error)
413416
assert!(stream.next().await.is_none());
414-
415-
// And the other inputs should be cleaned up (even before stream is dropped)
416-
assert_strong_count_converges_to_zero(refs).await;
417417
}
418418

419419
/// Consumes all the input's partitions into a

datafusion/core/src/test/exec.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ impl RecordBatchStream for TestStream {
116116
}
117117
}
118118

119-
/// A Mock ExecutionPlan that can be used for writing tests of other ExecutionPlans
120-
///
119+
/// A Mock ExecutionPlan that can be used for writing tests of other
120+
/// ExecutionPlans
121121
#[derive(Debug)]
122122
pub struct MockExec {
123123
/// the results to send back
@@ -129,15 +129,18 @@ pub struct MockExec {
129129
}
130130

131131
impl MockExec {
132-
/// Create a new exec with a single partition that returns the
133-
/// record batches in this Exec. Note the batches are not produced
134-
/// immediately (the caller has to actually yield and another task
135-
/// must run) to ensure any poll loops are correct.
132+
/// Create a new `MockExec` with a single partition that returns
133+
/// the specified `Result`s.
134+
///
135+
/// By default, the batches are not produced immediately (the
136+
/// caller has to actually yield and another task must run) to
137+
/// ensure any poll loops are correct. This behavior can be
138+
/// changed with `with_use_task`
136139
pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
137140
Self {
138141
data,
139142
schema,
140-
use_task: false,
143+
use_task: true,
141144
}
142145
}
143146

@@ -198,9 +201,9 @@ impl ExecutionPlan for MockExec {
198201

199202
if self.use_task {
200203
let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
201-
// send data in order but in a separate
202-
// thread (to ensure the batches are not available without the
203-
// DelayedStream yielding).
204+
// send data in order but in a separate task (to ensure
205+
// the batches are not available without the stream
206+
// yielding).
204207
let tx = builder.tx();
205208
builder.spawn(async move {
206209
for batch in data {

0 commit comments

Comments
 (0)