@@ -85,8 +85,7 @@ def __init__(
8585 # rpc_subscribers: identifier -> client_identity (bytes)
8686 self ._rpc_subscribers : dict [str , bytes ] = {}
8787
88- # Pending responses: correlation_id -> (client_identity, timestamp)
89- self ._pending_task_responses : dict [str , tuple [bytes , float ]] = {}
88+ # Pending RPC responses: correlation_id -> (client_identity, timestamp)
9089 self ._pending_rpc_responses : dict [str , tuple [bytes , float ]] = {}
9190
9291 # Task-worker assignments: task_id -> worker_identity
@@ -283,6 +282,14 @@ def _handle_task(self, identity: bytes, msg: dict[str, Any]) -> None:
283282 """Handle incoming task message.
284283
285284 Queue the task and try to dispatch to an available worker.
285+
286+ When the sender expects a reply (``no_reply=False``), an immediate
287+ acknowledgment response is sent back as soon as the task is persisted
288+ to disk. This mirrors RabbitMQ's publisher-confirm behaviour: the
289+ caller learns that the broker accepted the task without having to wait
290+ for a worker to process it. The worker's eventual result (if any) is
291+ *not* forwarded back to the original sender — callers like
292+ ``continue_process`` only need the confirmation, not the outcome.
286293 """
287294 task_id = msg ['id' ]
288295 sender = msg .get ('sender' , '' )
@@ -299,33 +306,27 @@ def _handle_task(self, identity: bytes, msg: dict[str, Any]) -> None:
299306 }
300307 self ._task_queue .push (task_id , task_data )
301308
302- # Track pending response if reply expected
309+ # Send an immediate acknowledgment to the sender so its Future
310+ # resolves without waiting for a worker (matches RabbitMQ semantics).
303311 if not no_reply :
304- self ._pending_task_responses [task_id ] = (identity , time .time ())
312+ response = {
313+ 'type' : MessageType .TASK_RESPONSE .value ,
314+ 'task_id' : task_id ,
315+ 'result' : True ,
316+ }
317+ self ._send_to_client (identity , response )
305318
306319 # Try to dispatch immediately
307320 self ._dispatch_pending_tasks ()
308321
309322 def _handle_task_response (self , identity : bytes , msg : dict [str , Any ]) -> None :
310323 """Handle task response from worker.
311324
312- Route the response back to the original task sender.
325+ Since the broker already sends an immediate acknowledgment when a task
326+ is queued, the worker's result response is simply logged and discarded.
313327 """
314- task_id = msg .get ('task_id' )
315- if not task_id :
316- _LOGGER .warning ('Task response missing task_id' )
317- return
318-
319- # Find original sender
320- pending = self ._pending_task_responses .pop (task_id , None )
321- if not pending :
322- _LOGGER .warning ('No pending response for task: %s' , task_id )
323- return
324-
325- original_sender , _ = pending
326-
327- # Forward response to original sender
328- self ._send_to_client (original_sender , msg )
328+ task_id = msg .get ('task_id' , '?' )
329+ _LOGGER .debug ('Received task response for %s (discarded — sender already acknowledged)' , task_id )
329330
330331 def _handle_task_ack (self , identity : bytes , msg : dict [str , Any ]) -> None :
331332 """Handle task acknowledgment from worker."""
@@ -602,7 +603,6 @@ def get_status(self) -> dict[str, Any]:
602603 'task_subscribers' : len (self ._task_subscribers ),
603604 'rpc_subscribers' : len (self ._rpc_subscribers ),
604605 'available_workers' : len (self ._available_workers ),
605- 'pending_task_responses' : len (self ._pending_task_responses ),
606606 'pending_rpc_responses' : len (self ._pending_rpc_responses ),
607607 }
608608
0 commit comments