Skip to content

Commit 161fda9

Browse files
committed
šŸ‘Œ Unify task timeout into broker.task_timeout config option (#7284)
Replace `rmq.task_timeout` with a broker-agnostic `broker.task_timeout` option used by both ZMQ and RabbitMQ backends. The old option is kept with a `deprecated_by` marker on the Field, enabling generic deprecation handling in `Manager.get_option` and `verdi config set`.
1 parent df77935 commit 161fda9

11 files changed

Lines changed: 56 additions & 9 deletions

File tree

ā€Ždocs/source/howto/installation.rstā€Ž

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ To view all configuration options set for the current profile:
158158
logging.paramiko_loglevel default WARNING
159159
logging.plumpy_loglevel default WARNING
160160
logging.sqlalchemy_loglevel default WARNING
161-
rmq.task_timeout default 10
161+
broker.task_timeout default 10
162162
runner.poll.interval profile 50
163163
transport.task_maximum_attempts global 6
164164
transport.task_retry_initial_interval default 20

ā€Ždocs/source/installation/guide_complete.rstā€Ž

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,12 @@ Or when using ``verdi profile setup``:
142142

143143
The ``verdi presto`` command automatically falls back to the ZMQ broker when RabbitMQ is not available on the localhost.
144144

145+
.. tip::
146+
147+
The ``broker.task_timeout`` configuration option controls how long a caller waits for a response from the broker (default: 10 seconds).
148+
This option applies to both the ZMQ and RabbitMQ backends.
149+
It can be changed with ``verdi config set broker.task_timeout <seconds>``.
150+
145151
.. _installation:guide-complete:rabbitmq:
146152

147153
RabbitMQ

ā€Ždocs/source/internals/broker.rstā€Ž

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -355,10 +355,10 @@ Timeouts
355355
* - Constant / option
356356
- Default
357357
- Purpose
358-
* - ``zmq.task_timeout`` |br| (``verdi config``)
358+
* - ``broker.task_timeout`` |br| (``verdi config``)
359359
- 10s
360360
- How long a caller waits for a task or RPC response from the broker.
361-
Fires a ``TimeoutError`` on the pending Future. Analogous to ``rmq.task_timeout``.
361+
Fires a ``TimeoutError`` on the pending Future. Replaces the deprecated ``rmq.task_timeout``.
362362
* - ``BROKER_READY_TIMEOUT``
363363
- 10s
364364
- How long ``get_communicator()`` polls for the broker to write its socket files at startup.
@@ -383,7 +383,7 @@ Timeouts
383383
- 5s
384384
- How often the broker service writes its status JSON to disk.
385385

386-
Only ``zmq.task_timeout`` is user-configurable.
386+
Only ``broker.task_timeout`` is user-configurable.
387387
All other values are developer-tunable constants in ``defaults.py``.
388388

389389

ā€Žsrc/aiida/brokers/rabbitmq/broker.pyā€Ž

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def _create_communicator(self) -> 'RmqThreadCommunicator':
8282
task_exchange=get_task_exchange_name(self._prefix),
8383
task_queue=get_launch_queue_name(self._prefix),
8484
task_prefetch_count=get_config_option('daemon.worker_process_slots'),
85-
async_task_timeout=get_config_option('rmq.task_timeout'),
85+
async_task_timeout=get_config_option('broker.task_timeout'),
8686
# This is needed because the verdi commands will call this function and when called in unit tests the
8787
# testing_mode cannot be set.
8888
testing_mode=self._profile.is_test_profile,

ā€Žsrc/aiida/brokers/zmq/broker.pyā€Ž

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,11 @@ def get_communicator(self, wait_for_broker: float = BROKER_READY_TIMEOUT) -> Zmq
158158
msg = f'Broker did not become ready within {wait_for_broker}s: {self}'
159159
raise ConnectionError(msg)
160160

161+
from aiida.manage.configuration import get_config_option
162+
161163
self._communicator = ZmqCommunicator(
162164
router_endpoint=router_endpoint,
163-
task_timeout=10,
165+
task_timeout=get_config_option('broker.task_timeout'),
164166
)
165167
self._communicator.start()
166168

ā€Žsrc/aiida/brokers/zmq/defaults.pyā€Ž

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
"""Default constants for the ZMQ broker.
1010
1111
These are **developer-tunable** defaults, not exposed to end users via
12-
``verdi config``.
12+
``verdi config``. User-facing options live in the config schema
13+
(e.g. ``broker.task_timeout``).
1314
"""
1415

1516
from __future__ import annotations

ā€Žsrc/aiida/cmdline/commands/cmd_config.pyā€Ž

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ def verdi_config_set(ctx, option, value, globally, append, remove):
160160
elif option.valid_type == typing.List[str]:
161161
value = [value]
162162

163+
# Warn about deprecated option names
164+
if option.deprecated_by:
165+
echo.echo_warning(f'`{option.name}` is deprecated, use `{option.deprecated_by}` instead.')
166+
163167
# Set the specified option
164168
try:
165169
value = config.set_option(option.name, value, scope=scope)

ā€Žsrc/aiida/manage/configuration/config.pyā€Ž

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,14 @@ class ProfileOptionsSchema(BaseModel, defer_build=True):
115115
transport__task_maximum_attempts: int = Field(
116116
5, description='Maximum number of transport task attempts before a Process is Paused.'
117117
)
118-
rmq__task_timeout: int = Field(10, description='Timeout in seconds for communications with RabbitMQ.')
118+
broker__task_timeout: int = Field(
119+
10, description='Timeout in seconds for task/RPC communications with the message broker.'
120+
)
121+
rmq__task_timeout: int = Field(
122+
10,
123+
description='Timeout in seconds for communications with RabbitMQ.',
124+
json_schema_extra={'deprecated_by': 'broker.task_timeout'},
125+
)
119126
storage__sandbox: Optional[str] = Field(
120127
None, description='Absolute path to the directory to store sandbox folders.'
121128
)

ā€Žsrc/aiida/manage/configuration/options.pyā€Ž

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ def default(self) -> Any:
4646
def description(self) -> str:
4747
return self._field.description
4848

49+
@property
50+
def deprecated_by(self) -> str | None:
51+
"""Return the name of the option that replaces this one, or ``None``."""
52+
return self._schema.get('deprecated_by')
53+
4954
@property
5055
def global_only(self) -> bool:
5156
from .config import ProfileOptionsSchema

ā€Žsrc/aiida/manage/configuration/schema/config-v9.schema.jsonā€Ž

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,18 @@
143143
"description": "Toggle whether the profile can be specified in requests submitted to the REST API",
144144
"global_only": true
145145
},
146+
"broker.task_timeout": {
147+
"type": "integer",
148+
"default": 10,
149+
"minimum": 1,
150+
"description": "Timeout in seconds for task/RPC communications with the message broker"
151+
},
146152
"rmq.task_timeout": {
147153
"type": "integer",
148154
"default": 10,
149155
"minimum": 1,
150-
"description": "Timeout in seconds for communications with RabbitMQ"
156+
"description": "Timeout in seconds for communications with RabbitMQ",
157+
"deprecated_by": "broker.task_timeout"
151158
},
152159
"storage.sandbox": {
153160
"type": "string",

0 commit comments

Comments
Ā (0)
⚔