refactor: replace multiprocessing.Pool with ThreadPoolExecutor in process_single_position #396
Conversation
refactor: replace multiprocessing.Pool with ThreadPoolExecutor in process_single_position

The transform functions passed to `process_single_position` (numpy, scipy, PyTorch, ANTsPy) all release the GIL, making threads sufficient for parallelism. `ThreadPoolExecutor` avoids the overhead of spawn-based multiprocessing (re-importing libraries, serializing arguments) and plays better with Nextflow's per-task resource accounting.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pull request overview
Refactors `process_single_position` to use a `ThreadPoolExecutor` instead of `multiprocessing.Pool`, aiming to reduce spawn/pickling overhead and improve compatibility when running under external orchestrators (e.g., Nextflow).
Changes:
- Replaced `multiprocessing.Pool` usage with `concurrent.futures.ThreadPoolExecutor` for per-(time, channel) parallelism.
- Swapped the `multiprocessing` import for `os` + `ThreadPoolExecutor`.
- Updated logging messages to reflect thread-pool usage.
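A minimal sketch of the swap, assuming one task per (time, channel) pair; the function and variable names here are illustrative, not the exact iohub code:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import product

# Illustrative pattern: fan one task per (time, channel) pair out to a
# thread pool instead of a spawn-based multiprocessing.Pool. The heavy
# work happens in C extensions that release the GIL, so threads still
# run in parallel.
def process_tc_pairs(task, t_indices, c_indices, num_threads):
    if num_threads <= 1:
        # Serial path: a pool of one worker adds overhead for no benefit.
        for t, c in product(t_indices, c_indices):
            task(t, c)
        return
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [executor.submit(task, t, c) for t, c in product(t_indices, c_indices)]
        for future in futures:
            future.result()  # re-raise any worker exception in the caller
```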
Guard against os.cpu_count() returning None in containerized environments by falling back to 1. Add test_process_single_position_threaded to exercise the ThreadPoolExecutor code path with num_processes=2. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
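The guard itself is a one-liner; a sketch of the pattern:

```python
import os

# os.cpu_count() can return None (e.g., in some containerized
# environments), so fall back to 1 before using the value to size
# or cap the worker pool.
cpu_count = os.cpu_count() or 1
```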
Looks good! The only thing is whether we can fold this new test into the previous one, maybe by setting a new variable in `@given` for thread count or something like that.
Extract shared logic into _run_process_single_position helper so the threaded test delegates instead of duplicating the entire test body. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove _run_process_single_position helper and merge the two test functions into a single test_process_single_position that samples num_processes from [1, 2]. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
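The resulting test has roughly this shape, assuming hypothesis's `sampled_from` strategy; the assertion body here is a placeholder for the real checks:

```python
from hypothesis import given
from hypothesis import strategies as st

# Sketch of the merged test: one function samples the worker count, so
# num_processes=1 exercises the serial path and num_processes=2 exercises
# the ThreadPoolExecutor path.
@given(num_processes=st.sampled_from([1, 2]))
def test_process_single_position(num_processes):
    assert num_processes in (1, 2)  # placeholder for the real test body
```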
Add num_threads parameter to process_single_position and deprecate num_processes. When num_processes is passed, a DeprecationWarning is emitted and the value is forwarded to num_threads if num_threads is smaller. Update tests to use num_threads. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
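A sketch of the shim this commit describes; the parameter names come from the commit message, while the surrounding signature is illustrative:

```python
import warnings

def process_single_position(input_path, num_threads=1, num_processes=None):
    if num_processes is not None:
        warnings.warn(
            "num_processes is deprecated; use num_threads instead",
            DeprecationWarning,
            stacklevel=2,
        )
        # Forward the legacy value to num_threads if num_threads is
        # smaller, per the forwarding rule in the commit message.
        num_threads = max(num_threads, num_processes)
    return num_threads  # placeholder for the real processing logic
```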
srivarra left a comment
Looks great! thank you!
iohub 0.3.1+ replaced multiprocessing.Pool with ThreadPoolExecutor in process_single_position (czbiohub-sf/iohub#396), renaming num_processes to num_threads. The old num_threads kwarg was leaking through **kwargs to transform functions, causing TypeError in run-flat-field. iohub 0.3.2 also restores OME-Zarr v0.4 write support. Both 0.3.1 and 0.3.2 require Python >=3.12. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
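A toy reproduction of the kwargs leak; the function names here are hypothetical, not biahub's actual API:

```python
import numpy as np

def flat_field_transform(czyx, flat_field):
    return czyx / flat_field

def process_positions(transform, data, **kwargs):
    # Kwargs not consumed here fall through to the transform.
    return transform(data, **kwargs)

data = np.ones((2, 4, 4))
ff = np.full((4, 4), 2.0)

# After the rename, num_threads is no longer consumed upstream, so it
# leaks into the transform and raises:
# TypeError: flat_field_transform() got an unexpected keyword argument 'num_threads'
process_positions(flat_field_transform, data, flat_field=ff, num_threads=4)
```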
This is a big change with a lot of downstream consequences. @srivarra please always get review from team members running reconstructions before merging structural changes to
```python
click.echo(f"\nStarting multiprocess pool with {num_processes} processes")
if num_processes <= 1:
    # Run serially — Pool(1) with spawn unnecessarily forks a subprocess
    cpu_count = os.cpu_count() or 1
```
this will fail on Bruno: it will pick up the node's total CPU count (~128 depending on the node), not the job's CPU allocation.
* refactor: add num_workers/use_threads to process_single_position

  PR #396 replaced mp.Pool with ThreadPoolExecutor on the assumption that the transforms passed to process_single_position release the GIL and threads suffice. That holds for I/O-bound callers, but not for tensor-heavy CPU torch workloads (deskew, register, deconvolve): under threads, all concurrent task allocations live in one address space, and torch's CPU caching allocator never returns memory to the OS, so peak RSS climbs past the slurm cgroup limit. Process workers are still needed for those cases.

  Introduce two new public params and deprecate the old ones:

  * num_workers (default 1) — replaces num_processes (#396 already deprecated this) and num_threads. Both legacy names emit a DeprecationWarning and forward to num_workers.
  * use_threads (default False) — pick between ThreadPoolExecutor and ProcessPoolExecutor.

  Behaviour:

  * num_workers <= 1 -> serial loop in the calling process (matches the short-circuit added in #396).
  * num_workers > 1, use_threads=True -> ThreadPoolExecutor (the #396 default).
  * num_workers > 1, use_threads=False -> ProcessPoolExecutor with the spawn context (the new default).

  Two reasons to use ProcessPoolExecutor (and not mp.Pool, like before #396):

  1. Silent worker death — a slurm cgroup OOM-kill of one worker leaves mp.Pool.starmap waiting forever for a result that never comes. ProcessPoolExecutor surfaces this as BrokenProcessPool, so the slurm job fails fast with a real traceback instead of hanging until walltime.
  2. Spawn (not fork) — tensorstore's internal C++ threads aren't fork-safe (google/tensorstore#61), and multiprocessing defaults to fork on Linux.

  Verified end-to-end on a 57-timepoint deskew run (171 (T,C) tasks per fov, 8 workers): both pool variants and the serial path produce bit-identical output, and an intentional OOM under PPE fails within ~1 minute with BrokenProcessPool instead of hanging.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor: close input zarr handle and short-circuit nan/zero check

  Two small cleanups in iohub.ngff.utils that surfaced while debugging deskew memory pressure:

  1. `_apply_transform_to_czyx` opens the input zarr without a context manager, leaking the zarr group / metadata cache for the lifetime of the worker. Wrap in `with open_ome_zarr(...)` so the handle is released after each task. No measurable memory effect at the cgroup level — file-handle hygiene fix; matters most for very long task queues.
  2. `_check_nan_n_zeros` materialised a full boolean mask of the input volume (via `np.all(arr == 0)`) before reducing it. Replace with `np.any(arr)`, which short-circuits in the numpy C reduction kernel as soon as it sees a truthy element and does not allocate a temp mask. The all-NaN branch only runs when `np.any` returned True (i.e. the array contains content or NaNs); skip it entirely for integer dtypes that can't represent NaN.

  Behaviour-preserving: produces the same return value as the previous implementation for all 3D and 4D inputs, including the per-channel "any channel empty" semantics for 4D arrays. Verified end-to-end on the deskew workload; bit-identical outputs.
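A minimal sketch of the three-way dispatch the first commit describes; `num_workers` and `use_threads` come from the commit message, while the task plumbing is illustrative:

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def run_tasks(task, task_args, num_workers=1, use_threads=False):
    if num_workers <= 1:
        # Serial loop in the calling process (the #396 short-circuit).
        return [task(*args) for args in task_args]
    if use_threads:
        # The #396 default: fine for GIL-releasing, I/O-bound callers.
        executor = ThreadPoolExecutor(max_workers=num_workers)
    else:
        # New default: process workers with the spawn context, so
        # tensorstore's non-fork-safe C++ threads are never forked, and a
        # cgroup OOM-kill surfaces as BrokenProcessPool instead of a hang.
        executor = ProcessPoolExecutor(
            max_workers=num_workers, mp_context=mp.get_context("spawn")
        )
    with executor:
        futures = [executor.submit(task, *args) for args in task_args]
        return [f.result() for f in futures]  # BrokenProcessPool on worker death
```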
  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test: parametrise test_process_single_position over num_workers/use_threads

  Renames the hypothesis strategy from `num_threads` to `num_workers` to match the new public API, and adds a `use_threads` boolean strategy so the test exercises both the ProcessPoolExecutor (default) and ThreadPoolExecutor paths. The old test only covered serial + threads.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test: assert num_processes/num_threads emit DeprecationWarning

  Adds a parametrized regression test that asserts both legacy kwargs trigger a DeprecationWarning when forwarded to num_workers. The warnings are otherwise invisible at runtime under Python's default filter (which suppresses DeprecationWarning raised from package code), so this is the only practical way to catch a future accidental removal of the shim.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test: put repo root on PYTHONPATH so spawn workers can import tests/

  `test_process_single_position` parametrises over `use_threads ∈ {True, False}`. With `use_threads=False`, iohub spins up a `ProcessPoolExecutor` with the `spawn` context. Spawn children re-initialise sys.path from the runtime defaults plus PYTHONPATH; they do not inherit pytest's `--import-mode=importlib` sys.path manipulation. Unpickling the test-local `dummy_transform` (which lives at `tests.ngff.test_ngff_utils.dummy_transform`) therefore fails with `ModuleNotFoundError: No module named 'tests'` and the worker dies, surfacing as `BrokenProcessPool` in the parent.

  Fix: prepend the repo root to PYTHONPATH (and to the parent's sys.path for symmetry) in `tests/conftest.py`. Spawn children inherit PYTHONPATH via the OS env, so they can now resolve `tests.ngff.test_ngff_utils` and unpickle the function.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat: honour SLURM_CPUS_PER_TASK when capping num_workers

  `os.cpu_count()` reports the host's total CPUs, not the cgroup CPU allocation. On a 128-core slurm node where the job was granted only 8 cores, capping `num_workers` at `os.cpu_count()` lets a caller oversubscribe the cgroup. Add `_available_cpus()` that prefers the `SLURM_CPUS_PER_TASK` env var when present and falls back to `os.cpu_count()` otherwise.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor!: drop num_processes and num_threads kwargs

  Both were deprecated in the previous commit ('refactor: add num_workers/use_threads to process_single_position'), with shims that forwarded their values to num_workers. Drop the shims now — anything still passing num_processes / num_threads gets a TypeError pointing at the right argument name, which is more useful than a silent DeprecationWarning that callers may never see (Python suppresses DeprecationWarning raised from package code under the default filter).

  Removes the corresponding regression test (test_process_single_position_legacy_kwargs_deprecated) and the unused 'warnings' import.

  BREAKING CHANGE: callers of process_single_position must use num_workers (and, optionally, use_threads) instead of num_processes / num_threads.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* revert conftest.py

* Revert "revert conftest.py"

  This reverts commit 0f86c59.

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
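A sketch of `_available_cpus()` as the commit describes it; the exact implementation in iohub may differ:

```python
import os

def _available_cpus() -> int:
    # Prefer the slurm cgroup allocation over the host's total CPU count;
    # fall back to os.cpu_count(), then to 1 if that returns None.
    slurm_cpus = os.environ.get("SLURM_CPUS_PER_TASK")
    if slurm_cpus is not None:
        return int(slurm_cpus)
    return os.cpu_count() or 1
```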
Summary
- Replace `multiprocessing.Pool` (spawn context) with `concurrent.futures.ThreadPoolExecutor` in `process_single_position`
- Remove the `multiprocessing` import; add `os` and `ThreadPoolExecutor`

Motivation
Transform functions are GIL-releasing, not GIL-bound
All downstream callers in biahub pass transform functions that release the GIL:
| caller | `func` passed |
| --- | --- |
| deskew.py | `_czyx_deskew_data` |
| deconvolve.py | `deconvolve` |
| concatenate.py | `copy_n_paste` |
| segment.py | `segment_data` |
| process_data.py | `binning_czyx` |
| stabilize.py | `apply_stabilization_transform` |
| register.py | `apply_affine_transform` |

Since the actual computation happens in C extensions (numpy, scipy, ANTsPy) or PyTorch, threads achieve the same parallelism as processes here.
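A quick, illustrative way to sanity-check the GIL-release claim outside biahub (not from the PR): time a numpy operation serially versus across two threads. The observed speedup depends on core count and on how many threads the underlying BLAS already uses.

```python
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np

# numpy's matmul releases the GIL, so two concurrent calls in threads
# should take well under 2x the single-call time on a multi-core machine.
a = np.random.default_rng(0).random((1500, 1500))

def work():
    return a @ a

start = time.perf_counter()
work(); work()
serial = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(lambda _: work(), range(2)))
threaded = time.perf_counter() - start

print(f"serial: {serial:.2f}s, threaded: {threaded:.2f}s")
```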
spawn-based multiprocessing has significant overhead

The previous implementation used `mp.get_context("spawn")` to work around tensorstore#61. `spawn` starts a fresh Python interpreter per worker, which means:

- re-importing heavy libraries in each worker
- pickling `func` and all arguments across process boundaries

Threads share the parent process memory and avoid all of this overhead. The tensorstore fork-safety issue is also irrelevant with threads since there is no fork.
Better compatibility with Nextflow orchestration
In production, biahub runs `process_single_position` inside Nextflow tasks. Nextflow already parallelizes at the position level, so spawning child processes within each task muddies Nextflow's per-task resource accounting. With `ThreadPoolExecutor`, the Nextflow task is a single process with threads — the `cpus` and `memory` directives map cleanly to actual resource usage.

Test plan
- `test_process_single_position` hypothesis tests pass