Skip to content

Commit 043597f

Browse files
committed
Volume: Add progress + cancellation to streaming writes
- `write_from_stream` signature now includes `on_progress` callback that doubles as cancellation (return `ControlFlow::Break`) - `SmbVolume` uses smb2's `write_file_streamed` with the callback bridging progress/cancel per chunk - Streaming branch in `copy_single_path` passes progress and calls `on_file_complete` - Updated smb2 dep to v0.6.0 (streaming write support) Note: MTP→SMB currently panics due to nested `block_on` (MtpReadStream calls block_on inside SmbVolume's block_on). Next commit fixes this with a channel-based MTP stream.
1 parent ac71bd0 commit 043597f

5 files changed

Lines changed: 73 additions & 31 deletions

File tree

Cargo.lock

Lines changed: 16 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/desktop/src-tauri/src/file_system/volume/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -558,12 +558,22 @@ pub trait Volume: Send + Sync {
558558

559559
/// Writes data from a stream to the given path.
560560
///
561+
/// `on_progress(bytes_written, total_size)` is called after each chunk is
562+
/// written. Return `ControlFlow::Break(())` to cancel the transfer.
563+
///
561564
/// # Arguments
562565
/// * `dest` - Destination path (file will be created/overwritten)
563566
/// * `size` - Total size in bytes (required for protocols like MTP)
564567
/// * `stream` - Source data stream
565-
fn write_from_stream(&self, dest: &Path, size: u64, stream: Box<dyn VolumeReadStream>) -> Result<u64, VolumeError> {
566-
let _ = (dest, size, stream);
568+
/// * `on_progress` - Progress callback; return `ControlFlow::Break(())` to cancel
569+
fn write_from_stream(
570+
&self,
571+
dest: &Path,
572+
size: u64,
573+
stream: Box<dyn VolumeReadStream>,
574+
on_progress: &dyn Fn(u64, u64) -> std::ops::ControlFlow<()>,
575+
) -> Result<u64, VolumeError> {
576+
let _ = (dest, size, stream, on_progress);
567577
Err(VolumeError::NotSupported)
568578
}
569579
}

apps/desktop/src-tauri/src/file_system/volume/mtp.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,7 @@ impl Volume for MtpVolume {
759759
dest: &Path,
760760
size: u64,
761761
mut stream: Box<dyn VolumeReadStream>,
762+
_on_progress: &dyn Fn(u64, u64) -> std::ops::ControlFlow<()>,
762763
) -> Result<u64, VolumeError> {
763764
let dest_folder = dest.parent().map(|p| self.to_mtp_path(p)).unwrap_or_default();
764765
let filename = dest

apps/desktop/src-tauri/src/file_system/volume/smb.rs

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,8 +1136,9 @@ impl Volume for SmbVolume {
11361136
fn write_from_stream(
11371137
&self,
11381138
dest: &Path,
1139-
_size: u64,
1139+
size: u64,
11401140
mut stream: Box<dyn VolumeReadStream>,
1141+
on_progress: &dyn Fn(u64, u64) -> std::ops::ControlFlow<()>,
11411142
) -> Result<u64, VolumeError> {
11421143
let smb_path = self.to_smb_path(dest);
11431144
let handle = self.runtime_handle.clone();
@@ -1147,19 +1148,47 @@ impl Volume for SmbVolume {
11471148
self.share_name, smb_path
11481149
);
11491150

1150-
// Collect all chunks into a buffer, then write in one pipelined call
1151-
let mut data = Vec::new();
1152-
while let Some(result) = stream.next_chunk() {
1153-
data.extend_from_slice(&result?);
1154-
}
1155-
1156-
let len = data.len() as u64;
1151+
// Bridge VolumeReadStream → smb2's write_file_streamed callback.
1152+
// smb2 pulls chunks on demand, so memory is bounded to the sliding window.
1153+
let mut bytes_written = 0u64;
1154+
let mut cancelled = false;
11571155
let sp = smb_path;
1158-
self.with_smb("write_from_stream", |client, tree| {
1159-
handle.block_on(client.write_file_pipelined(tree, &sp, &data))
1160-
})?;
11611156

1162-
Ok(len)
1157+
let result = self.with_smb("write_from_stream", |client, tree| {
1158+
handle.block_on(client.write_file_streamed(tree, &sp, &mut || {
1159+
// Check cancellation via the progress callback
1160+
if cancelled {
1161+
return Some(Err(std::io::Error::new(
1162+
std::io::ErrorKind::Interrupted,
1163+
"Operation cancelled",
1164+
)));
1165+
}
1166+
1167+
match stream.next_chunk() {
1168+
Some(Ok(chunk)) => {
1169+
bytes_written += chunk.len() as u64;
1170+
if on_progress(bytes_written, size) == std::ops::ControlFlow::Break(()) {
1171+
cancelled = true;
1172+
// Don't return error here — let smb2 finish writing this chunk
1173+
// so we don't corrupt the connection state. Next call will error.
1174+
}
1175+
Some(Ok(chunk))
1176+
}
1177+
Some(Err(e)) => Some(Err(std::io::Error::other(e.to_string()))),
1178+
None => None,
1179+
}
1180+
}))
1181+
});
1182+
1183+
if cancelled {
1184+
return Err(VolumeError::IoError {
1185+
message: "Operation cancelled".to_string(),
1186+
raw_os_error: None,
1187+
});
1188+
}
1189+
1190+
result?;
1191+
Ok(bytes_written)
11631192
}
11641193

11651194
fn smb_connection_state(&self) -> Option<SmbConnectionState> {

apps/desktop/src-tauri/src/file_system/write_operations/volume_strategy.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ pub(super) fn copy_single_path(
7777
);
7878
let stream = source_volume.open_read_stream(source_path)?;
7979
let size = stream.total_size();
80-
return dest_volume.write_from_stream(dest_path, size, stream);
80+
let bytes = dest_volume.write_from_stream(dest_path, size, stream, on_file_progress)?;
81+
on_file_complete();
82+
return Ok(bytes);
8183
}
8284

8385
// Neither supports streaming — fall back to temp local (export then import)

0 commit comments

Comments
 (0)