1818//! Defines the External shuffle repartition plan.
1919
2020use crate :: execution:: shuffle:: metrics:: ShufflePartitionerMetrics ;
21+ use crate :: execution:: shuffle:: writers:: { BufBatchWriter , PartitionWriter } ;
2122use crate :: execution:: shuffle:: { CometPartitioning , CompressionCodec , ShuffleBlockWriter } ;
2223use crate :: execution:: tracing:: { with_trace, with_trace_async} ;
2324use arrow:: compute:: interleave_record_batch;
2425use async_trait:: async_trait;
26+ use datafusion:: common:: exec_datafusion_err;
2527use datafusion:: common:: utils:: proxy:: VecAllocExt ;
2628use datafusion:: physical_expr:: { EquivalenceProperties , Partitioning } ;
2729use datafusion:: physical_plan:: execution_plan:: { Boundedness , EmissionType } ;
@@ -31,7 +33,6 @@ use datafusion::{
3133 error:: { DataFusionError , Result } ,
3234 execution:: {
3335 context:: TaskContext ,
34- disk_manager:: RefCountedTempFile ,
3536 memory_pool:: { MemoryConsumer , MemoryReservation } ,
3637 runtime_env:: RuntimeEnv ,
3738 } ,
@@ -45,8 +46,6 @@ use datafusion::{
4546use datafusion_comet_spark_expr:: hash_funcs:: murmur3:: create_murmur3_hashes;
4647use futures:: { StreamExt , TryFutureExt , TryStreamExt } ;
4748use itertools:: Itertools ;
48- use std:: borrow:: Borrow ;
49- use std:: io:: { Cursor , Error , SeekFrom } ;
5049use std:: {
5150 any:: Any ,
5251 fmt,
@@ -256,10 +255,15 @@ async fn external_shuffle(
256255 // into the corresponding partition buffer.
257256 // Otherwise, pulling the next batch from the input stream might overwrite the
258257 // current batch in the repartitioner.
259- repartitioner. insert_batch ( batch?) . await ?;
258+ repartitioner
259+ . insert_batch ( batch?)
260+ . await
261+ . map_err ( |err| exec_datafusion_err ! ( "Error inserting batch: {err}" ) ) ?;
260262 }
261263
262- repartitioner. shuffle_write ( ) ?;
264+ repartitioner
265+ . shuffle_write ( )
266+ . map_err ( |err| exec_datafusion_err ! ( "Error in shuffle write: {err}" ) ) ?;
263267
264268 // shuffle writer always has empty output
265269 Ok ( Box :: pin ( EmptyRecordBatchStream :: new ( Arc :: clone ( & schema) ) ) as SendableRecordBatchStream )
@@ -803,11 +807,10 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
803807
804808 // if we wrote a spill file for this partition then copy the
805809 // contents into the shuffle file
806- if let Some ( spill_data) = self . partition_writers [ i] . spill_file . as_ref ( ) {
807- let mut spill_file =
808- BufReader :: new ( File :: open ( spill_data. temp_file . path ( ) ) . map_err ( to_df_err) ?) ;
810+ if let Some ( spill_path) = self . partition_writers [ i] . path ( ) {
811+ let mut spill_file = BufReader :: new ( File :: open ( spill_path) ?) ;
809812 let mut write_timer = self . metrics . write_time . timer ( ) ;
810- std:: io:: copy ( & mut spill_file, & mut output_data) . map_err ( to_df_err ) ?;
813+ std:: io:: copy ( & mut spill_file, & mut output_data) ?;
811814 write_timer. stop ( ) ;
812815 }
813816
@@ -828,17 +831,15 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
828831 write_timer. stop ( ) ;
829832
830833 // add one extra offset at the end to ease partition length computation
831- offsets[ num_output_partitions] = output_data. stream_position ( ) . map_err ( to_df_err ) ?;
834+ offsets[ num_output_partitions] = output_data. stream_position ( ) ?;
832835
833836 let mut write_timer = self . metrics . write_time . timer ( ) ;
834837 let mut output_index =
835838 BufWriter :: new ( File :: create ( index_file) . map_err ( |e| {
836839 DataFusionError :: Execution ( format ! ( "shuffle write error: {e:?}" ) )
837840 } ) ?) ;
838841 for offset in offsets {
839- output_index
840- . write_all ( & ( offset as i64 ) . to_le_bytes ( ) [ ..] )
841- . map_err ( to_df_err) ?;
842+ output_index. write_all ( & ( offset as i64 ) . to_le_bytes ( ) [ ..] ) ?;
842843 }
843844 output_index. flush ( ) ?;
844845 write_timer. stop ( ) ;
@@ -895,8 +896,7 @@ impl SinglePartitionShufflePartitioner {
895896 . write ( true )
896897 . create ( true )
897898 . truncate ( true )
898- . open ( output_data_path)
899- . map_err ( to_df_err) ?;
899+ . open ( output_data_path) ?;
900900
901901 let output_data_writer =
902902 BufBatchWriter :: new ( shuffle_block_writer, output_data_file, write_buffer_size) ;
@@ -1011,15 +1011,9 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
10111011 . open ( self . output_index_path . clone ( ) )
10121012 . map_err ( |e| DataFusionError :: Execution ( format ! ( "shuffle write error: {e:?}" ) ) ) ?;
10131013 let mut index_buf_writer = BufWriter :: new ( index_file) ;
1014- let data_file_length = self
1015- . output_data_writer
1016- . writer
1017- . stream_position ( )
1018- . map_err ( to_df_err) ?;
1014+ let data_file_length = self . output_data_writer . writer_stream_position ( ) ?;
10191015 for offset in [ 0 , data_file_length] {
1020- index_buf_writer
1021- . write_all ( & ( offset as i64 ) . to_le_bytes ( ) [ ..] )
1022- . map_err ( to_df_err) ?;
1016+ index_buf_writer. write_all ( & ( offset as i64 ) . to_le_bytes ( ) [ ..] ) ?;
10231017 }
10241018 index_buf_writer. flush ( ) ?;
10251019
@@ -1031,21 +1025,17 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
10311025 }
10321026}
10331027
1034- fn to_df_err ( e : Error ) -> DataFusionError {
1035- DataFusionError :: Execution ( format ! ( "shuffle write error: {e:?}" ) )
1036- }
1037-
10381028/// A helper struct to produce shuffled batches.
10391029/// This struct takes ownership of the buffered batches and partition indices from the
10401030/// ShuffleRepartitioner, and provides an iterator over the batches in the specified partitions.
1041- struct PartitionedBatchesProducer {
1031+ pub ( crate ) struct PartitionedBatchesProducer {
10421032 buffered_batches : Vec < RecordBatch > ,
10431033 partition_indices : Vec < Vec < ( u32 , u32 ) > > ,
10441034 batch_size : usize ,
10451035}
10461036
10471037impl PartitionedBatchesProducer {
1048- fn new (
1038+ pub ( crate ) fn new (
10491039 buffered_batches : Vec < RecordBatch > ,
10501040 indices : Vec < Vec < ( u32 , u32 ) > > ,
10511041 batch_size : usize ,
@@ -1066,7 +1056,7 @@ impl PartitionedBatchesProducer {
10661056 }
10671057}
10681058
1069- struct PartitionedBatchIterator < ' a > {
1059+ pub ( crate ) struct PartitionedBatchIterator < ' a > {
10701060 record_batches : Vec < & ' a RecordBatch > ,
10711061 batch_size : usize ,
10721062 indices : Vec < ( usize , usize ) > ,
@@ -1125,141 +1115,6 @@ impl Iterator for PartitionedBatchIterator<'_> {
11251115 }
11261116}
11271117
1128- struct PartitionWriter {
1129- /// Spill file for intermediate shuffle output for this partition. Each spill event
1130- /// will append to this file and the contents will be copied to the shuffle file at
1131- /// the end of processing.
1132- spill_file : Option < SpillFile > ,
1133- /// Writer that performs encoding and compression
1134- shuffle_block_writer : ShuffleBlockWriter ,
1135- }
1136-
1137- struct SpillFile {
1138- temp_file : RefCountedTempFile ,
1139- file : File ,
1140- }
1141-
1142- impl PartitionWriter {
1143- fn try_new ( shuffle_block_writer : ShuffleBlockWriter ) -> Result < Self > {
1144- Ok ( Self {
1145- spill_file : None ,
1146- shuffle_block_writer,
1147- } )
1148- }
1149-
1150- fn spill (
1151- & mut self ,
1152- iter : & mut PartitionedBatchIterator ,
1153- runtime : & RuntimeEnv ,
1154- metrics : & ShufflePartitionerMetrics ,
1155- write_buffer_size : usize ,
1156- ) -> Result < usize > {
1157- if let Some ( batch) = iter. next ( ) {
1158- self . ensure_spill_file_created ( runtime) ?;
1159-
1160- let total_bytes_written = {
1161- let mut buf_batch_writer = BufBatchWriter :: new (
1162- & mut self . shuffle_block_writer ,
1163- & mut self . spill_file . as_mut ( ) . unwrap ( ) . file ,
1164- write_buffer_size,
1165- ) ;
1166- let mut bytes_written =
1167- buf_batch_writer. write ( & batch?, & metrics. encode_time , & metrics. write_time ) ?;
1168- for batch in iter {
1169- let batch = batch?;
1170- bytes_written += buf_batch_writer. write (
1171- & batch,
1172- & metrics. encode_time ,
1173- & metrics. write_time ,
1174- ) ?;
1175- }
1176- buf_batch_writer. flush ( & metrics. write_time ) ?;
1177- bytes_written
1178- } ;
1179-
1180- Ok ( total_bytes_written)
1181- } else {
1182- Ok ( 0 )
1183- }
1184- }
1185-
1186- fn ensure_spill_file_created ( & mut self , runtime : & RuntimeEnv ) -> Result < ( ) > {
1187- if self . spill_file . is_none ( ) {
1188- // Spill file is not yet created, create it
1189- let spill_file = runtime
1190- . disk_manager
1191- . create_tmp_file ( "shuffle writer spill" ) ?;
1192- let spill_data = OpenOptions :: new ( )
1193- . write ( true )
1194- . create ( true )
1195- . truncate ( true )
1196- . open ( spill_file. path ( ) )
1197- . map_err ( |e| {
1198- DataFusionError :: Execution ( format ! ( "Error occurred while spilling {e}" ) )
1199- } ) ?;
1200- self . spill_file = Some ( SpillFile {
1201- temp_file : spill_file,
1202- file : spill_data,
1203- } ) ;
1204- }
1205- Ok ( ( ) )
1206- }
1207- }
1208-
1209- /// Write batches to writer while using a buffer to avoid frequent system calls.
1210- /// The record batches were first written by ShuffleBlockWriter into an internal buffer.
1211- /// Once the buffer exceeds the max size, the buffer will be flushed to the writer.
1212- struct BufBatchWriter < S : Borrow < ShuffleBlockWriter > , W : Write > {
1213- shuffle_block_writer : S ,
1214- writer : W ,
1215- buffer : Vec < u8 > ,
1216- buffer_max_size : usize ,
1217- }
1218-
1219- impl < S : Borrow < ShuffleBlockWriter > , W : Write > BufBatchWriter < S , W > {
1220- fn new ( shuffle_block_writer : S , writer : W , buffer_max_size : usize ) -> Self {
1221- Self {
1222- shuffle_block_writer,
1223- writer,
1224- buffer : vec ! [ ] ,
1225- buffer_max_size,
1226- }
1227- }
1228-
1229- fn write (
1230- & mut self ,
1231- batch : & RecordBatch ,
1232- encode_time : & Time ,
1233- write_time : & Time ,
1234- ) -> Result < usize > {
1235- let mut cursor = Cursor :: new ( & mut self . buffer ) ;
1236- cursor. seek ( SeekFrom :: End ( 0 ) ) ?;
1237- let bytes_written =
1238- self . shuffle_block_writer
1239- . borrow ( )
1240- . write_batch ( batch, & mut cursor, encode_time) ?;
1241- let pos = cursor. position ( ) ;
1242- if pos >= self . buffer_max_size as u64 {
1243- let mut write_timer = write_time. timer ( ) ;
1244- self . writer . write_all ( & self . buffer ) ?;
1245- write_timer. stop ( ) ;
1246- self . buffer . clear ( ) ;
1247- }
1248- Ok ( bytes_written)
1249- }
1250-
1251- fn flush ( & mut self , write_time : & Time ) -> Result < ( ) > {
1252- let mut write_timer = write_time. timer ( ) ;
1253- if !self . buffer . is_empty ( ) {
1254- self . writer . write_all ( & self . buffer ) ?;
1255- }
1256- self . writer . flush ( ) ?;
1257- write_timer. stop ( ) ;
1258- self . buffer . clear ( ) ;
1259- Ok ( ( ) )
1260- }
1261- }
1262-
12631118fn pmod ( hash : u32 , n : usize ) -> usize {
12641119 let hash = hash as i32 ;
12651120 let n = n as i32 ;
@@ -1371,14 +1226,14 @@ mod test {
13711226
13721227 assert_eq ! ( 2 , repartitioner. partition_writers. len( ) ) ;
13731228
1374- assert ! ( repartitioner. partition_writers[ 0 ] . spill_file . is_none ( ) ) ;
1375- assert ! ( repartitioner. partition_writers[ 1 ] . spill_file . is_none ( ) ) ;
1229+ assert ! ( ! repartitioner. partition_writers[ 0 ] . has_spill_file ( ) ) ;
1230+ assert ! ( ! repartitioner. partition_writers[ 1 ] . has_spill_file ( ) ) ;
13761231
13771232 repartitioner. spill ( ) . unwrap ( ) ;
13781233
13791234 // after spill, there should be spill files
1380- assert ! ( repartitioner. partition_writers[ 0 ] . spill_file . is_some ( ) ) ;
1381- assert ! ( repartitioner. partition_writers[ 1 ] . spill_file . is_some ( ) ) ;
1235+ assert ! ( repartitioner. partition_writers[ 0 ] . has_spill_file ( ) ) ;
1236+ assert ! ( repartitioner. partition_writers[ 1 ] . has_spill_file ( ) ) ;
13821237
13831238 // insert another batch after spilling
13841239 repartitioner. insert_batch ( batch. clone ( ) ) . await . unwrap ( ) ;
0 commit comments