Skip to content

Commit c989a6d

Browse files
aofei-liu and claude committed
refactor: replace multiprocessing.Pool with ThreadPoolExecutor in process_single_position
The transform functions passed to process_single_position (NumPy, SciPy, PyTorch, ANTsPy) all release the GIL, making threads sufficient for parallelism. ThreadPoolExecutor avoids the overhead of spawn-based multiprocessing (re-importing libraries, serializing arguments) and plays better with Nextflow's per-task resource accounting.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9beb2a4 commit c989a6d

1 file changed

Lines changed: 10 additions & 12 deletions

File tree

src/iohub/ngff/utils.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
import inspect
44
import itertools
5-
import multiprocessing as mp
5+
import os
66
from collections import defaultdict
77
from collections.abc import Callable, Sequence
8+
from concurrent.futures import ThreadPoolExecutor
89
from functools import partial
910
from pathlib import Path
1011
from typing import Any, Literal
@@ -395,21 +396,18 @@ def process_single_position(
395396
output_position_path,
396397
**kwargs,
397398
)
398-
num_processes = min(num_processes, len(flat_iterable), mp.cpu_count())
399-
click.echo(f"\nStarting multiprocess pool with {num_processes} processes")
400-
if num_processes <= 1:
401-
# Run serially — Pool(1) with spawn unnecessarily forks a subprocess
399+
num_workers = min(num_processes, len(flat_iterable), os.cpu_count())
400+
click.echo(f"\nStarting thread pool with {num_workers} workers")
401+
if num_workers <= 1:
402402
for args in flat_iterable:
403403
partial_apply_transform_to_czyx_and_save(*args)
404404
else:
405-
# NOTE: use spawn to work around tensorstore#61
406-
context = mp.get_context("spawn")
407-
with context.Pool(num_processes) as p:
408-
p.starmap(
409-
partial_apply_transform_to_czyx_and_save,
405+
with ThreadPoolExecutor(max_workers=num_workers) as executor:
406+
list(executor.map(
407+
lambda args: partial_apply_transform_to_czyx_and_save(*args),
410408
flat_iterable,
411-
)
412-
click.echo("Shut down multiprocess pool")
409+
))
410+
click.echo("Shut down thread pool")
413411

414412

415413
# -- Pure utility functions ------------------------------------------------

0 commit comments

Comments
 (0)