Skip to content

Commit 34c73c7

Browse files
authored
Add maintenance notifications support for OSS API cluster clients (#3946)
Description: Adds support for smart connection hand-off during maintenance operations for Redis Cloud databases with OSS API enabled. When a cluster node undergoes maintenance, the client automatically updates its cluster topology and manages connections to ensure minimal disruption (in case the feature is enabled in the server). Changes: New notification types: OSSNodeMigratingNotification and OSSNodeMigratedNotification for OSS cluster maintenance events (SMIGRATING/SMIGRATED push messages) New handler: OSSMaintNotificationsHandler to process cluster-wide maintenance notifications and trigger topology refresh Cluster client integration: MaintNotificationsAbstractRedisCluster mixin adds maintenance notification support to RedisCluster Connection management: Affected connections are marked for reconnect; removed nodes have their connections disconnected Parser updates: Handle new SMIGRATING/SMIGRATED push message types Auto-enabled for RESP3: Maintenance notifications are enabled by default when using protocol version 3
1 parent 24974fe commit 34c73c7

30 files changed

Lines changed: 5242 additions & 639 deletions

.github/workflows/integration.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ jobs:
4444
inputs: dev_requirements.txt
4545
ignore-vulns: |
4646
GHSA-w596-4wvx-j9j6 # subversion related git pull, dependency for pytest. There is no impact here.
47+
CVE-2026-26007 # dependency for entraid tests
4748
4849
lint:
4950
name: Code linters

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,4 @@ docker/stunnel/keys
2727
/dockers/replica/
2828
/dockers/sentinel/
2929
/dockers/redis-stack/
30+
/experimenting/

docker-compose.yml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ x-client-libs-stack-image: &client-libs-stack-image
66
x-client-libs-image: &client-libs-image
77
image: "redislabs/client-libs-test:${CLIENT_LIBS_TEST_IMAGE_TAG:-8.4.0}"
88

9+
networks:
10+
redis-net:
11+
driver: bridge
912
services:
1013

1114
redis:
@@ -108,3 +111,44 @@ services:
108111
- standalone
109112
- all-stack
110113
- all
114+
115+
redis-proxied:
116+
<<: *client-libs-image
117+
container_name: redis-proxied
118+
ports:
119+
- "3000:3000"
120+
networks:
121+
- redis-net
122+
healthcheck:
123+
test: ["CMD", "redis-cli", "-p", "3000", "PING"]
124+
interval: 10s
125+
timeout: 3s
126+
retries: 5
127+
128+
resp-proxy:
129+
image: redislabs/client-resp-proxy
130+
container_name: resp-proxy
131+
environment:
132+
LISTEN_HOST: "0.0.0.0"
133+
LISTEN_PORT: "15379,15380,15381"
134+
TARGET_HOST: "redis-proxied"
135+
TARGET_PORT: "3000"
136+
API_PORT: "4000"
137+
ENABLE_LOGGING: true
138+
SIMULATE_CLUSTER: true
139+
DEFAULT_INTERCEPTORS: "cluster,hitless,logger"
140+
141+
ports:
142+
- "15379:15379"
143+
- "15380:15380"
144+
- "15381:15381"
145+
- "4000:4000"
146+
depends_on:
147+
- redis-proxied
148+
networks:
149+
- redis-net
150+
healthcheck:
151+
test: ["CMD", "sh", "-c", "wget -qO- http://localhost:4000/stats || exit 1"]
152+
interval: 10s
153+
timeout: 3s
154+
retries: 5

redis/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def int_or_str(value):
4747
return value
4848

4949

50-
__version__ = "7.0.1"
50+
__version__ = "7.1.1"
5151

5252
VERSION = tuple(map(int_or_str, __version__.split(".")))
5353

redis/_parsers/base.py

Lines changed: 103 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@
1111
NodeMigratedNotification,
1212
NodeMigratingNotification,
1313
NodeMovingNotification,
14+
OSSNodeMigratedNotification,
15+
OSSNodeMigratingNotification,
1416
)
17+
from redis.utils import safe_str
1518

1619
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
1720
from asyncio import timeout as async_timeout
@@ -178,16 +181,56 @@ async def read_response(
178181
class MaintenanceNotificationsParser:
179182
"""Protocol defining maintenance push notification parsing functionality"""
180183

184+
@staticmethod
185+
def parse_oss_maintenance_start_msg(response):
186+
# Expected message format is:
187+
# SMIGRATING <seq_number> <slot, range1-range2,...>
188+
id = response[1]
189+
slots = safe_str(response[2])
190+
return OSSNodeMigratingNotification(id, slots)
191+
192+
@staticmethod
193+
def parse_oss_maintenance_completed_msg(response):
194+
# Expected message format is:
195+
# SMIGRATED <seq_number> [[<src_host:port> <dest_host:port> <slot_range>], ...]
196+
id = response[1]
197+
nodes_to_slots_mapping_data = response[2]
198+
# Build the nodes_to_slots_mapping dict structure:
199+
# {
200+
# "src_host:port": [
201+
# {"dest_host:port": "slot_range"},
202+
# ...
203+
# ],
204+
# ...
205+
# }
206+
nodes_to_slots_mapping = {}
207+
for src_node, dest_node, slots in nodes_to_slots_mapping_data:
208+
src_node_str = safe_str(src_node)
209+
dest_node_str = safe_str(dest_node)
210+
slots_str = safe_str(slots)
211+
212+
if src_node_str not in nodes_to_slots_mapping:
213+
nodes_to_slots_mapping[src_node_str] = []
214+
nodes_to_slots_mapping[src_node_str].append({dest_node_str: slots_str})
215+
216+
return OSSNodeMigratedNotification(id, nodes_to_slots_mapping)
217+
181218
@staticmethod
182219
def parse_maintenance_start_msg(response, notification_type):
183220
# Expected message format is: <notification_type> <seq_number> <time>
221+
# Examples:
222+
# MIGRATING 1 10
223+
# FAILING_OVER 2 20
184224
id = response[1]
185225
ttl = response[2]
186226
return notification_type(id, ttl)
187227

188228
@staticmethod
189229
def parse_maintenance_completed_msg(response, notification_type):
190230
# Expected message format is: <notification_type> <seq_number>
231+
# Examples:
232+
# MIGRATED 1
233+
# FAILED_OVER 2
191234
id = response[1]
192235
return notification_type(id)
193236

@@ -199,9 +242,7 @@ def parse_moving_msg(response):
199242
if response[3] is None:
200243
host, port = None, None
201244
else:
202-
value = response[3]
203-
if isinstance(value, bytes):
204-
value = value.decode()
245+
value = safe_str(response[3])
205246
host, port = value.split(":")
206247
port = int(port) if port is not None else None
207248

@@ -214,12 +255,15 @@ def parse_moving_msg(response):
214255
_MIGRATED_MESSAGE = "MIGRATED"
215256
_FAILING_OVER_MESSAGE = "FAILING_OVER"
216257
_FAILED_OVER_MESSAGE = "FAILED_OVER"
258+
_SMIGRATING_MESSAGE = "SMIGRATING"
259+
_SMIGRATED_MESSAGE = "SMIGRATED"
217260

218261
_MAINTENANCE_MESSAGES = (
219262
_MIGRATING_MESSAGE,
220263
_MIGRATED_MESSAGE,
221264
_FAILING_OVER_MESSAGE,
222265
_FAILED_OVER_MESSAGE,
266+
_SMIGRATING_MESSAGE,
223267
)
224268

225269
MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING: dict[
@@ -245,6 +289,14 @@ def parse_moving_msg(response):
245289
NodeMovingNotification,
246290
MaintenanceNotificationsParser.parse_moving_msg,
247291
),
292+
_SMIGRATING_MESSAGE: (
293+
OSSNodeMigratingNotification,
294+
MaintenanceNotificationsParser.parse_oss_maintenance_start_msg,
295+
),
296+
_SMIGRATED_MESSAGE: (
297+
OSSNodeMigratedNotification,
298+
MaintenanceNotificationsParser.parse_oss_maintenance_completed_msg,
299+
),
248300
}
249301

250302

@@ -255,6 +307,7 @@ class PushNotificationsParser(Protocol):
255307
invalidation_push_handler_func: Optional[Callable] = None
256308
node_moving_push_handler_func: Optional[Callable] = None
257309
maintenance_push_handler_func: Optional[Callable] = None
310+
oss_cluster_maint_push_handler_func: Optional[Callable] = None
258311

259312
def handle_pubsub_push_response(self, response):
260313
"""Handle pubsub push responses"""
@@ -269,6 +322,7 @@ def handle_push_response(self, response, **kwargs):
269322
_INVALIDATION_MESSAGE,
270323
*_MAINTENANCE_MESSAGES,
271324
_MOVING_MESSAGE,
325+
_SMIGRATED_MESSAGE,
272326
):
273327
return self.pubsub_push_handler_func(response)
274328

@@ -291,13 +345,30 @@ def handle_push_response(self, response, **kwargs):
291345
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
292346
msg_type
293347
][1]
294-
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
295-
msg_type
296-
][0]
297-
notification = parser_function(response, notification_type)
348+
if msg_type == _SMIGRATING_MESSAGE:
349+
notification = parser_function(response)
350+
else:
351+
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
352+
msg_type
353+
][0]
354+
notification = parser_function(response, notification_type)
298355

299356
if notification is not None:
300357
return self.maintenance_push_handler_func(notification)
358+
if msg_type == _SMIGRATED_MESSAGE and (
359+
self.oss_cluster_maint_push_handler_func
360+
or self.maintenance_push_handler_func
361+
):
362+
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
363+
msg_type
364+
][1]
365+
notification = parser_function(response)
366+
367+
if notification is not None:
368+
if self.maintenance_push_handler_func:
369+
self.maintenance_push_handler_func(notification)
370+
if self.oss_cluster_maint_push_handler_func:
371+
self.oss_cluster_maint_push_handler_func(notification)
301372
except Exception as e:
302373
logger.error(
303374
"Error handling {} message ({}): {}".format(msg_type, response, e)
@@ -317,6 +388,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
317388
def set_maintenance_push_handler(self, maintenance_push_handler_func):
318389
self.maintenance_push_handler_func = maintenance_push_handler_func
319390

391+
def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
392+
self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
393+
320394

321395
class AsyncPushNotificationsParser(Protocol):
322396
"""Protocol defining async RESP3-specific parsing functionality"""
@@ -325,6 +399,7 @@ class AsyncPushNotificationsParser(Protocol):
325399
invalidation_push_handler_func: Optional[Callable] = None
326400
node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
327401
maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
402+
oss_cluster_maint_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
328403

329404
async def handle_pubsub_push_response(self, response):
330405
"""Handle pubsub push responses asynchronously"""
@@ -341,6 +416,7 @@ async def handle_push_response(self, response, **kwargs):
341416
_INVALIDATION_MESSAGE,
342417
*_MAINTENANCE_MESSAGES,
343418
_MOVING_MESSAGE,
419+
_SMIGRATED_MESSAGE,
344420
):
345421
return await self.pubsub_push_handler_func(response)
346422

@@ -365,13 +441,26 @@ async def handle_push_response(self, response, **kwargs):
365441
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
366442
msg_type
367443
][1]
368-
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
369-
msg_type
370-
][0]
371-
notification = parser_function(response, notification_type)
444+
if msg_type == _SMIGRATING_MESSAGE:
445+
notification = parser_function(response)
446+
else:
447+
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
448+
msg_type
449+
][0]
450+
notification = parser_function(response, notification_type)
372451

373452
if notification is not None:
374453
return await self.maintenance_push_handler_func(notification)
454+
if (
455+
msg_type == _SMIGRATED_MESSAGE
456+
and self.oss_cluster_maint_push_handler_func
457+
):
458+
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
459+
msg_type
460+
][1]
461+
notification = parser_function(response)
462+
if notification is not None:
463+
return await self.oss_cluster_maint_push_handler_func(notification)
375464
except Exception as e:
376465
logger.error(
377466
"Error handling {} message ({}): {}".format(msg_type, response, e)
@@ -393,6 +482,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
393482
def set_maintenance_push_handler(self, maintenance_push_handler_func):
394483
self.maintenance_push_handler_func = maintenance_push_handler_func
395484

485+
def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
486+
self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
487+
396488

397489
class _AsyncRESPBase(AsyncBaseParser):
398490
"""Base class for async resp parsing"""

redis/_parsers/hiredis.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__(self, socket_read_size):
4949
self.pubsub_push_handler_func = self.handle_pubsub_push_response
5050
self.node_moving_push_handler_func = None
5151
self.maintenance_push_handler_func = None
52+
self.oss_cluster_maint_push_handler_func = None
5253
self.invalidation_push_handler_func = None
5354
self._hiredis_PushNotificationType = None
5455

redis/_parsers/resp3.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ def __init__(self, socket_read_size):
2020
self.pubsub_push_handler_func = self.handle_pubsub_push_response
2121
self.node_moving_push_handler_func = None
2222
self.maintenance_push_handler_func = None
23+
self.oss_cluster_maint_push_handler_func = None
2324
self.invalidation_push_handler_func = None
2425

2526
def handle_pubsub_push_response(self, response):

redis/_parsers/socket.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def _read_from_socket(
6262
sock.settimeout(timeout)
6363
try:
6464
while True:
65-
data = self._sock.recv(socket_read_size)
65+
data = sock.recv(socket_read_size)
6666
# an empty string indicates the server shutdown the socket
6767
if isinstance(data, bytes) and len(data) == 0:
6868
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

redis/asyncio/cluster.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,7 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
856856
slot = None
857857
else:
858858
slot = await self._determine_slot(*args)
859-
if not slot:
859+
if slot is None:
860860
command_policies = CommandPolicies()
861861
else:
862862
command_policies = CommandPolicies(
@@ -1212,6 +1212,9 @@ def __repr__(self) -> str:
12121212
def __eq__(self, obj: Any) -> bool:
12131213
return isinstance(obj, ClusterNode) and obj.name == self.name
12141214

1215+
def __hash__(self) -> int:
1216+
return hash(self.name)
1217+
12151218
_DEL_MESSAGE = "Unclosed ClusterNode object"
12161219

12171220
def __del__(
@@ -2188,7 +2191,7 @@ async def _execute(
21882191
slot = None
21892192
else:
21902193
slot = await client._determine_slot(*cmd.args)
2191-
if not slot:
2194+
if slot is None:
21922195
command_policies = CommandPolicies()
21932196
else:
21942197
command_policies = CommandPolicies(

0 commit comments

Comments
 (0)