diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs index d82d31196d..01a5de1963 100644 --- a/src/diskio/mod.rs +++ b/src/diskio/mod.rs @@ -343,6 +343,7 @@ pub fn write_file_incremental, F: Fn(usize)>( { trace_scoped!("write_segment", "name": path_display, "len": len); f.write_all(&contents)?; + drop(contents); chunk_complete_callback(len); } } diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index 56a5a897b4..6d3acae1d4 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -364,16 +364,18 @@ fn unpack_without_first_dir<'a, R: Read>( continue; } + struct SenderEntry<'a, 'b, R: std::io::Read> { + sender: Box) -> bool + 'a>, + entry: tar::Entry<'b, R>, + } + /// true if either no sender_entry was provided, or the incremental file /// has been fully dispatched. fn flush_ios<'a, R: std::io::Read, P: AsRef>( mut budget: &mut MemoryBudget, io_executor: &dyn Executor, mut directories: &mut HashMap, - mut sender_entry: Option<&mut ( - Box) -> bool + 'a>, - &mut tar::Entry<'_, R>, - )>, + mut sender_entry: Option<&mut SenderEntry<'a, '_, R>>, full_path: P, ) -> Result { let mut result = sender_entry.is_none(); @@ -384,16 +386,20 @@ fn unpack_without_first_dir<'a, R: Read>( trigger_children(&*io_executor, &mut directories, &mut budget, op)?; } // Maybe stream a file incrementally - if let Some((sender, entry)) = sender_entry.as_mut() { + if let Some(sender) = sender_entry.as_mut() { if budget.available() as u64 >= IO_CHUNK_SIZE { let mut v = vec![0; IO_CHUNK_SIZE as usize]; - let len = entry.read(&mut v)?; + let len = sender + .entry + .by_ref() + .take(IO_CHUNK_SIZE) + .read_to_end(&mut v)?; if len == 0 { result = true; } v.resize(len, 0); budget.claim_chunk(len); - if !sender(v) { + if !(sender.sender)(v) { bail!(format!( "IO receiver for '{}' disconnected", full_path.as_ref().display() @@ -519,8 +525,11 @@ fn unpack_without_first_dir<'a, R: Read>( } } - let mut incremental_file_sender = incremental_file_sender - .map(|incremental_file_sender| (incremental_file_sender, &mut entry)); + let mut incremental_file_sender = + incremental_file_sender.map(|incremental_file_sender| SenderEntry { + sender: incremental_file_sender, + entry, + }); // monitor io queue and feed in the content of the file (if needed) while !flush_ios(