Commit df77935

📚 Add internal architecture docs for ZMQ broker (#7284)

Document the process/thread model, socket architecture, message flow, dead peer detection, persistent queue, service files, message types, the AMQP-to-ZMQ mapping, wire format and all timeout constants, with diagrams.

1 parent 625b1b4 · 2 files changed: 393 additions & 0 deletions

.. _internal_architecture:broker:

**********
ZMQ Broker
**********

.. versionadded:: 2.9

The ZMQ broker is AiiDA's built-in message broker, replacing the need for an external RabbitMQ service.
It implements the subset of AMQP semantics that AiiDA requires (tasks, RPC, broadcasts) using ZeroMQ as the transport layer.

This page documents the internal architecture for developers working on the broker itself.
For user-facing documentation, see the :ref:`installation guide <installation:guide-complete:broker:zmq>`.


Process and thread architecture
===============================

When ``verdi daemon start`` is run with a ZMQ-configured profile, circus launches two types of processes:

.. code-block:: text

    verdi daemon start
    └── circus (daemon supervisor)
        ├── verdi devel zmq-broker      ← broker process (1 instance)
        │   └── ZmqBrokerService
        │       └── ZmqBrokerServer (single-threaded, zmq.Poller event loop)
        │           └── PersistentQueue (file-based task durability)
        │
        └── verdi daemon worker         ← worker process(es) (1..N instances)
            └── Runner
                └── ZmqBroker
                    └── ZmqCommunicator
                        ├── main thread (public API: task_send, rpc_send, ...)
                        └── loop thread (private asyncio loop with ZMQ DEALER socket)

Key points:

- The **broker process** is single-threaded.
  ``ZmqBrokerServer`` uses a ``zmq.Poller`` event loop (non-blocking I/O via epoll/kqueue underneath) — no asyncio, no threads.
- Each **worker process** creates a ``ZmqCommunicator`` that runs a private asyncio event loop on a dedicated **background thread**.
  All ZMQ socket I/O happens on that thread; public methods schedule work via ``call_soon_threadsafe``, so no locks are needed.
- The broker process is started **before** the workers by circus, so its IPC socket is ready when workers connect.
  If it isn't ready yet, ``get_communicator()`` polls until the socket file appears.

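The worker-side threading model can be sketched with stdlib asyncio alone. The class and method names below are illustrative, not the actual ``ZmqCommunicator`` API; the point is that public calls only ever *schedule* work onto the private loop thread, which is why no locks are needed:

```python
import asyncio
import threading


class LoopThreadSketch:
    """Hypothetical stand-in for the communicator's thread model: a private
    asyncio loop runs on a background thread, and public methods are
    thread-safe because they only schedule coroutines onto that loop."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def task_send(self, task):
        # Called from the main thread; the actual I/O runs on the loop thread.
        future = asyncio.run_coroutine_threadsafe(self._send(task), self._loop)
        return future.result(timeout=5)  # cf. LOOP_TIMEOUT

    async def _send(self, task):
        # The real communicator would write frames to its DEALER socket here.
        return ('sent', task)

    def close(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=3)  # cf. LOOP_JOIN_TIMEOUT
```

The two timeouts mirror ``LOOP_TIMEOUT`` and ``LOOP_JOIN_TIMEOUT`` from the table at the end of this page.
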
Module overview
===============

``ZmqCommunicator`` implements ``kiwipy.Communicator`` — the same interface that ``RmqThreadCommunicator`` implements for RabbitMQ.
plumpy and the AiiDA engine are unaware of which broker backend is in use; they only interact through this interface.

.. code-block:: text

    src/aiida/brokers/zmq/
    ├── broker.py          ZmqBroker — the Broker interface for workers
    ├── communicator.py    ZmqCommunicator — kiwipy.Communicator over ZMQ
    ├── server.py          ZmqBrokerServer — the broker's message router
    ├── service.py         ZmqBrokerService — process wrapper (PID, signals, status files)
    ├── queue.py           PersistentQueue — file-based durable task queue
    ├── protocol.py        Message types, encoding/decoding, factory functions
    └── defaults.py        Developer-tunable constants (not user-facing)

Endpoint discovery
==================

Unlike RabbitMQ, the ZMQ broker requires no connection configuration (no host, port, or credentials).
Discovery is file-based: both sides derive the broker directory from the profile UUID.

1. On startup, the broker process writes the IPC socket path to ``~/.aiida/broker/{profile-uuid}/broker.sockets``.
2. When a worker calls ``get_communicator()``, it reads that file to obtain the endpoint (e.g. ``ipc:///tmp/aiida_zmq_xyz/router.sock``).
3. The worker connects its DEALER socket to that endpoint.

This means the ZMQ broker is **local-only** — IPC sockets do not work across machines.
For distributed setups (workers on different hosts), use RabbitMQ.

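The worker's side of this handshake reduces to a small polling helper. The sketch below assumes only the file layout described above; the function name and return shape are illustrative:

```python
import time
from pathlib import Path


def read_broker_endpoint(broker_dir: Path, timeout: float = 10.0) -> str:
    """Poll for the broker's socket file, as a worker does at startup.

    The broker writes the IPC endpoint into broker.sockets when it starts;
    until that file exists, the worker can only wait (cf. BROKER_READY_TIMEOUT).
    """
    sockets_file = broker_dir / 'broker.sockets'
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if sockets_file.exists():
            return sockets_file.read_text().strip()
        time.sleep(0.05)
    raise TimeoutError(f'broker socket file {sockets_file} not found after {timeout}s')
```
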
Socket architecture
===================

All traffic flows through a single ZMQ ROUTER/DEALER socket pair over IPC:

.. code-block:: text

    ┌──────────────────────┐      ┌──────────────────────┐
    │  Worker process 1    │      │  Worker process 2    │
    │  ┌────────────────┐  │      │  ┌────────────────┐  │
    │  │ DEALER socket  │  │      │  │ DEALER socket  │  │
    │  │ (async, loop   │  │      │  │ (async, loop   │  │
    │  │  thread)       │  │      │  │  thread)       │  │
    │  └───────┬────────┘  │      │  └───────┬────────┘  │
    └──────────┼───────────┘      └──────────┼───────────┘
               │ ipc://                      │ ipc://
               └────────────┐   ┌────────────┘
                            ▼   ▼
                ┌──────────────────────────┐
                │      Broker process      │
                │  ┌────────────────────┐  │
                │  │   ROUTER socket    │  │
                │  │ (sync, zmq.Poller) │  │
                │  └────────────────────┘  │
                │  ┌────────────────────┐  │
                │  │  PersistentQueue   │  │
                │  │   (disk storage)   │  │
                │  └────────────────────┘  │
                └──────────────────────────┘

The ROUTER socket auto-prepends the sender's identity to incoming frames, enabling the broker to route replies back to specific clients.
``ROUTER_MANDATORY`` is set so that sending to a disconnected identity raises ``ZMQError`` immediately rather than silently dropping the message.

Message protocol
================

The protocol is defined in ``protocol.py``.
All messages are JSON-encoded dicts sent as single ZMQ frames.
Every message has a ``type`` field (a ``MessageType`` enum value) and an ``id`` (UUID hex).

Message types
-------------

.. list-table::
   :header-rows: 1
   :widths: 25 20 55

   * - Message type
     - Direction
     - Purpose
   * - ``TASK``
     - client → broker → worker
     - Submit a task for processing. Contains ``body``, ``sender``, ``no_reply``.
   * - ``TASK_ACK``
     - worker → broker
     - Acknowledge successful receipt/completion. Broker removes task from persistent queue.
   * - ``TASK_NACK``
     - worker → broker
     - Negative acknowledgment. Broker requeues the task for redelivery.
   * - ``TASK_RESPONSE``
     - worker → broker → client
     - Return the result of a completed task. Broker forwards to original sender.
   * - ``RPC``
     - client → broker → recipient
     - Remote procedure call to a named recipient. Broker routes by subscriber ID.
   * - ``RPC_RESPONSE``
     - recipient → broker → client
     - Return RPC result. Broker forwards to original caller.
   * - ``BROADCAST``
     - client → broker → all
     - Fan-out to all connected clients (derived from subscriber registries).
   * - ``SUBSCRIBE_TASK``
     - worker → broker
     - Register as a task consumer. Broker adds worker to dispatch pool.
   * - ``UNSUBSCRIBE_TASK``
     - worker → broker
     - Deregister as a task consumer.
   * - ``SUBSCRIBE_RPC``
     - worker → broker
     - Register as an RPC handler under a given identifier.
   * - ``UNSUBSCRIBE_RPC``
     - worker → broker
     - Deregister as an RPC handler.
   * - ``PING``
     - broker → worker
     - Liveness probe. Worker ignores it; failure to deliver (``EHOSTUNREACH``) tells the broker the worker is dead.

AMQP mapping
------------

The protocol maps AMQP concepts to ZMQ message types:

.. code-block:: text

    AMQP concept               ZMQ broker equivalent
    ────────────────────────   ──────────────────────────────────
    basic.ack                  TASK_ACK
    basic.nack                 TASK_NACK
    consumer with prefetch     TASK dispatch to available workers
    fanout exchange            BROADCAST via ROUTER fan-out
    direct exchange            RPC routed to named recipient
    durable queue              PersistentQueue (file-based)
    basic.consume              SUBSCRIBE_TASK / SUBSCRIBE_RPC

Wire format
-----------

Messages travel as ZMQ multipart frames:

.. code-block:: text

    Client (DEALER) sends:     [ empty-delimiter | json-payload ]
    Broker (ROUTER) receives:  [ client-identity | empty-delimiter | json-payload ]
    Broker (ROUTER) sends:     [ target-identity | empty-delimiter | json-payload ]

The empty delimiter frame is a ZMQ convention for ROUTER/DEALER interop.
The ROUTER socket automatically prepends the sender's identity on receive and uses the first frame as the routing target on send.

Payload fields like ``body`` and ``result`` are opaque to the broker — they are pre-encoded by the sender (typically as YAML strings by plumpy/kiwipy) and passed through without inspection.

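A minimal sketch of the single-frame JSON encoding described above. Only ``type`` and ``id`` are taken from the text; the enum subset and any other field names here are assumptions, not the actual contents of ``protocol.py``:

```python
import json
import uuid
from enum import Enum


class MessageType(str, Enum):
    # A subset of the broker's message types, named as in the table above.
    TASK = 'task'
    TASK_ACK = 'task_ack'
    TASK_RESPONSE = 'task_response'


def encode(message_type: MessageType, **fields) -> bytes:
    """Every message is one JSON frame carrying `type` and a UUID-hex `id`."""
    payload = {'type': message_type.value, 'id': uuid.uuid4().hex, **fields}
    return json.dumps(payload).encode()


def decode(frame: bytes) -> dict:
    return json.loads(frame)
```

Note that ``body`` here would already be an opaque, pre-encoded string; the broker never parses it.
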
Message flow: task submission
=============================

.. code-block:: text

    verdi run / submit()         Worker (daemon)           Broker
    ────────────────────         ───────────────           ──────
      │                             │                        │
      │ task_send(task)             │                        │
      │──── TASK ──────────────────────────────────────────▶│
      │                             │                        │ push to PersistentQueue
      │                             │                        │
      │                             │◀─── TASK ──────────────│ dispatch
      │                             │                        │
      │                             │  (process runs...)     │
      │                             │                        │
      │                             │──── TASK_ACK ─────────▶│ ack: remove from queue
      │                             │──── TASK_RESPONSE ────▶│
      │                             │                        │
      │◀────────────── TASK_RESPONSE (forwarded) ───────────│
      │                             │                        │

Key details:

- The broker persists the task to disk **before** dispatching it.
  If the broker crashes, tasks are recovered from disk on restart.
- Workers delay the ACK until the task Future resolves (see :ref:`deferred ACK <internal_architecture:broker:deferred_ack>` below).
  If a worker dies before ACK'ing, the broker detects the disconnect (via ZMTP heartbeats + PING probing) and requeues the task.
- If ``no_reply=True``, no TASK_RESPONSE is sent.

Task dispatch strategy
======================

The broker maintains an ``_available_workers`` deque.
When a worker sends ``SUBSCRIBE_TASK``, it is added to this pool.
The ROUTER socket provides identity-based routing — the broker picks the next available worker from the deque and sends directly to it.

After dispatching a task, the worker is **immediately re-added** to the available pool — it does not wait for the ACK.
This means a single worker can have multiple tasks in flight concurrently, matching RabbitMQ's multi-prefetch behavior.
The ACK only affects the ``PersistentQueue`` (removing the task from disk), not worker availability.

.. code-block:: text

    Dispatch loop (runs after every poll):

    while available_workers AND pending_tasks:
        worker = available_workers.popleft()
        task   = task_queue.pop()           # pending → processing
        send task to worker
        available_workers.append(worker)    # immediately re-available

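The pseudocode above translates almost directly to Python. This sketch (names are illustrative, not the server's internals) makes the key property visible: a worker is re-appended to the pool before any ACK arrives, so one worker can accumulate several in-flight tasks:

```python
from collections import deque


def dispatch(available_workers: deque, pending: deque, processing: dict, send) -> None:
    """One pass of the broker's dispatch loop, per the pseudocode above.

    `send` stands in for writing a TASK frame to the ROUTER socket;
    `processing` tracks in-flight tasks until a TASK_ACK removes them.
    """
    while available_workers and pending:
        worker = available_workers.popleft()
        task = pending.popleft()              # pending -> processing
        processing[task['id']] = (worker, task)
        send(worker, task)
        available_workers.append(worker)      # immediately re-available
```

With a single subscribed worker and two pending tasks, one pass dispatches both tasks to that worker, leaving both in ``processing`` until their ACKs arrive.
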
.. _internal_architecture:broker:deferred_ack:

Deferred ACK pattern
====================

The kiwipy/plumpy stack requires that task subscribers can return ``Future`` objects — plumpy's process runner does this because AiiDA processes are long-running and asynchronous.
The ZMQ communicator must support this to be a compatible ``kiwipy.Communicator``.

When a task subscriber returns a result, two paths are possible:

- **Direct result**: the communicator sends ``TASK_ACK`` and ``TASK_RESPONSE`` immediately.
- **Future**: the communicator stores the task in ``_in_progress_tasks`` and registers a done-callback on the Future.
  The ACK is deferred until the Future resolves. This is critical for reliability — if the worker process dies before the Future completes, no ACK is sent, and the broker requeues the task.

.. code-block:: text

    Worker receives TASK
      │
      ├── subscriber returns result directly
      │     └── send TASK_ACK + TASK_RESPONSE immediately
      │
      └── subscriber returns Future
            └── store in _in_progress_tasks
                  └── Future resolves → _finalize_task()
                        └── send TASK_ACK + TASK_RESPONSE

The same pattern applies to RPC: if the subscriber returns a ``Future``, the response is deferred until it resolves.

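The two branches can be sketched with asyncio Futures. The function and callback names below are illustrative, not the real communicator internals; the point is that the ACK is only ever sent from the done-callback when the subscriber returned a Future:

```python
import asyncio


def handle_task_result(task_id, result, send_ack, send_response, in_progress):
    """Deferred-ACK sketch: a direct result is ACKed at once; a Future parks
    the task in `in_progress` and ACKs only when the Future resolves, so a
    worker crash before completion leaves the task un-ACKed (and requeueable)."""
    if asyncio.isfuture(result):
        in_progress[task_id] = result

        def _finalize(fut):
            in_progress.pop(task_id, None)
            send_ack(task_id)
            send_response(task_id, fut.result())

        result.add_done_callback(_finalize)
    else:
        send_ack(task_id)
        send_response(task_id, result)
```
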
Dead worker detection
=====================

Dead worker detection combines ZMQ transport features with application-level logic.

**Detection** (ZMQ built-in):
the ROUTER socket is configured with ``HEARTBEAT_IVL`` and ``HEARTBEAT_TIMEOUT``.
ZMQ sends periodic ping frames at the transport level; if a worker stops responding within the timeout, ZMQ disconnects it internally and emits an ``EVENT_DISCONNECTED`` on the monitor socket we attached to the ROUTER.

The problem is that this event only reports the file descriptor, not which client identity disconnected.
So the event is just a **trigger** — it tells us *someone* died, but not *who*.

**Identification** (our logic):
on each ``EVENT_DISCONNECTED``, ``_handle_disconnect_event`` calls ``_probe_workers``, which sends a ``PING`` message to every worker identity that has in-flight tasks.
With ``ROUTER_MANDATORY`` set on the socket, sending to a dead identity immediately raises ``ZMQError(EHOSTUNREACH)``.
Any worker that fails the probe is removed via ``_remove_dead_worker``, which requeues all its assigned tasks and cleans up the subscriber registries.

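The identification step reduces to a probe-and-evict loop. This sketch abstracts the socket away behind a ``send_ping`` callable; a real implementation would catch ``zmq.ZMQError`` with errno ``EHOSTUNREACH``, and the function signature here is an assumption:

```python
def probe_workers(in_flight_by_worker, send_ping, remove_dead_worker):
    """Sketch of the probing logic: PING every worker identity that holds
    in-flight tasks; with ROUTER_MANDATORY, a send to a disconnected identity
    raises, which is the signal to evict that worker (requeueing its tasks)."""
    for worker in list(in_flight_by_worker):
        try:
            send_ping(worker)            # raises for disconnected identities
        except OSError:
            remove_dead_worker(worker)   # requeue tasks, clean registries
```

Workers without in-flight tasks need not be probed: losing them costs nothing that ``SUBSCRIBE_TASK`` on reconnect would not repair.
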
Persistent queue (crash recovery)
=================================

``PersistentQueue`` stores tasks as individual JSON files:

.. code-block:: text

    {storage_path}/tasks/
    ├── pending/       ← waiting to be dispatched
    │   └── {timestamp}_{id}.json
    └── processing/    ← dispatched, awaiting ACK
        └── {timestamp}_{id}.json

- **push**: writes to ``pending/`` (atomic: write ``.tmp`` then ``rename``)
- **pop**: moves from ``pending/`` to ``processing/``
- **ack**: deletes from ``processing/``
- **nack**: moves back from ``processing/`` to ``pending/`` (front of queue for retry)
- **crash recovery**: on startup, all files in ``processing/`` are moved back to ``pending/``

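The file lifecycle above can be sketched with ``pathlib``. The class name and filename handling are illustrative (the real implementation lives in ``queue.py``); what matters is the atomic write-then-rename on push and the ``processing/`` sweep on startup:

```python
import json
import os
from pathlib import Path


class FileQueueSketch:
    """pending/ and processing/ directories mirroring the push/pop/ack
    lifecycle described above (illustrative, not the real PersistentQueue)."""

    def __init__(self, root):
        self.pending = Path(root) / 'pending'
        self.processing = Path(root) / 'processing'
        for directory in (self.pending, self.processing):
            directory.mkdir(parents=True, exist_ok=True)
        # Crash recovery: anything still in processing/ was never ACKed.
        for leftover in self.processing.iterdir():
            leftover.rename(self.pending / leftover.name)

    def push(self, name, task):
        tmp = self.pending / f'{name}.tmp'
        tmp.write_text(json.dumps(task))
        os.replace(tmp, self.pending / f'{name}.json')  # atomic rename

    def pop(self):
        oldest = min(self.pending.glob('*.json'))  # filenames sort oldest-first
        target = self.processing / oldest.name
        oldest.rename(target)
        return target

    def ack(self, path):
        Path(path).unlink()
```

Because filenames start with a timestamp, lexicographic order doubles as FIFO order, and the rename between directories is the only state transition needed.
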
Service files
=============

``ZmqBrokerService`` manages the broker process lifecycle and writes files that ``ZmqBroker`` (in worker processes) reads to discover the broker:

.. code-block:: text

    ~/.aiida/broker/{profile-uuid}/
    ├── broker.pid       "aiida-zmq-broker {pid}" — sentinel + PID for ownership check
    ├── broker.status    JSON with task counts, updated every STATUS_INTERVAL seconds
    ├── broker.sockets   path to the temp socket directory
    └── storage/         PersistentQueue data

    /tmp/aiida_zmq_{random}/
    └── router.sock      IPC socket (temp dir avoids 107-byte Unix path limit)

Known limitations
=================

- **Single point of failure**: the broker is a single process. If it crashes, no tasks are dispatched until it restarts (circus auto-restarts it). Tasks in the persistent queue survive the crash.
- **Local-only**: IPC sockets do not work across machines. For distributed setups with workers on different hosts, use RabbitMQ.
- **No message TTL**: tasks remain in the queue indefinitely until consumed or manually cleared.
- **No dead-letter queue**: NACKed tasks are requeued to the front of the queue. There is no mechanism to route repeatedly failing tasks to a separate queue.
- **Single ROUTER socket**: all traffic (tasks, RPC, broadcasts) shares one socket. This is sufficient for AiiDA's workload but could become a throughput bottleneck under extreme fan-out.

Timeouts
========

.. list-table::
   :header-rows: 1
   :widths: 30 10 60

   * - Constant / option
     - Default
     - Purpose
   * - ``zmq.task_timeout`` |br| (``verdi config``)
     - 10s
     - How long a caller waits for a task or RPC response from the broker.
       Fires a ``TimeoutError`` on the pending Future. Analogous to ``rmq.task_timeout``.
   * - ``BROKER_READY_TIMEOUT``
     - 10s
     - How long ``get_communicator()`` polls for the broker to write its socket files at startup.
       A warning is logged after 5s.
   * - ``LOOP_TIMEOUT``
     - 5s
     - How long the main thread waits when scheduling work onto the communicator's background event loop.
       Fires if the loop thread is blocked or dead.
   * - ``LOOP_JOIN_TIMEOUT``
     - 3s
     - How long ``close()`` waits for the loop thread to shut down.
   * - ``HEARTBEAT_IVL``
     - 2s
     - ZMTP heartbeat interval — how often the broker pings connected workers.
   * - ``HEARTBEAT_TIMEOUT``
     - 6s
     - Peer considered dead after no heartbeat response for this duration.
   * - ``POLL_TIMEOUT``
     - 1s
     - Server-side ``zmq.Poller`` timeout per iteration. Controls how quickly the broker responds to shutdown signals.
   * - ``STATUS_INTERVAL``
     - 5s
     - How often the broker service writes its status JSON to disk.

Only ``zmq.task_timeout`` is user-configurable.
All other values are developer-tunable constants in ``defaults.py``.


.. |br| raw:: html

    <br/>
