File tree Expand file tree Collapse file tree
datafusion/physical-plan/src/spill Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -101,6 +101,13 @@ impl InProgressSpillFile {
101101 Ok ( ( ) )
102102 }
103103
104+ pub fn flush ( & mut self ) -> Result < ( ) > {
105+ if let Some ( writer) = & mut self . writer {
106+ writer. flush ( ) ?;
107+ }
108+ Ok ( ( ) )
109+ }
110+
104111 /// Returns a reference to the in-progress file, if it exists.
105112 /// This can be used to get the file path for creating readers before the file is finished.
106113 pub fn file ( & self ) -> Option < & RefCountedTempFile > {
Original file line number Diff line number Diff line change @@ -310,6 +310,11 @@ impl IPCStreamWriter {
310310 Ok ( ( delta_num_rows, delta_num_bytes) )
311311 }
312312
313+ pub fn flush ( & mut self ) -> Result < ( ) > {
314+ self . writer . flush ( ) ?;
315+ Ok ( ( ) )
316+ }
317+
313318 /// Finish the writer
314319 pub fn finish ( & mut self ) -> Result < ( ) > {
315320 self . writer . finish ( ) . map_err ( Into :: into)
Original file line number Diff line number Diff line change @@ -188,6 +188,19 @@ impl SpillManager {
188188
189189 Ok ( spawn_buffered ( stream, self . batch_read_buffer_capacity ) )
190190 }
191+
192+ /// Same as `read_spill_as_stream`, but without buffering.
193+ pub fn read_spill_as_stream_unbuffered (
194+ & self ,
195+ spill_file_path : RefCountedTempFile ,
196+ max_record_batch_memory : Option < usize > ,
197+ ) -> Result < SendableRecordBatchStream > {
198+ Ok ( Box :: pin ( cooperative ( SpillReaderStream :: new (
199+ Arc :: clone ( & self . schema ) ,
200+ spill_file_path,
201+ max_record_batch_memory,
202+ ) ) ) )
203+ }
191204}
192205
193206pub ( crate ) trait GetSlicedSize {
Original file line number Diff line number Diff line change @@ -194,6 +194,8 @@ impl SpillPoolWriter {
194194 // Append the batch
195195 if let Some ( ref mut writer) = file_shared. writer {
196196 writer. append_batch ( batch) ?;
197+ // make sure we flush the writer for readers
198+ writer. flush ( ) ?;
197199 file_shared. batches_written += 1 ;
198200 file_shared. estimated_size += batch_size;
199201 }
@@ -535,7 +537,11 @@ impl Stream for SpillFile {
535537 // Step 2: Lazy-create reader stream if needed
536538 if self . reader . is_none ( ) && should_read {
537539 if let Some ( file) = file {
538- match self . spill_manager . read_spill_as_stream ( file, None ) {
540+ // we want this unbuffered because files are actively being written to
541+ match self
542+ . spill_manager
543+ . read_spill_as_stream_unbuffered ( file, None )
544+ {
539545 Ok ( stream) => {
540546 self . reader = Some ( SpillFileReader {
541547 stream,
You can’t perform that action at this time.
0 commit comments