Skip to content

Commit 1799c31

Browse files
authored
Fix Arrow Spill Underrun (#20159)
## Which issue does this PR close? - Closes #19425 ## Rationale for this change This adjusts the way that the spill channel works. Currently we have a spill writer & reader pairing which uses a mutex to coordindate when a file is ready to be read. What happens is, that because we were using a `spawn_buffered` call, the read task would race ahead trying to read a file which is yet to be written out completely. Alongside this, we need to flush each write to the file, as there is a chance that another thread may see stale data. ## What changes are included in this PR? Adds a flush on write, and converts the read task to not buffer reads. ## Are these changes tested? I haven't written a test, but I have been running the example in the attached issue. While it now fails with allocation errors, the original error goes away. ## Are there any user-facing changes? Nope
1 parent aef2965 commit 1799c31

4 files changed

Lines changed: 32 additions & 1 deletion

File tree

datafusion/physical-plan/src/spill/in_progress_spill_file.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,13 @@ impl InProgressSpillFile {
101101
Ok(())
102102
}
103103

104+
pub fn flush(&mut self) -> Result<()> {
105+
if let Some(writer) = &mut self.writer {
106+
writer.flush()?;
107+
}
108+
Ok(())
109+
}
110+
104111
/// Returns a reference to the in-progress file, if it exists.
105112
/// This can be used to get the file path for creating readers before the file is finished.
106113
pub fn file(&self) -> Option<&RefCountedTempFile> {

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,11 @@ impl IPCStreamWriter {
310310
Ok((delta_num_rows, delta_num_bytes))
311311
}
312312

313+
pub fn flush(&mut self) -> Result<()> {
314+
self.writer.flush()?;
315+
Ok(())
316+
}
317+
313318
/// Finish the writer
314319
pub fn finish(&mut self) -> Result<()> {
315320
self.writer.finish().map_err(Into::into)

datafusion/physical-plan/src/spill/spill_manager.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,19 @@ impl SpillManager {
188188

189189
Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
190190
}
191+
192+
/// Same as `read_spill_as_stream`, but without buffering.
193+
pub fn read_spill_as_stream_unbuffered(
194+
&self,
195+
spill_file_path: RefCountedTempFile,
196+
max_record_batch_memory: Option<usize>,
197+
) -> Result<SendableRecordBatchStream> {
198+
Ok(Box::pin(cooperative(SpillReaderStream::new(
199+
Arc::clone(&self.schema),
200+
spill_file_path,
201+
max_record_batch_memory,
202+
))))
203+
}
191204
}
192205

193206
pub(crate) trait GetSlicedSize {

datafusion/physical-plan/src/spill/spill_pool.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ impl SpillPoolWriter {
194194
// Append the batch
195195
if let Some(ref mut writer) = file_shared.writer {
196196
writer.append_batch(batch)?;
197+
// make sure we flush the writer for readers
198+
writer.flush()?;
197199
file_shared.batches_written += 1;
198200
file_shared.estimated_size += batch_size;
199201
}
@@ -535,7 +537,11 @@ impl Stream for SpillFile {
535537
// Step 2: Lazy-create reader stream if needed
536538
if self.reader.is_none() && should_read {
537539
if let Some(file) = file {
538-
match self.spill_manager.read_spill_as_stream(file, None) {
540+
// we want this unbuffered because files are actively being written to
541+
match self
542+
.spill_manager
543+
.read_spill_as_stream_unbuffered(file, None)
544+
{
539545
Ok(stream) => {
540546
self.reader = Some(SpillFileReader {
541547
stream,

0 commit comments

Comments
 (0)