Skip to content

Commit ea845a6

Browse files
committed
MTP: Enable MTP2MTP transfers
Streams! This prepares features like S3Bucket→FTP. Also in this commit: handle read-only drives and display a nice error message
1 parent e476dc7 commit ea845a6

15 files changed

Lines changed: 732 additions & 37 deletions

File tree

apps/desktop/src-tauri/src/file_system/volume/local_posix_test.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,14 @@ fn test_supports_watching_returns_true() {
101101
assert!(volume.supports_watching());
102102
}
103103

104+
#[test]
105+
fn test_supports_streaming_returns_false() {
106+
// LocalPosixVolume uses the default implementation which returns false.
107+
// Streaming is primarily for MTP-to-MTP transfers.
108+
let volume = LocalPosixVolume::new("Test", "/tmp");
109+
assert!(!volume.supports_streaming());
110+
}
111+
104112
#[test]
105113
fn test_write_operations() {
106114
use std::fs;

apps/desktop/src-tauri/src/file_system/volume/mod.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,21 @@ impl std::fmt::Display for VolumeError {
9191

9292
impl std::error::Error for VolumeError {}
9393

94+
/// A stream of bytes read from a volume.
95+
///
96+
/// This is a synchronous, blocking iterator-style interface for reading
97+
/// file data in chunks. Used for streaming transfers between volumes.
98+
pub trait VolumeReadStream: Send {
99+
/// Returns the next chunk of data, or None if complete.
100+
fn next_chunk(&mut self) -> Option<Result<Vec<u8>, VolumeError>>;
101+
102+
/// Total size of the file in bytes.
103+
fn total_size(&self) -> u64;
104+
105+
/// Bytes read so far (for progress tracking).
106+
fn bytes_read(&self) -> u64;
107+
}
108+
94109
impl From<std::io::Error> for VolumeError {
95110
fn from(err: std::io::Error) -> Self {
96111
match err.kind() {
@@ -218,6 +233,35 @@ pub trait Volume: Send + Sync {
218233
fn get_space_info(&self) -> Result<SpaceInfo, VolumeError> {
219234
Err(VolumeError::NotSupported)
220235
}
236+
237+
// ========================================
238+
// Streaming: Optional, default not supported
239+
// ========================================
240+
241+
/// Returns true if this volume supports streaming read/write operations.
242+
fn supports_streaming(&self) -> bool {
243+
false
244+
}
245+
246+
/// Opens a streaming reader for the given path.
247+
///
248+
/// Returns a VolumeReadStream that yields chunks of data.
249+
/// The stream must be fully consumed or dropped before other operations.
250+
fn open_read_stream(&self, path: &Path) -> Result<Box<dyn VolumeReadStream>, VolumeError> {
251+
let _ = path;
252+
Err(VolumeError::NotSupported)
253+
}
254+
255+
/// Writes data from a stream to the given path.
256+
///
257+
/// # Arguments
258+
/// * `dest` - Destination path (file will be created/overwritten)
259+
/// * `size` - Total size in bytes (required for protocols like MTP)
260+
/// * `stream` - Source data stream
261+
fn write_from_stream(&self, dest: &Path, size: u64, stream: Box<dyn VolumeReadStream>) -> Result<u64, VolumeError> {
262+
let _ = (dest, size, stream);
263+
Err(VolumeError::NotSupported)
264+
}
221265
}
222266

223267
// Implementations

apps/desktop/src-tauri/src/file_system/volume/mtp.rs

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
//! Wraps MTP device storage as a Volume, enabling MTP browsing through
44
//! the standard file listing pipeline (same icons, sorting, view modes as local files).
55
6-
use super::{ConflictInfo, CopyScanResult, SourceItemInfo, SpaceInfo, Volume, VolumeError};
6+
use super::{ConflictInfo, CopyScanResult, SourceItemInfo, SpaceInfo, Volume, VolumeError, VolumeReadStream};
77
use crate::file_system::FileEntry;
88
use crate::mtp::connection::{MtpConnectionError, connection_manager};
99
use log::debug;
10+
use mtp_rs::FileDownload;
1011
use std::path::{Path, PathBuf};
1112

1213
/// A volume backed by an MTP device storage.
@@ -380,6 +381,109 @@ impl Volume for MtpVolume {
380381
})
381382
.map_err(map_mtp_error)
382383
}
384+
385+
fn supports_streaming(&self) -> bool {
386+
true
387+
}
388+
389+
fn open_read_stream(&self, path: &Path) -> Result<Box<dyn VolumeReadStream>, VolumeError> {
390+
let mtp_path = self.to_mtp_path(path);
391+
let device_id = self.device_id.clone();
392+
let storage_id = self.storage_id;
393+
394+
let handle = tokio::runtime::Handle::current();
395+
396+
// Get the file download stream from connection manager
397+
let (download, total_size) = handle
398+
.block_on(async {
399+
connection_manager()
400+
.open_download_stream(&device_id, storage_id, &mtp_path)
401+
.await
402+
})
403+
.map_err(map_mtp_error)?;
404+
405+
Ok(Box::new(MtpReadStream {
406+
handle,
407+
download: Some(download),
408+
total_size,
409+
bytes_read: 0,
410+
}))
411+
}
412+
413+
fn write_from_stream(
414+
&self,
415+
dest: &Path,
416+
size: u64,
417+
mut stream: Box<dyn VolumeReadStream>,
418+
) -> Result<u64, VolumeError> {
419+
let dest_folder = dest.parent().map(|p| self.to_mtp_path(p)).unwrap_or_default();
420+
let filename = dest
421+
.file_name()
422+
.and_then(|n| n.to_str())
423+
.ok_or_else(|| VolumeError::IoError("Invalid filename".into()))?
424+
.to_string();
425+
426+
let device_id = self.device_id.clone();
427+
let storage_id = self.storage_id;
428+
429+
// IMPORTANT: Collect all chunks BEFORE entering block_on to avoid nested runtime error.
430+
// MtpReadStream::next_chunk() uses block_on internally, so we can't call it from
431+
// within another block_on (which upload_from_stream would do).
432+
let mut chunks: Vec<bytes::Bytes> = Vec::new();
433+
while let Some(result) = stream.next_chunk() {
434+
let data = result?;
435+
chunks.push(bytes::Bytes::from(data));
436+
}
437+
438+
let handle = tokio::runtime::Handle::current();
439+
440+
handle
441+
.block_on(async {
442+
connection_manager()
443+
.upload_from_chunks(&device_id, storage_id, &dest_folder, &filename, size, chunks)
444+
.await
445+
})
446+
.map_err(map_mtp_error)
447+
}
448+
}
449+
450+
/// Streaming reader for MTP files.
451+
///
452+
/// Wraps the mtp-rs FileDownload to provide sync iteration.
453+
pub struct MtpReadStream {
454+
/// Tokio runtime handle for blocking on async operations.
455+
handle: tokio::runtime::Handle,
456+
/// The underlying async download (wrapped in Option for take semantics).
457+
download: Option<FileDownload>,
458+
/// Total file size.
459+
total_size: u64,
460+
/// Bytes read so far.
461+
bytes_read: u64,
462+
}
463+
464+
impl VolumeReadStream for MtpReadStream {
465+
fn next_chunk(&mut self) -> Option<Result<Vec<u8>, VolumeError>> {
466+
let download = self.download.as_mut()?;
467+
468+
self.handle.block_on(async {
469+
match download.next_chunk().await {
470+
Some(Ok(bytes)) => {
471+
self.bytes_read += bytes.len() as u64;
472+
Some(Ok(bytes.to_vec()))
473+
}
474+
Some(Err(e)) => Some(Err(VolumeError::IoError(e.to_string()))),
475+
None => None,
476+
}
477+
})
478+
}
479+
480+
fn total_size(&self) -> u64 {
481+
self.total_size
482+
}
483+
484+
fn bytes_read(&self) -> u64 {
485+
self.bytes_read
486+
}
383487
}
384488

385489
/// Maps MTP connection errors to Volume errors.
@@ -461,4 +565,11 @@ mod tests {
461565
let vol = MtpVolume::new("mtp-20-5", 65537, "Test");
462566
assert!(!vol.supports_watching());
463567
}
568+
569+
#[test]
570+
fn test_supports_streaming_returns_true() {
571+
// MTP volumes support streaming for direct MTP-to-MTP transfers.
572+
let vol = MtpVolume::new("mtp-20-5", 65537, "Test");
573+
assert!(vol.supports_streaming());
574+
}
464575
}

apps/desktop/src-tauri/src/file_system/write_operations/volume_copy.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -517,9 +517,10 @@ fn copy_volumes_with_progress(
517517
/// Copies a single path from source volume to destination volume.
518518
///
519519
/// Determines the appropriate strategy based on volume types:
520+
/// - If both support streaming: Use streaming for direct transfer
520521
/// - If source is local: dest.import_from_local()
521522
/// - If dest is local: source.export_to_local()
522-
/// - Otherwise: export to temp, then import from temp
523+
/// - Otherwise: Not supported
523524
fn copy_single_path(
524525
source_volume: &Arc<dyn Volume>,
525526
source_path: &Path,
@@ -536,6 +537,22 @@ fn copy_single_path(
536537
let source_is_local = source_volume.root().starts_with("/") && !source_volume.root().starts_with("/mtp-volume/");
537538
let dest_is_local = dest_volume.root().starts_with("/") && !dest_volume.root().starts_with("/mtp-volume/");
538539

540+
// Try streaming path for non-local volumes that support it
541+
if !source_is_local && !dest_is_local {
542+
if source_volume.supports_streaming() && dest_volume.supports_streaming() {
543+
log::debug!(
544+
"copy_single_path: using streaming for {} -> {}",
545+
source_path.display(),
546+
dest_path.display()
547+
);
548+
let stream = source_volume.open_read_stream(source_path)?;
549+
let size = stream.total_size();
550+
return dest_volume.write_from_stream(dest_path, size, stream);
551+
}
552+
// Neither supports streaming - not supported
553+
return Err(VolumeError::NotSupported);
554+
}
555+
539556
if source_is_local && !dest_is_local {
540557
// Source is local, dest is not (e.g., Local → MTP)
541558
// Use import_from_local on destination
@@ -554,7 +571,7 @@ fn copy_single_path(
554571
dest_volume.root().join(dest_path)
555572
};
556573
source_volume.export_to_local(source_path, &local_dest)
557-
} else if source_is_local && dest_is_local {
574+
} else {
558575
// Both are local, use export which resolves paths internally
559576
// Note: export_to_local takes a path relative to the volume root for source,
560577
// and an absolute local path for destination
@@ -564,10 +581,6 @@ fn copy_single_path(
564581
dest_volume.root().join(dest_path)
565582
};
566583
source_volume.export_to_local(source_path, &local_dest)
567-
} else {
568-
// Both are non-local (e.g., MTP → MTP) - not supported directly
569-
// Would need to go through temp directory
570-
Err(VolumeError::NotSupported)
571584
}
572585
}
573586

0 commit comments

Comments
 (0)