Commit 9d6df0e

Speedup: Compound read/write fast-path for small SMB files
- Added `Volume::open_read_stream_with_hint(path, size_hint)` to the trait (default falls through to `open_read_stream`). Backends that can exploit a size hint (SMB) override it.
- `SmbVolume` now picks `Tree::read_file_compound` (1 RTT: CREATE+READ+CLOSE in one compound frame) when the caller-provided hint is ≤ negotiated `max_read_size`. Returned bytes are wrapped as a single-chunk `InlineReadStream` so the consumer API is unchanged. Falls back cleanly to the streaming path when the hint is missing, too big, or the compound read returns short.
- `SmbVolume::write_from_stream` drains the source stream when `size ≤ max_write_size` and calls `Tree::write_file_compound` (1 RTT: CREATE+WRITE+FLUSH+CLOSE). Only enabled when size is known up-front; streaming path handles unknown/large sizes.
- `copy_single_path` and `stream_pipe_file` thread a `source_size_hint: Option<u64>` through. The scan HashMap now carries a `SourceHint { is_directory, size }` struct; for top-level files the size is `CopyScanResult.total_bytes`. `copy_directory_streaming` uses `entry.size` from the directory listing.
- On a 60 ms RTT link, a 10 KB SMB→Local copy drops from 3 RTTs + 2 stat probes (Fix 1 removed 1) down to 1 RTT on the read side. The same treatment on the write side collapses 4 RTTs to 1 for small Local→SMB copies.

CLAUDE.md in `volume/` documents the decision; eval numbers live in `docs/notes/phase4-rtt-investigation.md`.
1 parent 9409055 commit 9d6df0e

6 files changed

Lines changed: 248 additions & 20 deletions

apps/desktop/src-tauri/src/file_system/volume/CLAUDE.md

Lines changed: 3 additions & 0 deletions
@@ -360,6 +360,9 @@ spawned detached task. This is safe because the stream always lives in an async
 **Gotcha**: Watcher filenames are NFC (from server) but macOS mount paths are NFD
 **Why**: SMB servers return NFC-normalized filenames. macOS filesystem paths use NFD. The watcher NFD-normalizes filenames before constructing display paths used for cache lookups.

+**Decision**: `SmbVolume` has a compound fast-path in `open_read_stream_with_hint` and `write_from_stream` for files ≤ `max_read_size` / `max_write_size`
+**Why**: The streaming open+read+close sequence costs 3 RTTs per file. For small files (typical 10 KB copies on a NAS) that dominates wall-clock at high-latency links (~60 ms RTT → ~180 ms/file just for protocol overhead, not data). `smb2` already exposes `Tree::read_file_compound` (CREATE+READ+CLOSE in a single compound frame = 1 RTT) and `Tree::write_file_compound` (CREATE+WRITE+FLUSH+CLOSE = 1 RTT). The copy pipeline feeds per-file size hints from the pre-copy scan; when the size is known and fits in one READ/WRITE, we take the compound path. Falls back cleanly to the streaming reader/writer when the hint is missing or the file is too big. Small compound reads return a `Vec<u8>` wrapped as a single-chunk `InlineReadStream` so the consumer API stays shaped the same. See `docs/notes/phase4-rtt-investigation.md` for the measurement.
+
 **Decision**: Phase 4 collapsed `export_to_local` / `import_from_local` onto `open_read_stream` / `write_from_stream`
 **Why**: The three pre-Phase-4 copy paths (local↔local, local↔volume, volume↔volume) duplicated the same "open a reader, pipe to a writer" logic in three different shapes. The APFS clonefile fast path is the only one with a real capability difference. Collapsing the other two to a single streaming path means new backends (S3, WebDAV, FTP) implement two methods instead of four, concurrency lives in one place (`volume_copy.rs` — Phase 4.2), and features like resume / checksum / progress benefit every direction at once. See `docs/notes/phase4-volume-copy-unification.md`.
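
The overhead figures quoted in the **Why** entry above follow from simple arithmetic: sequential round trips multiplied by link RTT. A minimal sketch of that calculation follows; the function names are hypothetical and not part of the commit, and the model deliberately ignores data transfer time.

```rust
/// Protocol overhead in milliseconds for one file, ignoring payload transfer.
/// `round_trips` counts sequential request/response exchanges.
/// (Hypothetical helper for illustration; not part of the commit.)
fn protocol_overhead_ms(round_trips: u32, rtt_ms: u32) -> u32 {
    round_trips * rtt_ms
}

/// Overhead saved per file on the read side when the 3-RTT streaming
/// open+read+close is replaced by a single 1-RTT compound request.
fn read_savings_ms(rtt_ms: u32) -> u32 {
    protocol_overhead_ms(3, rtt_ms) - protocol_overhead_ms(1, rtt_ms)
}

/// Same idea for the write side, where the commit message cites 4 RTTs to 1.
fn write_savings_ms(rtt_ms: u32) -> u32 {
    protocol_overhead_ms(4, rtt_ms) - protocol_overhead_ms(1, rtt_ms)
}
```

At the 60 ms RTT cited in the commit, this gives 180 ms of streaming overhead per file on the read side versus 60 ms for the compound request.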

apps/desktop/src-tauri/src/file_system/volume/mod.rs

Lines changed: 23 additions & 0 deletions
@@ -599,6 +599,29 @@ pub trait Volume: Send + Sync {
         Box::pin(async { Err(VolumeError::NotSupported) })
     }

+    /// Opens a streaming reader with an optional size hint from the caller.
+    ///
+    /// Network-backed volumes can use the hint to pick a faster compound
+    /// request path for small files (e.g., SMB's CREATE+READ+CLOSE compound)
+    /// instead of the 3-RTT streaming open. Backends that can't use the hint
+    /// fall through to `open_read_stream`.
+    ///
+    /// The hint is best-effort — callers pass `None` when they don't know
+    /// the size ahead of time, and the backend must work correctly either
+    /// way.
+    #[allow(
+        clippy::type_complexity,
+        reason = "async trait method returns a pinned boxed future by design"
+    )]
+    fn open_read_stream_with_hint<'a>(
+        &'a self,
+        path: &'a Path,
+        size_hint: Option<u64>,
+    ) -> Pin<Box<dyn Future<Output = Result<Box<dyn VolumeReadStream>, VolumeError>> + Send + 'a>> {
+        let _ = size_hint;
+        self.open_read_stream(path)
+    }
+
     /// Writes data from a stream to the given path.
     ///
     /// `on_progress(bytes_written, total_size)` is called after each chunk is
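
The trait change above relies on a default method that ignores the hint and delegates, so only backends that benefit need to override it. The following is a simplified, synchronous sketch of that pattern. `SimpleVolume`, `PlainBackend`, and `HintAwareBackend` are hypothetical stand-ins (the real trait is async and returns boxed streams), and the size threshold check is omitted here for brevity.

```rust
/// Hypothetical synchronous stand-in for the real async `Volume` trait.
trait SimpleVolume {
    /// The baseline streaming open that every backend must implement.
    fn open_read(&self, path: &str) -> String;

    /// Default implementation: ignore the hint and delegate, so existing
    /// backends keep working without any changes.
    fn open_read_with_hint(&self, path: &str, size_hint: Option<u64>) -> String {
        let _ = size_hint;
        self.open_read(path)
    }
}

/// A backend that cannot exploit the hint; it inherits the default.
struct PlainBackend;

impl SimpleVolume for PlainBackend {
    fn open_read(&self, path: &str) -> String {
        format!("stream:{path}")
    }
}

/// A backend that overrides the hinted method to choose a different path
/// whenever a hint is present (threshold logic intentionally left out).
struct HintAwareBackend;

impl SimpleVolume for HintAwareBackend {
    fn open_read(&self, path: &str) -> String {
        format!("stream:{path}")
    }

    fn open_read_with_hint(&self, path: &str, size_hint: Option<u64>) -> String {
        match size_hint {
            Some(_) => format!("fast:{path}"),
            None => self.open_read(path),
        }
    }
}
```

Because the default delegates, callers can always call the hinted method and pass `None` when the size is unknown.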

apps/desktop/src-tauri/src/file_system/volume/smb.rs

Lines changed: 135 additions & 2 deletions
@@ -582,6 +582,46 @@ impl VolumeReadStream for SmbReadStream {
     }
 }

+/// Wraps a pre-read `Vec<u8>` as a `VolumeReadStream` that yields the whole
+/// buffer as a single chunk. Used by the compound fast-path in
+/// `open_read_stream_with_hint`, where the full file body came back inside one
+/// SMB compound response — there's no more I/O to drive, just hand the bytes
+/// to the consumer.
+struct InlineReadStream {
+    data: Option<Vec<u8>>,
+    total_size: u64,
+    bytes_read: u64,
+}
+
+impl InlineReadStream {
+    fn new(data: Vec<u8>) -> Self {
+        let total_size = data.len() as u64;
+        Self {
+            data: Some(data),
+            total_size,
+            bytes_read: 0,
+        }
+    }
+}
+
+impl VolumeReadStream for InlineReadStream {
+    fn next_chunk(&mut self) -> Pin<Box<dyn Future<Output = Option<Result<Vec<u8>, VolumeError>>> + Send + '_>> {
+        Box::pin(async move {
+            let data = self.data.take()?;
+            self.bytes_read = data.len() as u64;
+            Some(Ok(data))
+        })
+    }
+
+    fn total_size(&self) -> u64 {
+        self.total_size
+    }
+
+    fn bytes_read(&self) -> u64 {
+        self.bytes_read
+    }
+}
+
 /// If an smb2 error indicates the session is dead, transition state to
 /// `Disconnected`. Mirrors `handle_smb_result` for contexts without `&self`.
 fn update_state_on_smb_error(state: &AtomicU8, err: &smb2::Error) {
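
The single-chunk behaviour of `InlineReadStream` hinges on `Option::take`: the first call moves the buffer out and leaves `None` behind, so the second call signals end of stream. A minimal synchronous sketch of that mechanism is shown below; `InlineChunks` is a hypothetical simplification without the async trait or error type.

```rust
/// Hypothetical synchronous analogue of `InlineReadStream`.
struct InlineChunks {
    data: Option<Vec<u8>>,
    total_size: u64,
    bytes_read: u64,
}

impl InlineChunks {
    fn new(data: Vec<u8>) -> Self {
        let total_size = data.len() as u64;
        Self {
            data: Some(data),
            total_size,
            bytes_read: 0,
        }
    }

    /// Yields the whole buffer once, then `None` on every later call.
    fn next_chunk(&mut self) -> Option<Vec<u8>> {
        // `take()` moves the buffer out and leaves `None` in its place;
        // `?` returns early with `None` once the buffer is gone.
        let data = self.data.take()?;
        self.bytes_read = data.len() as u64;
        Some(data)
    }
}
```

The buffer is moved rather than cloned, so handing it to the consumer costs no extra copy.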
@@ -1074,6 +1114,54 @@ impl Volume for SmbVolume {
         })
     }

+    fn open_read_stream_with_hint<'a>(
+        &'a self,
+        path: &'a Path,
+        size_hint: Option<u64>,
+    ) -> Pin<Box<dyn Future<Output = Result<Box<dyn VolumeReadStream>, VolumeError>> + Send + 'a>> {
+        Box::pin(async move {
+            let smb_path = self.to_smb_path(path);
+
+            // Compound fast-path: if the caller-provided hint fits in one READ,
+            // send CREATE+READ+CLOSE as a single compound frame (1 RTT) instead
+            // of the 3-RTT streaming open. Falls through to the streaming path
+            // when the hint is missing, too large, or the compound read returns
+            // short (truncated file — rare but possible if size changed since
+            // the scan).
+            if let Some(size) = size_hint {
+                let mut guard = self.acquire_smb().await?;
+                let (client, tree) = guard.as_mut().unwrap();
+                let max_read = client.params().map(|p| p.max_read_size).unwrap_or(65536) as u64;
+                if size > 0 && size <= max_read {
+                    debug!(
+                        "SmbVolume::open_read_stream_with_hint: share={}, path={:?}, size={} — using compound fast-path",
+                        self.share_name, smb_path, size
+                    );
+                    let read_result = tree.read_file_compound(client.connection_mut(), &smb_path).await;
+                    let data = self.handle_smb_result("open_read_stream_with_hint(compound)", read_result)?;
+                    if data.len() as u64 == size {
+                        drop(guard);
+                        return Ok(Box::new(InlineReadStream::new(data)) as Box<dyn VolumeReadStream>);
+                    }
+                    debug!(
+                        "SmbVolume::open_read_stream_with_hint: compound read returned {} bytes, expected {} — falling back to streaming",
+                        data.len(),
+                        size
+                    );
+                    // Fall through — release the guard first.
+                }
+                drop(guard);
+            }
+
+            debug!(
+                "SmbVolume::open_read_stream_with_hint: share={}, path={:?} — using streaming path",
+                self.share_name, smb_path
+            );
+            let stream = self.open_smb_download_stream(&smb_path).await?;
+            Ok(Box::new(stream) as Box<dyn VolumeReadStream>)
+        })
+    }
+
     fn write_from_stream<'a>(
         &'a self,
         dest: &'a Path,
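
The read-side selection in `open_read_stream_with_hint` reduces to two checks: whether the hint is positive and fits within the negotiated read size, and whether the compound response length matches the hint. The sketch below isolates those checks as pure functions. `ReadPlan`, `plan_read`, and `accept_compound` are hypothetical names introduced only for illustration.

```rust
/// Which path a read would take (hypothetical enum for illustration).
#[derive(Debug, PartialEq)]
enum ReadPlan {
    Compound,
    Streaming,
}

/// Mirrors the `size > 0 && size <= max_read` condition in the diff:
/// a missing, zero, or oversized hint selects the streaming path.
fn plan_read(size_hint: Option<u64>, max_read_size: u64) -> ReadPlan {
    match size_hint {
        Some(size) if size > 0 && size <= max_read_size => ReadPlan::Compound,
        _ => ReadPlan::Streaming,
    }
}

/// Accepts the compound response only when its length equals the hint;
/// a short (or long) read returns `None`, which signals a streaming retry.
fn accept_compound(data: Vec<u8>, expected: u64) -> Option<Vec<u8>> {
    if data.len() as u64 == expected {
        Some(data)
    } else {
        None
    }
}
```

Note that a zero hint deliberately selects streaming, which matters because the copy pipeline can pass `Some(0)` when the real size is unknown.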
@@ -1085,8 +1173,8 @@ impl Volume for SmbVolume {
             let smb_path = self.to_smb_path(dest);

             debug!(
-                "SmbVolume::write_from_stream: share={}, path={:?}",
-                self.share_name, smb_path
+                "SmbVolume::write_from_stream: share={}, path={:?}, size={}",
+                self.share_name, smb_path, size
             );

             // Holds the SMB session mutex for the duration of the transfer.
@@ -1095,6 +1183,51 @@ impl Volume for SmbVolume {
             let mut guard = self.acquire_smb().await?;
             let (client, tree) = guard.as_mut().unwrap();

+            // Compound fast-path: when the caller promised a size that fits in
+            // one WRITE, drain the source stream into a buffer and send
+            // CREATE+WRITE+FLUSH+CLOSE as a single compound frame (1 RTT
+            // instead of 4). Small files are the hot case — for anything
+            // larger we use the streaming writer below.
+            let max_write = client.params().map(|p| p.max_write_size).unwrap_or(65536) as u64;
+            if size > 0 && size <= max_write {
+                let mut buffer = Vec::with_capacity(size as usize);
+                while let Some(chunk_result) = stream.next_chunk().await {
+                    let chunk = chunk_result?;
+                    buffer.extend_from_slice(&chunk);
+                }
+                if buffer.len() as u64 == size {
+                    debug!(
+                        "SmbVolume::write_from_stream: using compound fast-path ({} bytes)",
+                        buffer.len()
+                    );
+                    let write_result = tree
+                        .write_file_compound(client.connection_mut(), &smb_path, &buffer)
+                        .await;
+                    let bytes_written = self.handle_smb_result("write_from_stream(compound)", write_result)?;
+                    // Emit a single progress tick so counters match the
+                    // streaming path's post-loop state.
+                    let _ = on_progress(bytes_written, size);
+                    return Ok(bytes_written);
+                }
+                // Size mismatch — fall back to the streaming path so a
+                // subsequent open writes whatever came through.
+                debug!(
+                    "SmbVolume::write_from_stream: compound fast-path source yielded {} bytes, expected {} — falling back",
+                    buffer.len(),
+                    size
+                );
+                // Re-feed the already-drained buffer via the streaming writer.
+                let writer_result = client.create_file_writer(tree, &smb_path).await;
+                let mut writer = self.handle_smb_result("write_from_stream(open)", writer_result)?;
+                if !buffer.is_empty() {
+                    let write_result = writer.write_chunk(&buffer).await;
+                    self.handle_smb_result("write_from_stream(write_chunk)", write_result)?;
+                }
+                let finish_result = writer.finish().await;
+                self.handle_smb_result("write_from_stream(finish)", finish_result)?;
+                return Ok(buffer.len() as u64);
+            }
+
             let writer_result = client.create_file_writer(tree, &smb_path).await;
             let mut writer = self.handle_smb_result("write_from_stream(open)", writer_result)?;
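
On the write side, the fast-path first drains every source chunk into one buffer and only then compares the total against the promised size. A minimal synchronous sketch of that drain-and-verify step follows. `drain_for_compound` is a hypothetical helper, and a plain iterator of chunks stands in for the async source stream.

```rust
/// Drains all chunks into one buffer and reports whether the total length
/// matches the size the caller promised. (Hypothetical helper; the commit
/// performs this inline against an async stream.)
fn drain_for_compound<I>(chunks: I, expected: u64) -> (Vec<u8>, bool)
where
    I: IntoIterator<Item = Vec<u8>>,
{
    // Pre-size the buffer from the promised size to avoid regrowth.
    let mut buffer = Vec::with_capacity(expected as usize);
    for chunk in chunks {
        buffer.extend_from_slice(&chunk);
    }
    let matches = buffer.len() as u64 == expected;
    (buffer, matches)
}
```

When the flag is `false`, the buffer already holds everything the source produced, which is why the diff can re-feed it to the streaming writer without reopening the source.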

apps/desktop/src-tauri/src/file_system/write_operations/volume_copy.rs

Lines changed: 50 additions & 13 deletions
@@ -28,6 +28,16 @@ use futures_util::StreamExt;
 use futures_util::stream::FuturesUnordered;

 use super::scan::take_cached_scan_result;
+
+/// Per-source hints collected during the scan phase, so the copy loop can
+/// skip re-probing the source type/size per file. `size` is only meaningful
+/// when `is_directory == false`; it's the top-level file's size and feeds
+/// the SMB compound fast-path.
+#[derive(Clone, Copy, Default)]
+struct SourceHint {
+    is_directory: bool,
+    size: u64,
+}
 use super::state::{
     OperationIntent, WRITE_OPERATION_STATE, WriteOperationState, is_cancelled, load_intent, register_operation_status,
     unregister_operation_status, update_operation_status,
@@ -298,10 +308,11 @@ async fn copy_volumes_with_progress(
     let mut total_files;
     let mut total_bytes;

-    // Per-source `is_directory` hint collected during the scan, so the copy
-    // loop doesn't need to re-stat each source path via `is_directory`.
-    // Saves one round-trip per file on network-backed volumes (SMB, MTP).
-    let mut source_is_directory: HashMap<PathBuf, bool> = HashMap::with_capacity(source_paths.len());
+    // Per-source hint collected during the scan: whether the top-level path
+    // is a directory and, for top-level files, the file size. The copy loop
+    // reuses these to skip an `is_directory` probe per file and, for SMB, to
+    // pick the 1-RTT compound fast-path when the file fits in one READ.
+    let mut source_hints: HashMap<PathBuf, SourceHint> = HashMap::with_capacity(source_paths.len());

     if let Some(cached) = config.preview_id.as_deref().and_then(take_cached_scan_result) {
         total_files = cached.file_count;
@@ -312,12 +323,19 @@ async fn copy_volumes_with_progress(
             total_files,
             total_bytes
         );
-        // TODO: extend the preview cache to carry per-source type so this branch
-        // doesn't need to re-stat. For now, the preview path already saved one
-        // scan per source — this extra stat is bounded by source count.
+        // TODO: extend the preview cache to carry per-source type + size so this
+        // branch doesn't need to re-stat. For now, the preview path already saved
+        // one full scan per source — this extra stat is bounded by source count
+        // and the compound fast-path falls back cleanly when size is unknown.
         for source_path in source_paths {
             let is_dir = source_volume.is_directory(source_path).await.unwrap_or(false);
-            source_is_directory.insert(source_path.clone(), is_dir);
+            source_hints.insert(
+                source_path.clone(),
+                SourceHint {
+                    is_directory: is_dir,
+                    size: 0,
+                },
+            );
         }
     } else {
         log::debug!(
@@ -354,7 +372,20 @@ async fn copy_volumes_with_progress(
             total_files += scan.file_count;
             total_dirs += scan.dir_count;
             total_bytes += scan.total_bytes;
-            source_is_directory.insert(source_path.clone(), scan.top_level_is_directory);
+            // For top-level files, `scan.total_bytes` == the file size.
+            // For directories, we leave `size = 0` (unused downstream).
+            let size = if scan.top_level_is_directory {
+                0
+            } else {
+                scan.total_bytes
+            };
+            source_hints.insert(
+                source_path.clone(),
+                SourceHint {
+                    is_directory: scan.top_level_is_directory,
+                    size,
+                },
+            );
         }

         log::debug!(
@@ -485,7 +516,7 @@ async fn copy_volumes_with_progress(
                     // branch that didn't populate it), default to false —
                     // behaves as if the source is a file (worst case: one extra
                     // conflict prompt vs. silent merge for dir-over-dir).
-                    let source_is_dir = source_is_directory.get(source_path).copied().unwrap_or(false);
+                    let source_is_dir = source_hints.get(source_path).map(|h| h.is_directory).unwrap_or(false);
                     let dest_is_dir = dest_meta.is_directory;
                     if source_is_dir && dest_is_dir {
                         log::debug!(
@@ -546,7 +577,9 @@ async fn copy_volumes_with_progress(
                 let source_owned = source_path.clone();
                 let dest_owned = dest_item_path.clone();
                 let file_name_owned = file_name.clone();
-                let source_is_dir_hint = source_is_directory.get(source_path).copied().unwrap_or(false);
+                let hint = source_hints.get(source_path).copied().unwrap_or_default();
+                let source_is_dir_hint = hint.is_directory;
+                let source_size_hint = if hint.is_directory { None } else { Some(hint.size) };

                 in_flight.push(Box::pin(async move {
                     // Per-task `last_file_bytes` tracks bytes reported for the
@@ -593,6 +626,7 @@ async fn copy_volumes_with_progress(
                         &src_vol,
                         &source_owned,
                         source_is_dir_hint,
+                        source_size_hint,
                         &dst_vol,
                         &dest_owned,
                         &state_clone,
@@ -672,7 +706,7 @@ async fn copy_volumes_with_progress(
             if let Ok(dest_meta) = dest_volume.get_metadata(&dest_item_path).await {
                 // Reuse the per-source hint from the scan; see note in the
                 // concurrent path for the fallback.
-                let source_is_dir = source_is_directory.get(source_path).copied().unwrap_or(false);
+                let source_is_dir = source_hints.get(source_path).map(|h| h.is_directory).unwrap_or(false);
                 let dest_is_dir = dest_meta.is_directory;
                 if source_is_dir && dest_is_dir {
                     log::debug!(
@@ -766,11 +800,14 @@ async fn copy_volumes_with_progress(

         last_dest_path = Some(dest_item_path.clone());

-        let source_is_dir_hint = source_is_directory.get(source_path).copied().unwrap_or(false);
+        let hint = source_hints.get(source_path).copied().unwrap_or_default();
+        let source_is_dir_hint = hint.is_directory;
+        let source_size_hint = if hint.is_directory { None } else { Some(hint.size) };
         match copy_single_path(
             &source_volume,
             source_path,
             source_is_dir_hint,
+            source_size_hint,
             &dest_volume,
             &dest_item_path,
             state,
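
The hunks above turn each `SourceHint` into an `Option<u64>` before calling `copy_single_path`: directories yield `None`, files yield `Some(size)`. The sketch below reproduces that mapping in isolation. `lookup_size_hint` is a hypothetical helper (the commit does this inline), and the struct is redeclared so the block stands alone.

```rust
use std::collections::HashMap;
use std::path::{Path, PathBuf};

/// Same shape as the `SourceHint` struct added in this commit.
#[derive(Clone, Copy, Default)]
struct SourceHint {
    is_directory: bool,
    size: u64,
}

/// Hypothetical helper combining the lookup and the mapping from the diff.
fn lookup_size_hint(hints: &HashMap<PathBuf, SourceHint>, path: &Path) -> Option<u64> {
    // A missing entry falls back to `SourceHint::default()`, i.e. a
    // zero-sized file, exactly like `unwrap_or_default()` in the diff.
    let hint = hints.get(path).copied().unwrap_or_default();
    if hint.is_directory {
        None
    } else {
        Some(hint.size)
    }
}
```

One consequence worth noting: a missing entry, or an entry from the cached-preview branch where `size` is set to 0, produces `Some(0)` rather than `None`. The SMB fast-path conditions shown in this commit require `size > 0`, so such hints still end up on the streaming path.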

apps/desktop/src-tauri/src/file_system/write_operations/volume_move.rs

Lines changed: 4 additions & 0 deletions
@@ -186,6 +186,10 @@ pub async fn move_between_volumes(
             &source_volume,
             source_path,
             source_is_dir,
+            // Move has no scan phase to cache a size hint. The SMB
+            // compound fast-path falls through to streaming cleanly
+            // when the hint is missing.
+            None,
             &dest_volume,
             &dest_item,
             &state,
