@@ -118,13 +118,13 @@ struct LoggedClientGuard<'a> {
118118impl < ' a > std:: ops:: Deref for LoggedClientGuard < ' a > {
119119 type Target = Option < SmbClient > ;
120120 fn deref ( & self ) -> & Self :: Target {
121- & * self . inner
121+ & self . inner
122122 }
123123}
124124
125125impl < ' a > std:: ops:: DerefMut for LoggedClientGuard < ' a > {
126126 fn deref_mut ( & mut self ) -> & mut Self :: Target {
127- & mut * self . inner
127+ & mut self . inner
128128 }
129129}
130130
@@ -4199,4 +4199,349 @@ mod tests {
41994199 final_fds
42004200 ) ;
42014201 }
4202+
4203+ // ── SMB streaming-write regression test ────────────────────────────
4204+ //
4205+ // Helpers + one `#[ignore]`d integration test that guards against the
4206+ // streaming-write deadlock fixed in commit `efb15479`. See the docstring
4207+ // on `smb_integration_concurrent_streaming_writes_no_deadlock` for the
4208+ // full story.
4209+
/// Share-relative prefix under which every regression-test artifact is
/// created. `cleanup_test_prefix` asserts on this prefix before deleting
/// anything, so it doubles as the boundary of the cleanup sandbox.
const TEST_PREFIX_ROOT: &str = "_test/cmdr-regression-";
4213+
4214+ /// Captures `client-mutex:` (cmdr) and `recv:` (smb2 receiver loop)
4215+ /// debug lines into bounded ring buffers so a hung test's panic message
4216+ /// can include the last ~30 lines from each stream. That's invaluable
4217+ /// for diagnosing a future regression. Installed via `log::set_logger`
4218+ /// once per process; subsequent installs are no-ops.
4219+ struct MutexCaptureLogger {
4220+ mutex_lines : std:: sync:: Mutex < std:: collections:: VecDeque < String > > ,
4221+ recv_lines : std:: sync:: Mutex < std:: collections:: VecDeque < String > > ,
4222+ }
4223+ impl log:: Log for MutexCaptureLogger {
4224+ fn enabled ( & self , _md : & log:: Metadata ) -> bool {
4225+ true
4226+ }
4227+ fn log ( & self , record : & log:: Record ) {
4228+ let msg = format ! ( "{}" , record. args( ) ) ;
4229+ let target = record. target ( ) ;
4230+ // `client-mutex:` lines come from smb.rs via `log::debug!` with
4231+ // the module-path target (`cmdr_lib::file_system::volume::smb`).
4232+ // `recv:` lines come from the smb2 receiver loop with an `smb2::*`
4233+ // target.
4234+ if msg. starts_with ( "client-mutex:" ) {
4235+ let mut q = self . mutex_lines . lock ( ) . unwrap ( ) ;
4236+ if q. len ( ) >= 200 {
4237+ q. pop_front ( ) ;
4238+ }
4239+ q. push_back ( format ! ( "[{}] {}" , target, msg) ) ;
4240+ } else if msg. starts_with ( "recv:" ) || ( target. starts_with ( "smb2" ) && msg. contains ( "recv" ) ) {
4241+ let mut q = self . recv_lines . lock ( ) . unwrap ( ) ;
4242+ if q. len ( ) >= 200 {
4243+ q. pop_front ( ) ;
4244+ }
4245+ q. push_back ( format ! ( "[{}] {}" , target, msg) ) ;
4246+ }
4247+ // The captured ring buffers are the diagnostic. We deliberately
4248+ // skip mirroring to stderr: `eprintln!` is denied crate-wide,
4249+ // and re-emitting through `log::*` would recurse into this same
4250+ // logger (and the mutex above) on every call.
4251+ }
4252+ fn flush ( & self ) { }
4253+ }
4254+
4255+ static MUTEX_CAPTURE_LOGGER : OnceLock < & ' static MutexCaptureLogger > = OnceLock :: new ( ) ;
4256+
4257+ fn install_mutex_capture_logger ( ) -> & ' static MutexCaptureLogger {
4258+ if let Some ( l) = MUTEX_CAPTURE_LOGGER . get ( ) {
4259+ return l;
4260+ }
4261+ let leaked: & ' static MutexCaptureLogger = Box :: leak ( Box :: new ( MutexCaptureLogger {
4262+ mutex_lines : std:: sync:: Mutex :: new ( std:: collections:: VecDeque :: with_capacity ( 200 ) ) ,
4263+ recv_lines : std:: sync:: Mutex :: new ( std:: collections:: VecDeque :: with_capacity ( 200 ) ) ,
4264+ } ) ) ;
4265+ // Best-effort: if another logger is already installed, ignore.
4266+ let _ = log:: set_logger ( leaked) ;
4267+ log:: set_max_level ( log:: LevelFilter :: Debug ) ;
4268+ let _ = MUTEX_CAPTURE_LOGGER . set ( leaked) ;
4269+ leaked
4270+ }
4271+
4272+ /// Deletes every file under `unique_prefix_smb` and then the directory
4273+ /// itself. Safety: refuses any path that doesn't start with
4274+ /// `TEST_PREFIX_ROOT`, both at the top level and per entry, so a logic
4275+ /// bug in the caller can never reach outside the regression sandbox.
4276+ /// Called explicitly at the end of each pass (best effort: logs but
4277+ /// never overrides the test outcome).
4278+ async fn cleanup_test_prefix ( vol : & SmbVolume , mount_path : & Path , unique_prefix_smb : & str ) {
4279+ assert ! (
4280+ unique_prefix_smb. starts_with( TEST_PREFIX_ROOT ) ,
4281+ "cleanup_test_prefix: refusing to clean a prefix outside {TEST_PREFIX_ROOT:?}: {unique_prefix_smb:?}"
4282+ ) ;
4283+ let dir_abs = mount_path. join ( unique_prefix_smb. trim_start_matches ( '/' ) ) ;
4284+ let rel_of = |abs : & Path | -> String {
4285+ abs. to_string_lossy ( )
4286+ . strip_prefix ( mount_path. to_string_lossy ( ) . as_ref ( ) )
4287+ . map ( |s| s. trim_start_matches ( '/' ) . to_string ( ) )
4288+ . unwrap_or_else ( || abs. to_string_lossy ( ) . to_string ( ) )
4289+ } ;
4290+ match vol. list_directory_impl ( & dir_abs) . await {
4291+ Ok ( entries) => {
4292+ for entry in entries {
4293+ let abs = dir_abs. join ( & entry. name ) ;
4294+ let rel = rel_of ( & abs) ;
4295+ if !rel. starts_with ( TEST_PREFIX_ROOT ) {
4296+ log:: warn!( "cleanup_test_prefix: refusing to delete {rel} (outside prefix)" ) ;
4297+ continue ;
4298+ }
4299+ if let Err ( e) = vol. delete ( & abs) . await {
4300+ log:: warn!( "cleanup_test_prefix: failed to delete {rel}: {e:?}" ) ;
4301+ }
4302+ }
4303+ }
4304+ Err ( e) => log:: warn!( "cleanup_test_prefix: list_directory_impl failed for {dir_abs:?}: {e:?}" ) ,
4305+ }
4306+ let rel_dir = rel_of ( & dir_abs) ;
4307+ if rel_dir. starts_with ( TEST_PREFIX_ROOT )
4308+ && let Err ( e) = vol. delete ( & dir_abs) . await
4309+ {
4310+ log:: warn!( "cleanup_test_prefix: failed to delete prefix dir {rel_dir}: {e:?}" ) ;
4311+ }
4312+ }
4313+
4314+ /// Connects to a Docker SMB fixture's `public` share at `127.0.0.1:port`
4315+ /// as guest. `mount_label` becomes the synthetic mount path
4316+ /// (`/Volumes/<label>`); no real OS mount is needed because the test
4317+ /// only drives the smb2 path.
4318+ async fn connect_docker_smb_volume ( port : u16 , mount_label : & str ) -> SmbVolume {
4319+ let mount_path = format ! ( "/Volumes/{mount_label}" ) ;
4320+ connect_smb_volume ( "public" , & mount_path, "127.0.0.1" , "public" , None , None , port)
4321+ . await
4322+ . unwrap_or_else ( |e| panic ! ( "connect to 127.0.0.1:{port} failed: {e:?}" ) )
4323+ }
4324+
4325+ /// One pass of the concurrent-streaming-write scenario:
4326+ /// - generate `n_files` source files of `file_size` bytes in a tempdir,
4327+ /// - pre-upload `n_conflicts` of them to the destination at the same
4328+ /// size so `OverwriteSmaller` resolves them as Skip,
4329+ /// - run `copy_volumes_with_progress` over all `n_files` with a timeout,
4330+ /// - on timeout, panic with the last 30 mutex/recv lines as a diagnostic
4331+ /// dump,
4332+ /// - clean up the unique prefix directory either way.
4333+ async fn run_concurrent_write_pass (
4334+ vol : Arc < SmbVolume > ,
4335+ mount_path : & Path ,
4336+ logger : & ' static MutexCaptureLogger ,
4337+ n_files : usize ,
4338+ n_conflicts : usize ,
4339+ file_size : usize ,
4340+ timeout_secs : u64 ,
4341+ ) -> Duration {
4342+ use crate :: file_system:: write_operations:: {
4343+ CollectorEventSink , VolumeCopyConfig , WriteOperationState , copy_volumes_with_progress,
4344+ } ;
4345+
4346+ assert ! ( n_conflicts <= n_files) ;
4347+
4348+ let ts = std:: time:: SystemTime :: now ( )
4349+ . duration_since ( std:: time:: UNIX_EPOCH )
4350+ . unwrap ( )
4351+ . as_secs ( ) ;
4352+ let unique_prefix = format ! ( "{TEST_PREFIX_ROOT}{ts}-n{n_files}" ) ;
4353+
4354+ let dest_dir_abs = mount_path. join ( unique_prefix. trim_start_matches ( '/' ) ) ;
4355+ let _ = vol. create_directory ( & mount_path. join ( "_test" ) ) . await ;
4356+ vol. create_directory ( & dest_dir_abs)
4357+ . await
4358+ . expect ( "create unique dest dir" ) ;
4359+
4360+ let local_dir = tempfile:: TempDir :: new ( ) . expect ( "tempdir" ) ;
4361+ for i in 0 ..n_files {
4362+ let name = format ! ( "f_{i:04}.bin" ) ;
4363+ let path = local_dir. path ( ) . join ( & name) ;
4364+ // Distinct content per file (byte = i % 251) + an 8-byte seed
4365+ // prefix, so identical-size pre-uploads still hash-differ from
4366+ // their sources should we ever want to verify content.
4367+ let mut buf = vec ! [ 0u8 ; file_size] ;
4368+ buf[ ..8 ] . copy_from_slice ( & ( i as u64 ) . to_le_bytes ( ) ) ;
4369+ for b in buf. iter_mut ( ) . skip ( 8 ) {
4370+ * b = ( i % 251 ) as u8 ;
4371+ }
4372+ std:: fs:: write ( & path, & buf) . expect ( "write source" ) ;
4373+ }
4374+
4375+ log:: info!( "regression: pre-uploading {n_conflicts} conflicting files to {unique_prefix}" ) ;
4376+ for i in 0 ..n_conflicts {
4377+ let name = format ! ( "f_{i:04}.bin" ) ;
4378+ let dest_abs = dest_dir_abs. join ( & name) ;
4379+ let buf = std:: fs:: read ( local_dir. path ( ) . join ( & name) ) . unwrap ( ) ;
4380+ let stream: Box < dyn VolumeReadStream > = Box :: new ( InlineReadStream :: new ( buf. clone ( ) ) ) ;
4381+ let size = buf. len ( ) as u64 ;
4382+ let progress = |_a : u64 , _b : u64 | -> std:: ops:: ControlFlow < ( ) > { std:: ops:: ControlFlow :: Continue ( ( ) ) } ;
4383+ let bytes = vol
4384+ . write_from_stream ( & dest_abs, size, stream, & progress)
4385+ . await
4386+ . unwrap_or_else ( |e| panic ! ( "pre-upload {name} failed: {e:?}" ) ) ;
4387+ assert_eq ! ( bytes, size, "pre-upload size mismatch" ) ;
4388+ }
4389+ log:: info!( "regression: pre-upload done" ) ;
4390+
4391+ let src_vol: Arc < dyn Volume > = Arc :: new ( crate :: file_system:: volume:: LocalPosixVolume :: new (
4392+ "regression-src" ,
4393+ local_dir. path ( ) . to_path_buf ( ) ,
4394+ ) ) ;
4395+ let dst_vol: Arc < dyn Volume > = vol. clone ( ) as Arc < dyn Volume > ;
4396+ let source_rel_paths: Vec < PathBuf > = ( 0 ..n_files) . map ( |i| PathBuf :: from ( format ! ( "f_{i:04}.bin" ) ) ) . collect ( ) ;
4397+
4398+ let state = Arc :: new ( WriteOperationState :: new ( Duration :: from_millis ( 200 ) ) ) ;
4399+ let events = Arc :: new ( CollectorEventSink :: new ( ) ) ;
4400+ let config = VolumeCopyConfig {
4401+ conflict_resolution : crate :: file_system:: write_operations:: ConflictResolution :: OverwriteSmaller ,
4402+ ..VolumeCopyConfig :: default ( )
4403+ } ;
4404+
4405+ let start = std:: time:: Instant :: now ( ) ;
4406+ log:: info!(
4407+ "regression: spawning copy n_files={n_files} n_conflicts={n_conflicts} size={file_size} timeout={timeout_secs}s"
4408+ ) ;
4409+
4410+ let res = tokio:: time:: timeout (
4411+ Duration :: from_secs ( timeout_secs) ,
4412+ copy_volumes_with_progress (
4413+ events. clone ( ) ,
4414+ "regression-op" ,
4415+ & state,
4416+ Arc :: clone ( & src_vol) ,
4417+ & source_rel_paths,
4418+ Arc :: clone ( & dst_vol) ,
4419+ & dest_dir_abs,
4420+ & config,
4421+ ) ,
4422+ )
4423+ . await ;
4424+
4425+ let elapsed = start. elapsed ( ) ;
4426+
4427+ let panic_msg: Option < String > = match res {
4428+ Ok ( Ok ( ( ) ) ) => {
4429+ log:: info!( "regression: copy completed in {elapsed:?}" ) ;
4430+ None
4431+ }
4432+ Ok ( Err ( e) ) => Some ( format ! ( "regression: copy failed in {elapsed:?}: {e:?}" ) ) ,
4433+ Err ( _) => {
4434+ let tail = |q : & std:: sync:: Mutex < std:: collections:: VecDeque < String > > | -> Vec < String > {
4435+ let q = q. lock ( ) . unwrap ( ) ;
4436+ let n = q. len ( ) . min ( 30 ) ;
4437+ q. iter ( ) . skip ( q. len ( ) - n) . cloned ( ) . collect ( )
4438+ } ;
4439+ let mutex_dump = tail ( & logger. mutex_lines ) ;
4440+ let recv_dump = tail ( & logger. recv_lines ) ;
4441+ let last_ticket = CLIENT_LOCK_TICKET . load ( Ordering :: Relaxed ) ;
4442+ Some ( format ! (
4443+ "regression: HANG after {:?} (timeout={}s) n_files={} n_conflicts={} last_ticket={}\n \
4444+ ── last {} client-mutex lines ──\n {}\n ── last {} recv lines ──\n {}\n ",
4445+ elapsed,
4446+ timeout_secs,
4447+ n_files,
4448+ n_conflicts,
4449+ last_ticket,
4450+ mutex_dump. len( ) ,
4451+ mutex_dump. join( "\n " ) ,
4452+ recv_dump. len( ) ,
4453+ recv_dump. join( "\n " ) ,
4454+ ) )
4455+ }
4456+ } ;
4457+
4458+ cleanup_test_prefix ( & vol, mount_path, & unique_prefix) . await ;
4459+
4460+ if let Some ( m) = panic_msg {
4461+ panic ! ( "{m}" ) ;
4462+ }
4463+ elapsed
4464+ }
4465+
4466+ /// Guards the invariant that concurrent streaming writes through
4467+ /// `SmbVolume::write_from_stream` complete without deadlocking.
4468+ ///
4469+ /// Uses the smb2 `smb-maxreadsize` fixture (max_write = 64 KB) so every
4470+ /// 1 MB write exceeds the server's max_write and is forced through the
4471+ /// streaming-fallback (FileWriter) path. That's the path that
4472+ /// historically nested a per-write lock under the client mutex and could
4473+ /// starve the receiver task to a halt.
4474+ ///
4475+ /// Shape (200 files, 140 OverwriteSmaller conflicts + 60 actual copies,
4476+ /// concurrency=8) mirrors the production workload that originally
4477+ /// surfaced the bug, where mixed conflict-skip / write iterations on a
4478+ /// shared SmbClient stressed the lock-ordering pattern hardest.
4479+ ///
4480+ /// Run with:
4481+ /// docker compose -f ~/projects-git/vdavid/smb2/tests/docker/internal/docker-compose.yml up -d smb-maxreadsize
4482+ /// cargo nextest run -p cmdr smb_integration_concurrent_streaming_writes_no_deadlock --run-ignored all
4483+ ///
4484+ /// Originally hung at a QNAP NAS for >5 minutes before the fix. See
4485+ /// commit `ddc71cfb` (lock-ticket instrumentation) and commit `efb15479`
4486+ /// (the fix: `write_from_stream` no longer holds the client mutex
4487+ /// across the streaming write). On post-fix code each pass completes in
4488+ /// roughly 5–15 s.
4489+ ///
4490+ /// Follow-up: promote `smb-maxreadsize` from smb2's `internal` fixtures
4491+ /// to its consumer-class fixtures (vendored into cmdr's `.compose/`)
4492+ /// so this test can graduate to CI without manual fixture setup.
4493+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 4 ) ]
4494+ #[ ignore = "Requires docker-compose smb-maxreadsize on port 10454; manual run only" ]
4495+ async fn smb_integration_concurrent_streaming_writes_no_deadlock ( ) {
4496+ use futures_util:: FutureExt ;
4497+
4498+ let logger = install_mutex_capture_logger ( ) ;
4499+ let prior_concurrency = crate :: file_system:: smb_concurrency ( ) ;
4500+ crate :: file_system:: set_smb_concurrency ( 8 ) ;
4501+
4502+ let vol = Arc :: new ( connect_docker_smb_volume ( 10454 , "cmdr-regression-maxreadsize" ) . await ) ;
4503+ let mount_path = vol. mount_path . clone ( ) ;
4504+
4505+ let result = std:: panic:: AssertUnwindSafe ( run_concurrent_write_pass (
4506+ Arc :: clone ( & vol) ,
4507+ & mount_path,
4508+ logger,
4509+ /* n_files = */ 200 ,
4510+ /* n_conflicts = */ 140 ,
4511+ /* file_size = */ 1024 * 1024 ,
4512+ /* timeout_secs = */ 120 ,
4513+ ) )
4514+ . catch_unwind ( )
4515+ . await ;
4516+
4517+ // Always restore concurrency, even on panic, before resuming the unwind.
4518+ crate :: file_system:: set_smb_concurrency ( prior_concurrency) ;
4519+ if let Err ( p) = result {
4520+ std:: panic:: resume_unwind ( p) ;
4521+ }
4522+ }
4523+
/// `cleanup_test_prefix` must panic on a prefix outside the sandbox before
/// it touches the volume.
#[test]
#[should_panic(expected = "refusing to clean a prefix outside")]
fn cleanup_test_prefix_rejects_unsafe_prefix() {
    // The helper is async, but its safety assert runs before the first
    // await point, so a single poll with a no-op waker is enough to reach
    // it and no runtime is required.
    let vol = make_test_volume();
    let mount = PathBuf::from("/Volumes/TestShare");
    let noop = futures_util::task::noop_waker();
    let mut cx = std::task::Context::from_waker(&noop);
    let mut pending = std::pin::pin!(cleanup_test_prefix(&vol, &mount, "etc/passwd"));
    let _ = pending.as_mut().poll(&mut cx); // panics in the assert
}
4538+
/// Static check on the shape of `TEST_PREFIX_ROOT`: it must live under
/// `_test/` and carry the `cmdr-regression-` marker, so stale artifacts on a
/// share are recognizable at a glance.
#[test]
fn test_prefix_root_is_safely_scoped() {
    let lives_under_test_dir = TEST_PREFIX_ROOT.starts_with("_test/");
    let carries_marker = TEST_PREFIX_ROOT.contains("cmdr-regression-");
    assert!(lives_under_test_dir);
    assert!(carries_marker);
}
42024547}
0 commit comments