Skip to content

Commit ac5ec4d

Browse files
committed
MTP: Fix copy progress and mid-file cancellation
- Implement `export_to_local_with_progress()` on `MtpVolume`, mirroring the existing SMB pattern - Add `download_file_with_progress()` to MTP `file_ops` — calls the progress callback after each USB chunk and breaks on `ControlFlow::Break` - Thread progress callback through `download_recursive_with_progress()` and `download_entries_recursive_with_progress()` in `bulk_ops` - On cancel, clean up partial file before returning `MtpConnectionError::Cancelled` - Add `Cancelled` variant to `MtpConnectionError` and `VolumeError` - Previously, MTP fell through to the default `export_to_local_with_progress()` which ignored the callback entirely — no progress events, and cancel only took effect between files (not mid-file)
1 parent 282e9c0 commit ac5ec4d

6 files changed

Lines changed: 248 additions & 0 deletions

File tree

apps/desktop/src-tauri/src/file_system/volume/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ pub enum VolumeError {
9494
},
9595
/// Connection timed out.
9696
ConnectionTimeout(String),
97+
/// Operation was cancelled by the user (progress callback returned Break).
98+
Cancelled(String),
9799
IoError(String),
98100
}
99101

@@ -108,6 +110,7 @@ impl std::fmt::Display for VolumeError {
108110
Self::ReadOnly(msg) => write!(f, "Read-only: {}", msg),
109111
Self::StorageFull { message } => write!(f, "Storage full: {}", message),
110112
Self::ConnectionTimeout(msg) => write!(f, "Connection timed out: {}", msg),
113+
Self::Cancelled(msg) => write!(f, "Cancelled: {}", msg),
111114
Self::IoError(msg) => write!(f, "I/O error: {}", msg),
112115
}
113116
}

apps/desktop/src-tauri/src/file_system/volume/mtp.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,24 @@ use log::debug;
1010
use mtp_rs::FileDownload;
1111
use std::path::{Path, PathBuf};
1212

13+
/// Wrapper to assert Send + Sync on a progress callback reference.
14+
///
15+
/// SAFETY: MtpVolume methods are called from `spawn_blocking` contexts (single OS thread).
16+
/// The callback never crosses thread boundaries — `block_on` runs the async download on the
17+
/// same thread. The Volume trait's `Fn` callbacks use atomics for interior mutation, so they
18+
/// are effectively Send + Sync even though the trait doesn't declare it.
19+
struct SendSyncProgress<'a>(&'a dyn Fn(u64, u64) -> std::ops::ControlFlow<()>);
20+
21+
// SAFETY: See above — callback is only accessed from the single spawn_blocking thread.
22+
unsafe impl Send for SendSyncProgress<'_> {}
23+
unsafe impl Sync for SendSyncProgress<'_> {}
24+
25+
impl SendSyncProgress<'_> {
26+
fn call(&self, bytes_done: u64, bytes_total: u64) -> std::ops::ControlFlow<()> {
27+
(self.0)(bytes_done, bytes_total)
28+
}
29+
}
30+
1331
/// A volume backed by an MTP device storage.
1432
///
1533
/// This implementation wraps the MTP connection manager to provide file system
@@ -479,6 +497,43 @@ impl Volume for MtpVolume {
479497
.map_err(map_mtp_error)
480498
}
481499

500+
fn export_to_local_with_progress(
501+
&self,
502+
source: &Path,
503+
local_dest: &Path,
504+
on_progress: &dyn Fn(u64, u64) -> std::ops::ControlFlow<()>,
505+
) -> Result<u64, VolumeError> {
506+
let mtp_path = self.to_mtp_path(source);
507+
let device_id = self.device_id.clone();
508+
let storage_id = self.storage_id;
509+
let local_dest = local_dest.to_path_buf();
510+
511+
debug!(
512+
"MtpVolume::export_to_local_with_progress: device={}, storage={}, source={}, dest={}",
513+
device_id,
514+
storage_id,
515+
mtp_path,
516+
local_dest.display()
517+
);
518+
519+
let handle = tokio::runtime::Handle::current();
520+
let progress = SendSyncProgress(on_progress);
521+
522+
handle
523+
.block_on(async {
524+
connection_manager()
525+
.download_recursive_with_progress(
526+
&device_id,
527+
storage_id,
528+
&mtp_path,
529+
&local_dest,
530+
&|bytes_done, bytes_total| progress.call(bytes_done, bytes_total),
531+
)
532+
.await
533+
})
534+
.map_err(map_mtp_error)
535+
}
536+
482537
fn export_to_local(&self, source: &Path, local_dest: &Path) -> Result<u64, VolumeError> {
483538
let mtp_path = self.to_mtp_path(source);
484539
let device_id = self.device_id.clone();
@@ -707,6 +762,7 @@ fn map_mtp_error(e: MtpConnectionError) -> VolumeError {
707762
MtpConnectionError::ExclusiveAccess { .. } | MtpConnectionError::PermissionDenied { .. } => {
708763
VolumeError::PermissionDenied(e.to_string())
709764
}
765+
MtpConnectionError::Cancelled { .. } => VolumeError::Cancelled(e.to_string()),
710766
MtpConnectionError::Disconnected { .. } => VolumeError::DeviceDisconnected(e.to_string()),
711767
MtpConnectionError::Timeout { .. } => VolumeError::ConnectionTimeout(e.to_string()),
712768
MtpConnectionError::StorageFull { .. } => VolumeError::StorageFull { message: e.to_string() },

apps/desktop/src-tauri/src/file_system/write_operations/volume_copy.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1337,6 +1337,9 @@ pub(super) fn map_volume_error(context_path: &str, e: VolumeError) -> WriteOpera
13371337
VolumeError::ConnectionTimeout(_) => WriteOperationError::ConnectionInterrupted {
13381338
path: context_path.to_string(),
13391339
},
1340+
VolumeError::Cancelled(_) => WriteOperationError::Cancelled {
1341+
message: "Operation cancelled by user".to_string(),
1342+
},
13401343
VolumeError::IoError(msg) => WriteOperationError::IoError {
13411344
path: context_path.to_string(),
13421345
message: msg,

apps/desktop/src-tauri/src/mtp/connection/bulk_ops.rs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,144 @@ impl MtpConnectionManager {
273273
Ok(total_bytes)
274274
}
275275

276+
/// Downloads a file or directory recursively with a progress/cancellation callback.
277+
///
278+
/// Same as `download_recursive` but calls `on_progress(bytes_done, bytes_total)` per chunk.
279+
/// Returns `ControlFlow::Break(())` from the callback to cancel.
280+
pub async fn download_recursive_with_progress(
281+
&self,
282+
device_id: &str,
283+
storage_id: u32,
284+
object_path: &str,
285+
local_dest: &Path,
286+
on_progress: &(dyn Fn(u64, u64) -> std::ops::ControlFlow<()> + Send + Sync),
287+
) -> Result<u64, MtpConnectionError> {
288+
debug!(
289+
"MTP download_recursive_with_progress: device={}, storage={}, path={}, dest={}",
290+
device_id,
291+
storage_id,
292+
object_path,
293+
local_dest.display()
294+
);
295+
296+
match self.list_directory(device_id, storage_id, object_path).await {
297+
Ok(entries) if !entries.is_empty() => {
298+
self.download_entries_recursive_with_progress(device_id, storage_id, &entries, local_dest, on_progress)
299+
.await
300+
}
301+
Ok(_) => {
302+
if self.is_file_by_parent(device_id, storage_id, object_path).await {
303+
let operation_id = format!("download-{}", uuid::Uuid::new_v4());
304+
let result = self
305+
.download_file_with_progress(
306+
device_id,
307+
storage_id,
308+
object_path,
309+
local_dest,
310+
None,
311+
&operation_id,
312+
Some(on_progress),
313+
)
314+
.await?;
315+
Ok(result.bytes_transferred)
316+
} else {
317+
tokio::fs::create_dir_all(local_dest)
318+
.await
319+
.map_err(|e| MtpConnectionError::Other {
320+
device_id: device_id.to_string(),
321+
message: format!("Failed to create local directory: {}", e),
322+
})?;
323+
Ok(0)
324+
}
325+
}
326+
Err(e) => {
327+
if self.is_file_by_parent(device_id, storage_id, object_path).await {
328+
let operation_id = format!("download-{}", uuid::Uuid::new_v4());
329+
let result = self
330+
.download_file_with_progress(
331+
device_id,
332+
storage_id,
333+
object_path,
334+
local_dest,
335+
None,
336+
&operation_id,
337+
Some(on_progress),
338+
)
339+
.await?;
340+
Ok(result.bytes_transferred)
341+
} else {
342+
Err(e)
343+
}
344+
}
345+
}
346+
}
347+
348+
/// Downloads directory entries recursively with a progress/cancellation callback.
349+
async fn download_entries_recursive_with_progress(
350+
&self,
351+
device_id: &str,
352+
storage_id: u32,
353+
entries: &[FileEntry],
354+
local_dest: &Path,
355+
on_progress: &(dyn Fn(u64, u64) -> std::ops::ControlFlow<()> + Send + Sync),
356+
) -> Result<u64, MtpConnectionError> {
357+
tokio::fs::create_dir_all(local_dest)
358+
.await
359+
.map_err(|e| MtpConnectionError::Other {
360+
device_id: device_id.to_string(),
361+
message: format!("Failed to create local directory: {}", e),
362+
})?;
363+
364+
let mut total_bytes = 0u64;
365+
366+
for entry in entries {
367+
let child_dest = local_dest.join(&entry.name);
368+
369+
if entry.is_directory {
370+
let children = self.list_directory(device_id, storage_id, &entry.path).await?;
371+
if children.is_empty() {
372+
tokio::fs::create_dir_all(&child_dest)
373+
.await
374+
.map_err(|e| MtpConnectionError::Other {
375+
device_id: device_id.to_string(),
376+
message: format!("Failed to create local directory: {}", e),
377+
})?;
378+
} else {
379+
let bytes = Box::pin(self.download_entries_recursive_with_progress(
380+
device_id,
381+
storage_id,
382+
&children,
383+
&child_dest,
384+
on_progress,
385+
))
386+
.await?;
387+
total_bytes += bytes;
388+
}
389+
} else {
390+
let operation_id = format!("download-{}", uuid::Uuid::new_v4());
391+
let result = self
392+
.download_file_with_progress(
393+
device_id,
394+
storage_id,
395+
&entry.path,
396+
&child_dest,
397+
None,
398+
&operation_id,
399+
Some(on_progress),
400+
)
401+
.await?;
402+
total_bytes += result.bytes_transferred;
403+
}
404+
}
405+
406+
debug!(
407+
"MTP download_entries_recursive_with_progress: directory {} complete, {} bytes",
408+
local_dest.display(),
409+
total_bytes
410+
);
411+
Ok(total_bytes)
412+
}
413+
276414
/// Checks if a path is a file by looking it up in its parent directory listing.
277415
async fn is_file_by_parent(&self, device_id: &str, storage_id: u32, path: &str) -> bool {
278416
let path_buf = normalize_mtp_path(path);

apps/desktop/src-tauri/src/mtp/connection/errors.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ pub enum MtpConnectionError {
4040
PermissionDenied {
4141
device_id: String,
4242
},
43+
Cancelled {
44+
device_id: String,
45+
message: String,
46+
},
4347
ObjectNotFound {
4448
device_id: String,
4549
path: String,
@@ -90,6 +94,9 @@ impl std::fmt::Display for MtpConnectionError {
9094
Self::PermissionDenied { device_id } => {
9195
write!(f, "Permission denied for device: {device_id}")
9296
}
97+
Self::Cancelled { device_id, message } => {
98+
write!(f, "Cancelled on {device_id}: {message}")
99+
}
93100
Self::ObjectNotFound { device_id, path } => {
94101
write!(f, "Object not found on {device_id}: {path}")
95102
}
@@ -299,6 +306,10 @@ mod tests {
299306
MtpConnectionError::PermissionDenied {
300307
device_id: "test".to_string(),
301308
},
309+
MtpConnectionError::Cancelled {
310+
device_id: "test".to_string(),
311+
message: "cancelled".to_string(),
312+
},
302313
MtpConnectionError::ObjectNotFound {
303314
device_id: "test".to_string(),
304315
path: "/path".to_string(),

apps/desktop/src-tauri/src/mtp/connection/file_ops.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,25 @@ impl MtpConnectionManager {
3333
local_dest: &Path,
3434
app: Option<&AppHandle>,
3535
operation_id: &str,
36+
) -> Result<MtpOperationResult, MtpConnectionError> {
37+
self.download_file_with_progress(device_id, storage_id, object_path, local_dest, app, operation_id, None)
38+
.await
39+
}
40+
41+
/// Downloads a file from the MTP device to a local path, with optional progress/cancellation callback.
42+
///
43+
/// The `on_progress` callback receives `(bytes_done, bytes_total)` and returns `ControlFlow::Break(())`
44+
/// to cancel the transfer. On cancellation, the partial file is removed.
45+
#[allow(clippy::too_many_arguments, reason = "mirrors download_file with added on_progress callback")]
46+
pub async fn download_file_with_progress(
47+
&self,
48+
device_id: &str,
49+
storage_id: u32,
50+
object_path: &str,
51+
local_dest: &Path,
52+
app: Option<&AppHandle>,
53+
operation_id: &str,
54+
on_progress: Option<&(dyn Fn(u64, u64) -> std::ops::ControlFlow<()> + Send + Sync)>,
3655
) -> Result<MtpOperationResult, MtpConnectionError> {
3756
debug!(
3857
"MTP download_file: device={}, storage={}, path={}, dest={}",
@@ -117,6 +136,7 @@ impl MtpConnectionManager {
117136

118137
// Write chunks to file (must complete before releasing device lock)
119138
let mut bytes_written = 0u64;
139+
let mut cancelled = false;
120140
while let Some(chunk_result) = download.next_chunk().await {
121141
let chunk = chunk_result.map_err(|e| MtpConnectionError::Other {
122142
device_id: device_id.to_string(),
@@ -129,12 +149,29 @@ impl MtpConnectionManager {
129149
})?;
130150

131151
bytes_written += chunk.len() as u64;
152+
153+
// Report progress and check for cancellation
154+
if let Some(ref cb) = on_progress
155+
&& cb(bytes_written, total_size).is_break() {
156+
cancelled = true;
157+
break;
158+
}
132159
}
133160

134161
// Release device lock after download completes
135162
drop(storage);
136163
drop(device);
137164

165+
// On cancellation, clean up the partial file
166+
if cancelled {
167+
drop(file);
168+
let _ = tokio::fs::remove_file(local_dest).await;
169+
return Err(MtpConnectionError::Cancelled {
170+
device_id: device_id.to_string(),
171+
message: "Download cancelled".to_string(),
172+
});
173+
}
174+
138175
file.flush().await.map_err(|e| MtpConnectionError::Other {
139176
device_id: device_id.to_string(),
140177
message: format!("Failed to flush local file: {}", e),

0 commit comments

Comments
 (0)