Skip to content

Commit a989e8c

Browse files
ChickenBenny authored and auvipy committed
fix: clear the timer while catch the exception (#10218)
* fix: clear the timer while catch the exception
* fix: move the timer clear to another try catch block
* Update t/unit/worker/test_loops.py
---------
Co-authored-by: Asif Saif Uddin {"Auvi":"অভি"} <auvipy@gmail.com>
1 parent d06de5f commit a989e8c

2 files changed

Lines changed: 99 additions & 0 deletions

File tree

celery/worker/loops.py

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -100,6 +100,12 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
100100
except Exception:
101101
# Reset the hub on error (e.g. connection loss) to clean up
102102
# stale file descriptors and callbacks from the old connection.
103+
# Also clear the timer queue so that stale periodic entries added by
104+
# register_with_event_loop (e.g. maybe_restore_messages) do not fire
105+
# against the broken connection after reconnect and trigger another
106+
# crash before the new connection is fully established.
107+
# All hub timers are re-registered during blueprint.start() once this
108+
# exception propagates and the consumer reconnects.
103109
# We intentionally do NOT reset on normal exit (graceful shutdown)
104110
# so that timers (e.g. heartbeat) keep firing while the pool drains.
105111
# WorkerShutdown/WorkerTerminate extend SystemExit (not Exception)
@@ -109,6 +115,18 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
109115
except Exception as exc: # pylint: disable=broad-except
110116
logger.exception(
111117
'Error cleaning up after event loop: %r', exc)
118+
# Clear stale timer entries accumulated across reconnects (e.g.
119+
# maybe_restore_messages registered via call_repeatedly). Without
120+
# this, each reconnect appends a new entry; all of them fire during
121+
# the reconnect window, raise again, and trigger another restart.
122+
# Use a separate try/except so this always runs even if hub.reset()
123+
# raised above. Timers are re-registered by register_with_event_loop
124+
# when blueprint.start() is called after reconnect.
125+
try:
126+
hub.timer.clear()
127+
except Exception as exc: # pylint: disable=broad-except
128+
logger.exception(
129+
'Error clearing hub timer after event loop: %r', exc)
112130
raise
113131

114132

t/unit/worker/test_loops.py

Lines changed: 81 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -466,6 +466,87 @@ def test_hub_reset_on_connection_error(self):
466466
asynloop(*x.args)
467467
x.hub.reset.assert_called_once()
468468

469+
def test_hub_timer_cleared_on_connection_error(self):
470+
# Stale timer entries (e.g. maybe_restore_messages) must be cleared
471+
# when the event loop exits due to a connection error. Without this,
472+
# entries accumulated across reconnects can fire against the broken
473+
# connection and crash the loop again before the new connection is
474+
# fully established, causing a rapid restart loop.
475+
x = X(self.app)
476+
x.hub.readers = {6: Mock()}
477+
x.hub.timer._queue = [1]
478+
x.hub.reset = Mock(name='hub.reset()')
479+
x.close_then_error(x.hub.poller.poll)
480+
x.hub.fire_timers.return_value = 33.37
481+
x.hub.poller.poll.return_value = []
482+
with pytest.raises(socket.error):
483+
asynloop(*x.args)
484+
x.hub.timer.clear.assert_called_once()
485+
486+
def test_hub_timer_not_cleared_on_graceful_shutdown(self):
487+
# On graceful shutdown the timer queue must be left intact so that
488+
# periodic timers (e.g. heartbeat) keep firing while the pool drains.
489+
x = X(self.app)
490+
x.hub.reset = Mock(name='hub.reset()')
491+
x.hub.on_tick.add(x.closer(mod=2))
492+
asynloop(*x.args)
493+
x.hub.timer.clear.assert_not_called()
494+
495+
def test_hub_timer_not_cleared_on_worker_shutdown(self):
496+
x = X(self.app)
497+
x.hub.reset = Mock(name='hub.reset()')
498+
state.should_stop = 303
499+
try:
500+
with pytest.raises(WorkerShutdown):
501+
asynloop(*x.args)
502+
finally:
503+
state.should_stop = None
504+
x.hub.timer.clear.assert_not_called()
505+
506+
def test_hub_timer_not_cleared_on_worker_terminate(self):
507+
x = X(self.app)
508+
x.hub.reset = Mock(name='hub.reset()')
509+
state.should_terminate = True
510+
try:
511+
with pytest.raises(WorkerTerminate):
512+
asynloop(*x.args)
513+
finally:
514+
state.should_terminate = None
515+
x.hub.timer.clear.assert_not_called()
516+
517+
def test_hub_timer_clear_error_still_reraises_original(self):
518+
# If hub.timer.clear() itself raises, the original connection error
519+
# must still be propagated, not the cleanup error.
520+
x = X(self.app)
521+
x.hub.readers = {6: Mock()}
522+
x.hub.timer._queue = [1]
523+
x.hub.reset = Mock(name='hub.reset()')
524+
x.hub.timer.clear = Mock(
525+
name='hub.timer.clear()', side_effect=RuntimeError('clear failed')
526+
)
527+
x.close_then_error(x.hub.poller.poll)
528+
x.hub.fire_timers.return_value = 33.37
529+
x.hub.poller.poll.return_value = []
530+
with pytest.raises(socket.error):
531+
asynloop(*x.args)
532+
x.hub.timer.clear.assert_called_once()
533+
534+
def test_hub_timer_cleared_even_when_reset_raises(self):
535+
# hub.timer.clear() must still be called even if hub.reset() raises.
536+
# The two cleanup calls are in separate try/except blocks so that a
537+
# failure in hub.reset() does not prevent stale timer entries from
538+
# being discarded, avoiding stale timers persisting after a reset error.
539+
x = X(self.app)
540+
x.hub.readers = {6: Mock()}
541+
x.hub.timer._queue = [1]
542+
x.hub.reset = Mock(name='hub.reset()', side_effect=RuntimeError('reset failed'))
543+
x.close_then_error(x.hub.poller.poll)
544+
x.hub.fire_timers.return_value = 33.37
545+
x.hub.poller.poll.return_value = []
546+
with pytest.raises(socket.error):
547+
asynloop(*x.args)
548+
x.hub.timer.clear.assert_called_once()
549+
469550
def test_hub_not_reset_on_graceful_shutdown(self):
470551
x = X(self.app)
471552
x.hub.reset = Mock(name='hub.reset()')

0 commit comments

Comments
 (0)