Skip to content

Commit 876faef

Browse files
jmthomasclaude
andcommitted
Make operator resilient to transient Redis errors
The operator (PID 1 in its container) ran its monitoring loop with no rescue, so a single transient Redis error in update() -> hgetall (the kind of network blip that also makes targets reconnect) would unwind run(), exit the process, and trigger a full container restart. - operator.rb: wrap the monitoring cycle in a rescue so a failed cycle is logged and the loop keeps running, recovering on the next cycle. Only StandardError is caught, so SIGTERM/SystemExit still shut down cleanly. - store_autoload.rb / store_implementation.py: configure the Redis/Valkey client with equal-jitter reconnect backoff (cap=5s, base=0.625, 3 retries) so transient blips are absorbed inside the client and many clients don't reconnect in lockstep. Ruby samples the jittered delay array per connection since redis-rb takes a fixed delay array rather than a per-failure backoff callable. - Add specs covering operator loop resilience and the backoff config in both Ruby and Python. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 6b7e062 commit 876faef

6 files changed

Lines changed: 198 additions & 7 deletions

File tree

openc3/lib/openc3/operators/operator.rb

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -381,11 +381,21 @@ def run
381381
# Monitor processes and respawn if died
382382
Logger.info("#{self.class} Monitoring processes every #{@cycle_time} sec...")
383383
loop do
384-
update()
385-
remove_old()
386-
respawn_changed()
387-
start_new()
388-
respawn_dead()
384+
# A single cycle must never be able to take down the operator process.
385+
# update() in particular hits Redis every cycle and a transient network
386+
# error (the kind that also makes targets reconnect) would otherwise
387+
# unwind run() and exit the process, which in the container looks like a
388+
# full operator restart. Catch, log, and keep cycling so the next cycle
389+
# can recover once Redis is reachable again.
390+
begin
391+
update()
392+
remove_old()
393+
respawn_changed()
394+
start_new()
395+
respawn_dead()
396+
rescue => e
397+
Logger.error("#{self.class} cycle error, continuing: #{e.class} #{e.message}\n#{e.backtrace.join("\n")}")
398+
end
389399
break if @shutdown
390400

391401
sleep(@cycle_time)

openc3/lib/openc3/utilities/store_autoload.rb

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,46 @@ def initialize(pool_size = 10, db_shard: 0)
123123
@redis_pool = StoreConnectionPool.new(size: pool_size) { build_redis() }
124124
end
125125

126+
# cap/base for the equal-jitter reconnect backoff (seconds)
127+
REDIS_BACKOFF_CAP = 5.0
128+
REDIS_BACKOFF_BASE = 0.625
129+
126130
def build_redis
127-
return Redis.new(url: @redis_url, username: @redis_username, password: @redis_key)
131+
# reconnect_attempts retries the connection a few times with equal-jitter
132+
# backoff so a transient network blip is handled inside the client instead
133+
# of immediately surfacing a connection error to callers. The jitter
134+
# de-syncs many clients retrying the same blip to avoid a thundering herd
135+
# on recovery.
136+
#
137+
# This mirrors the Python store's Retry(EqualJitterBackoff(cap: 5, base:
138+
# 0.625), 3): per-retry backoff tops out at 5s on the final (3rd) retry
139+
# (~0.6-1.25s, ~1.25-2.5s, ~2.5-5s). redis-rb takes a fixed Array of delays
140+
# (no per-failure backoff callable), so we sample the jittered delays once
141+
# per connection here.
142+
# Connection, read, and write timeouts are left as the default: 1s
143+
return Redis.new(
144+
url: @redis_url,
145+
username: @redis_username,
146+
password: @redis_key,
147+
reconnect_attempts: reconnect_backoff_delays()
148+
)
149+
end
150+
151+
# Equal-jitter backoff delays for 3 retries, matching Python's
152+
# EqualJitterBackoff. Each retry's delay is randomized within the upper half
153+
# of an exponentially-growing ceiling.
154+
def reconnect_backoff_delays
155+
(1..3).map do |failures|
156+
# ceiling = exponential growth (base doubles each retry), clamped to cap.
157+
# For base=0.625, cap=5: failures 1,2,3 -> 1.25, 2.5, 5.0 seconds.
158+
# temp = half the ceiling: the guaranteed minimum wait for this retry.
159+
temp = [REDIS_BACKOFF_CAP, REDIS_BACKOFF_BASE * (2 ** failures)].min / 2.0
160+
# Final delay = fixed half (temp) + random half (rand*temp, in [0, temp)),
161+
# i.e. a value uniformly in [temp, 2*temp) = [ceiling/2, ceiling).
162+
# The fixed half keeps a sane floor; the random half de-syncs clients so
163+
# they don't all reconnect in lockstep (thundering herd) after a blip.
164+
temp + rand * temp
165+
end
128166
end
129167

130168
###########################################################################

openc3/python/openc3/utilities/store_implementation.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
from contextlib import contextmanager
1414

1515
import valkey
16-
from valkey.exceptions import TimeoutError
16+
from valkey.backoff import EqualJitterBackoff
17+
from valkey.exceptions import BusyLoadingError, ConnectionError, TimeoutError
18+
from valkey.retry import Retry
1719

1820
from openc3.environment import *
1921
from openc3.utilities.connection_pool import ConnectionPool
@@ -165,11 +167,20 @@ def build_redis(self):
165167
# NOTE: We can't use decode_response because it tries to decode the binary
166168
# packet buffer which does not work. Thus strings come back as bytes like
167169
# b"target_name" and we decode them using b"target_name".decode()
170+
#
171+
# retry retries a command a few times with equal-jitter backoff so a
172+
# transient network blip is handled inside the client instead of
173+
# immediately surfacing a connection error to callers. The jitter
174+
# de-syncs many clients retrying the same blip to avoid a thundering
175+
# herd on recovery. With cap=5, base=0.625 the per-retry backoff tops
176+
# out at 5s on the final (3rd) retry: ~0.6-1.25s, ~1.25-2.5s, ~2.5-5s.
168177
return valkey.Valkey(
169178
host=self.redis_host,
170179
port=self.redis_port,
171180
username=OPENC3_REDIS_USERNAME,
172181
password=OPENC3_REDIS_PASSWORD,
182+
retry=Retry(EqualJitterBackoff(cap=5, base=0.625), 3),
183+
retry_on_error=[BusyLoadingError, ConnectionError, TimeoutError],
173184
)
174185

175186
###########################################################################

openc3/python/test/utilities/test_store_implementation.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,43 @@
1010
# if purchased from OpenC3, Inc.
1111

1212
import unittest
13+
from unittest.mock import patch
14+
15+
from valkey.backoff import EqualJitterBackoff
16+
from valkey.exceptions import BusyLoadingError, ConnectionError, TimeoutError
17+
from valkey.retry import Retry
1318

1419
from openc3.utilities.store_implementation import Store
1520

1621

1722
class TestStoreImplementation(unittest.TestCase):
1823
def test_help(self):
1924
help(Store)
25+
26+
def test_build_redis_configures_resilience(self):
27+
# A transient network blip (the same kind that makes targets reconnect)
28+
# must be retried inside the client with jittered backoff instead of
29+
# immediately surfacing a connection error to callers, which would
30+
# otherwise propagate up and kill the caller.
31+
with patch("valkey.Valkey") as valkey_new:
32+
# __new__ bypasses __init__ so we can exercise build_redis in
33+
# isolation without spinning up the connection pool / singleton.
34+
store = Store.__new__(Store)
35+
store.redis_host = "localhost"
36+
store.redis_port = 6379
37+
store.build_redis()
38+
39+
self.assertEqual(valkey_new.call_count, 1)
40+
_, kwargs = valkey_new.call_args
41+
# Client retries with equal-jitter backoff on connection/timeout errors
42+
retry = kwargs["retry"]
43+
self.assertIsInstance(retry, Retry)
44+
self.assertEqual(retry._retries, 3)
45+
self.assertIsInstance(retry._backoff, EqualJitterBackoff)
46+
self.assertIn(BusyLoadingError, kwargs["retry_on_error"])
47+
self.assertIn(ConnectionError, kwargs["retry_on_error"])
48+
self.assertIn(TimeoutError, kwargs["retry_on_error"])
49+
# Per-retry backoff is bounded so a single retry can't hang forever;
50+
# the final (3rd) retry tops out at the 5s cap (jittered, so 2.5-5s).
51+
self.assertEqual(retry._backoff._cap, 5)
52+
self.assertLessEqual(retry._backoff.compute(3), 5)

openc3/spec/operators/microservice_operator_spec.rb

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,5 +261,49 @@ def build_changed(started)
261261
# end
262262
# end
263263
end
264+
265+
describe "redis error resilience" do
266+
before(:each) do
267+
@redis = mock_redis()
268+
ENV['OPERATOR_CYCLE_TIME'] = '0.05'
269+
end
270+
271+
after(:each) do
272+
MicroserviceOperator.instance&.stop
273+
# run() parks in a second loop waiting for @shutdown_complete, which is
274+
# only set by shutdown() via at_exit (stubbed to a no-op in this spec).
275+
# Force-kill the thread so teardown doesn't block on that park loop.
276+
@thread&.kill
277+
@thread&.join
278+
rescue Redis::BaseError
279+
# Before the fix the loop dies on the Redis error and join re-raises it
280+
# here; swallow so the example's own assertion is the reported failure.
281+
end
282+
283+
# A transient network blip (the same kind that makes targets reconnect)
284+
# makes one hgetall raise. The operator must absorb it and keep cycling
285+
# instead of letting the exception unwind run() and exit the process,
286+
# which in the container manifests as a full operator restart.
287+
it "survives a transient Redis error during update and keeps cycling" do
288+
raised = false
289+
allow(@redis).to receive(:hgetall).and_wrap_original do |original, key, *args|
290+
if key.to_s.include?('openc3_microservices') && !raised
291+
raised = true
292+
raise Redis::CannotConnectError.new("Error connecting to Redis on localhost:6379")
293+
end
294+
original.call(key, *args)
295+
end
296+
297+
capture_io do
298+
@thread = Thread.new { MicroserviceOperator.run }
299+
sleep 0.5 # Several cycles; the error fires on the first update
300+
end
301+
302+
# The error must actually have been triggered...
303+
expect(raised).to be true
304+
# ...and the operator loop must have survived it and still be running.
305+
expect(@thread.alive?).to be true
306+
end
307+
end
264308
end
265309
end
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# encoding: ascii-8bit
2+
3+
# Copyright 2026 OpenC3, Inc.
4+
# All Rights Reserved.
5+
#
6+
# This program is distributed in the hope that it will be useful,
7+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
8+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9+
# See LICENSE.md for more details.
10+
11+
# This file may also be used under the terms of a commercial license
12+
# if purchased from OpenC3, Inc.
13+
14+
require "spec_helper"
15+
require "openc3/utilities/store"
16+
17+
module OpenC3
18+
describe Store do
19+
describe "build_redis" do
20+
# A transient network blip (the same kind that makes targets reconnect)
21+
# must be retried inside the client with jittered backoff instead of
22+
# immediately surfacing a connection error to callers, which would
23+
# otherwise propagate up and kill the caller (e.g. the operator).
24+
it "configures equal-jitter reconnect backoff mirroring Python" do
25+
captured = nil
26+
allow(Redis).to receive(:new) do |**kwargs|
27+
captured = kwargs
28+
double("redis").as_null_object
29+
end
30+
31+
store = Store.allocate
32+
store.instance_variable_set(:@redis_url, "redis://localhost:6379")
33+
store.instance_variable_set(:@redis_username, nil)
34+
store.instance_variable_set(:@redis_key, nil)
35+
store.send(:build_redis)
36+
37+
attempts = captured[:reconnect_attempts]
38+
expect(attempts.length).to eq(3)
39+
# Equal-jitter ranges per retry with cap=5, base=0.625:
40+
# t = min(cap, base*2**f); delay in [t/2, t]
41+
expect(attempts[0]).to be_between(0.625, 1.25)
42+
expect(attempts[1]).to be_between(1.25, 2.5)
43+
expect(attempts[2]).to be_between(2.5, 5.0) # final retry caps at 5s
44+
end
45+
46+
it "samples fresh jittered delays each call to de-sync clients" do
47+
store = Store.allocate
48+
a = store.send(:reconnect_backoff_delays)
49+
b = store.send(:reconnect_backoff_delays)
50+
# Astronomically unlikely to be identical unless jitter is missing
51+
expect(a).not_to eq(b)
52+
end
53+
end
54+
end
55+
end

0 commit comments

Comments
 (0)