diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index af0dd436fda47..040064d7c89a9 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -153,8 +153,7 @@ impl Display for DataFusionError { write!(f, "This feature is not implemented: {}", desc) } DataFusionError::Internal(ref desc) => { - write!(f, "Internal error: {}. This was likely caused by a bug in DataFusion's \ - code and we would welcome that you file an bug report in our issue tracker", desc) + write!(f, "Internal error: {}", desc) } DataFusionError::Plan(ref desc) => { write!(f, "Error during planning: {}", desc) diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs index 686a77c4273f1..bff5f8ac059f3 100644 --- a/datafusion/core/src/physical_plan/repartition.rs +++ b/datafusion/core/src/physical_plan/repartition.rs @@ -27,7 +27,10 @@ use crate::error::{DataFusionError, Result}; use crate::physical_plan::hash_utils::create_hashes; use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics}; use arrow::record_batch::RecordBatch; -use arrow::{array::Array, error::Result as ArrowResult}; +use arrow::{ + array::Array, + error::{ArrowError, Result as ArrowResult}, +}; use arrow::{compute::take, datatypes::SchemaRef}; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -369,9 +372,8 @@ impl RepartitionExec { .columns() .iter() .map(|c| { - take(c.as_ref(), &indices, None).map_err(|e| { - DataFusionError::Execution(e.to_string()) - }) + take(c.as_ref(), &indices, None) + .map_err(DataFusionError::ArrowError) }) .collect::<Result<Vec<Arc<dyn Array>>>>()?; let output_batch = @@ -426,9 +428,33 @@ impl RepartitionExec { } // Error from running input task Ok(Err(e)) => { + // try to unwrap nested errors + let mut err = &e; + let mut message = None; + // limit the number of unwraps to avoid potential infinite/deep loops + for _ in 0..100 { + if let DataFusionError::External(ext_err) = err {
message = Some(ext_err.to_string()); + break; + } + let DataFusionError::ArrowError(arrow_err) = err else { + message = Some(err.to_string()); + break; + }; + let ArrowError::ExternalError(ext_err) = arrow_err else { + message = Some(arrow_err.to_string()); + break; + }; + let Some(df_err) = ext_err.downcast_ref::<DataFusionError>() else { + message = Some(ext_err.to_string()); + break; + }; + err = df_err; + } + let message = message.unwrap_or_else(|| err.to_string()); for (_, tx) in txs { // wrap it because need to send error to all output partitions - let err = DataFusionError::Execution(e.to_string()); + let err = DataFusionError::Execution(message.clone()); let err = Err(err.into()); tx.send(Some(err)).ok(); }