Skip to content

Commit 1598f8c

Browse files
committed
MTP: Channel-based read stream to prevent nested block_on
Replace `MtpReadStream` (which called `block_on` in `next_chunk`, causing panics inside SmbVolume's `block_on`) with `MtpChannelStream`. A background tokio task reads USB chunks into a bounded `sync_channel(4)`. `next_chunk()` is a plain `recv()` — safe in any context. - On cancellation (receiver dropped), the background task calls `download.cancel()` to cleanly release the USB endpoint - Memory bounded to ~2 MB (4 × ~512 KB MTP chunks) - Updated smb2 dep: `Error::Io` no longer classified as `ConnectionLost`, so callback cancellation doesn't brick the SMB connection - Updated `map_smb_error_io` test for the new error classification
1 parent 043597f commit 1598f8c

3 files changed

Lines changed: 77 additions & 44 deletions

File tree

Cargo.lock

Lines changed: 13 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/desktop/src-tauri/src/file_system/volume/mtp.rs

Lines changed: 61 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -746,12 +746,10 @@ impl Volume for MtpVolume {
746746
})
747747
.map_err(map_mtp_error)?;
748748

749-
Ok(Box::new(MtpReadStream {
750-
handle,
751-
download: Some(download),
752-
total_size,
753-
bytes_read: 0,
754-
}))
749+
// Spawn a background task to read chunks from USB and feed them through
750+
// a bounded channel. This makes next_chunk() a plain recv() — safe to
751+
// call from inside block_on (no nested runtime panic).
752+
Ok(Box::new(MtpChannelStream::spawn(&handle, download, total_size)))
755753
}
756754

757755
fn write_from_stream(
@@ -797,35 +795,69 @@ impl Volume for MtpVolume {
797795

798796
/// Streaming reader for MTP files.
799797
///
800-
/// Wraps the mtp-rs FileDownload to provide sync iteration.
801-
pub struct MtpReadStream {
802-
/// Tokio runtime handle for blocking on async operations.
803-
handle: tokio::runtime::Handle,
804-
/// The underlying async download (wrapped in Option for take semantics).
805-
download: Option<FileDownload>,
806-
/// Total file size.
798+
/// A background tokio task reads chunks from the MTP USB endpoint and sends
799+
/// them through a bounded channel. `next_chunk()` is a plain `recv()` — no
800+
/// `block_on`, so it's safe to call from any context, including inside another
801+
/// `block_on` (like `SmbVolume::write_from_stream`).
802+
///
803+
/// Cancellation: dropping the receiver causes the background task's next
804+
/// `send()` to fail. The task then calls `download.cancel().await` to cleanly
805+
/// release the USB endpoint, preventing the `ReceiveStream` drop panic.
806+
struct MtpChannelStream {
807+
rx: std::sync::mpsc::Receiver<Result<Vec<u8>, VolumeError>>,
807808
total_size: u64,
808-
/// Bytes read so far.
809809
bytes_read: u64,
810810
}
811811

812-
impl VolumeReadStream for MtpReadStream {
813-
fn next_chunk(&mut self) -> Option<Result<Vec<u8>, VolumeError>> {
814-
let download = self.download.as_mut()?;
815-
816-
self.handle.block_on(async {
817-
match download.next_chunk().await {
818-
Some(Ok(bytes)) => {
819-
self.bytes_read += bytes.len() as u64;
820-
Some(Ok(bytes.to_vec()))
812+
/// Channel capacity: number of chunks buffered between the MTP reader task and
813+
/// the consumer. 4 × ~512 KB (typical MTP chunk) ≈ 2 MB of buffering.
814+
const MTP_STREAM_CHANNEL_CAPACITY: usize = 4;
815+
816+
impl MtpChannelStream {
817+
/// Spawns the background reader task and returns the channel-backed stream.
818+
fn spawn(handle: &tokio::runtime::Handle, mut download: FileDownload, total_size: u64) -> Self {
819+
let (tx, rx) = std::sync::mpsc::sync_channel(MTP_STREAM_CHANNEL_CAPACITY);
820+
821+
handle.spawn(async move {
822+
loop {
823+
match download.next_chunk().await {
824+
Some(Ok(bytes)) => {
825+
if tx.send(Ok(bytes.to_vec())).is_err() {
826+
// Receiver dropped — consumer cancelled. Clean up the USB stream.
827+
let _ = download.cancel(std::time::Duration::from_millis(300)).await;
828+
break;
829+
}
830+
}
831+
Some(Err(e)) => {
832+
let _ = tx.send(Err(VolumeError::IoError {
833+
message: e.to_string(),
834+
raw_os_error: None,
835+
}));
836+
break;
837+
}
838+
None => break, // EOF — fully consumed, safe to drop
821839
}
822-
Some(Err(e)) => Some(Err(VolumeError::IoError {
823-
message: e.to_string(),
824-
raw_os_error: None,
825-
})),
826-
None => None,
827840
}
828-
})
841+
});
842+
843+
Self {
844+
rx,
845+
total_size,
846+
bytes_read: 0,
847+
}
848+
}
849+
}
850+
851+
impl VolumeReadStream for MtpChannelStream {
852+
fn next_chunk(&mut self) -> Option<Result<Vec<u8>, VolumeError>> {
853+
match self.rx.recv() {
854+
Ok(Ok(data)) => {
855+
self.bytes_read += data.len() as u64;
856+
Some(Ok(data))
857+
}
858+
Ok(Err(e)) => Some(Err(e)),
859+
Err(_) => None, // Sender dropped — EOF or task exited
860+
}
829861
}
830862

831863
fn total_size(&self) -> u64 {

apps/desktop/src-tauri/src/file_system/volume/smb.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1434,8 +1434,9 @@ mod tests {
14341434
fn map_smb_error_io() {
14351435
let err = smb2::Error::Io(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe broke"));
14361436
let ve = map_smb_error(err);
1437-
// IO errors with ConnectionLost kind map to DeviceDisconnected
1438-
assert!(matches!(ve, VolumeError::DeviceDisconnected(_)));
1437+
// IO errors (callback errors, etc.) are not connection losses — they map to IoError.
1438+
// Real connection losses come through Error::Disconnected → ConnectionLost.
1439+
assert!(matches!(ve, VolumeError::IoError { .. }));
14391440
}
14401441

14411442
// ── Connection state tests ──────────────────────────────────────

0 commit comments

Comments
 (0)