Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions src/iohub/ngff/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import inspect
import itertools
import multiprocessing as mp
import os
from collections import defaultdict
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from pathlib import Path
from typing import Any, Literal
Expand Down Expand Up @@ -395,21 +396,18 @@ def process_single_position(
output_position_path,
**kwargs,
)
num_processes = min(num_processes, len(flat_iterable), mp.cpu_count())
click.echo(f"\nStarting multiprocess pool with {num_processes} processes")
if num_processes <= 1:
# Run serially — Pool(1) with spawn unnecessarily forks a subprocess
num_workers = min(num_processes, len(flat_iterable), os.cpu_count())
Comment thread
aofei-liu marked this conversation as resolved.
Outdated
click.echo(f"\nStarting thread pool with {num_workers} workers")
if num_workers <= 1:
for args in flat_iterable:
partial_apply_transform_to_czyx_and_save(*args)
else:
# NOTE: use spawn to work around tensorstore#61
context = mp.get_context("spawn")
with context.Pool(num_processes) as p:
p.starmap(
partial_apply_transform_to_czyx_and_save,
with ThreadPoolExecutor(max_workers=num_workers) as executor:
list(executor.map(
Comment thread
aofei-liu marked this conversation as resolved.
lambda args: partial_apply_transform_to_czyx_and_save(*args),
flat_iterable,
)
click.echo("Shut down multiprocess pool")
))
Comment thread
aofei-liu marked this conversation as resolved.
click.echo("Shut down thread pool")


# -- Pure utility functions ------------------------------------------------
Expand Down
Loading