Skip to content

Commit 15ba62b

Browse files
committed
fix: Unwind panic in spawned threads
Signed-off-by: Alex Qyoun-ae <4062971+MazterQyou@users.noreply.github.com>
1 parent 03e683e commit 15ba62b

2 files changed

Lines changed: 70 additions & 23 deletions

File tree

datafusion/core/src/physical_plan/hash_aggregate.rs

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ use std::vec;
2525
use ahash::RandomState;
2626
use futures::{
2727
stream::{Stream, StreamExt},
28-
Future,
28+
Future, FutureExt,
2929
};
3030

31-
use crate::error::Result;
31+
use crate::error::{DataFusionError, Result};
3232
use crate::physical_plan::hash_utils::create_hashes;
3333
use crate::physical_plan::{
3434
Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
@@ -576,16 +576,32 @@ impl GroupedHashAggregateStream {
576576
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
577577

578578
let join_handle = tokio::spawn(async move {
579-
let result = compute_grouped_hash_aggregate(
580-
mode,
581-
schema_clone,
582-
group_expr,
583-
aggr_expr,
584-
input,
585-
elapsed_compute,
586-
)
579+
let result = std::panic::AssertUnwindSafe(async move {
580+
compute_grouped_hash_aggregate(
581+
mode,
582+
schema_clone,
583+
group_expr,
584+
aggr_expr,
585+
input,
586+
elapsed_compute,
587+
)
588+
.await
589+
.record_output(&baseline_metrics)
590+
})
591+
.catch_unwind()
587592
.await
588-
.record_output(&baseline_metrics);
593+
.unwrap_or_else(|panic_payload| {
594+
let msg = if let Some(s) = panic_payload.downcast_ref::<&str>() {
595+
s
596+
} else if let Some(s) = panic_payload.downcast_ref::<String>() {
597+
s.as_str()
598+
} else {
599+
"unknown panic"
600+
};
601+
Err(ArrowError::ExternalError(Box::new(
602+
DataFusionError::Execution(format!("Panic: {}", msg)),
603+
)))
604+
});
589605

590606
// failing here is OK, the receiver is gone and does not care about the result
591607
tx.send(result).ok();
@@ -804,15 +820,31 @@ impl HashAggregateStream {
804820
let schema_clone = schema.clone();
805821
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
806822
let join_handle = tokio::spawn(async move {
807-
let result = compute_hash_aggregate(
808-
mode,
809-
schema_clone,
810-
aggr_expr,
811-
input,
812-
elapsed_compute,
813-
)
823+
let result = std::panic::AssertUnwindSafe(async move {
824+
compute_hash_aggregate(
825+
mode,
826+
schema_clone,
827+
aggr_expr,
828+
input,
829+
elapsed_compute,
830+
)
831+
.await
832+
.record_output(&baseline_metrics)
833+
})
834+
.catch_unwind()
814835
.await
815-
.record_output(&baseline_metrics);
836+
.unwrap_or_else(|panic_payload| {
837+
let msg = if let Some(s) = panic_payload.downcast_ref::<&str>() {
838+
s
839+
} else if let Some(s) = panic_payload.downcast_ref::<String>() {
840+
s.as_str()
841+
} else {
842+
"unknown panic"
843+
};
844+
Err(ArrowError::ExternalError(Box::new(
845+
DataFusionError::Execution(format!("Panic: {}", msg)),
846+
)))
847+
});
816848

817849
// failing here is OK, the receiver is gone and does not care about the result
818850
tx.send(result).ok();

datafusion/core/src/physical_plan/windows/window_agg_exec.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
//! Stream and channel implementations for window function expressions.
1919
20-
use crate::error::Result;
20+
use crate::error::{DataFusionError, Result};
2121
use crate::execution::context::TaskContext;
2222
use crate::physical_plan::common::AbortOnDropSingle;
2323
use crate::physical_plan::expressions::PhysicalSortExpr;
@@ -256,10 +256,25 @@ impl WindowAggStream {
256256
let schema_clone = schema.clone();
257257
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
258258
let join_handle = tokio::spawn(async move {
259-
let schema = schema_clone.clone();
260-
let result =
259+
let result = std::panic::AssertUnwindSafe(async move {
260+
let schema = schema_clone.clone();
261261
WindowAggStream::process(input, window_expr, schema, elapsed_compute)
262-
.await;
262+
.await
263+
})
264+
.catch_unwind()
265+
.await
266+
.unwrap_or_else(|panic_payload| {
267+
let msg = if let Some(s) = panic_payload.downcast_ref::<&str>() {
268+
s
269+
} else if let Some(s) = panic_payload.downcast_ref::<String>() {
270+
s.as_str()
271+
} else {
272+
"unknown panic"
273+
};
274+
Err(ArrowError::ExternalError(Box::new(
275+
DataFusionError::Execution(format!("Panic: {}", msg)),
276+
)))
277+
});
263278

264279
// failing here is OK, the receiver is gone and does not care about the result
265280
tx.send(result).ok();

0 commit comments

Comments
 (0)