Skip to content

Commit e045654

Browse files
authored
New abstract method added to ConnectionInterface. Adding more debug logs when processing maint notifications. Filtering some e2e tests for notifications on new connections. (#3988)
* Adding debug logs for standalone client when a retryable error is raised * Adding more logs. Excluding slot shuffle with failover tests for new connections * Adding mtls support for the oss api maint notifications e2e tests * Fixing initialization of local variable used for debug logging. * Simplifying debug log enabled checks in client.py * Applying review comments * Applying review comments
1 parent 11043df commit e045654

6 files changed

Lines changed: 125 additions & 16 deletions

File tree

redis/client.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import copy
2+
import logging
23
import re
34
import threading
45
import time
@@ -89,6 +90,20 @@
8990
NEVER_DECODE = "NEVER_DECODE"
9091

9192

93+
logger = logging.getLogger(__name__)
94+
95+
96+
def is_debug_log_enabled():
97+
return logger.isEnabledFor(logging.DEBUG)
98+
99+
100+
def add_debug_log_for_operation_failure(connection: "AbstractConnection"):
101+
logger.debug(
102+
f"Operation failed, "
103+
f"with connection: {connection}, details: {connection.extract_connection_details() if connection else 'no connection'}",
104+
)
105+
106+
92107
class CaseInsensitiveDict(dict):
93108
"Case insensitive dict implementation. Assumes string keys only."
94109

@@ -727,6 +742,8 @@ def _execute_command(self, *args, **options):
727742
actual_retry_attempts = [0]
728743

729744
def failure_callback(error, failure_count):
745+
if is_debug_log_enabled():
746+
add_debug_log_for_operation_failure(conn)
730747
actual_retry_attempts[0] = failure_count
731748
self._close_connection(conn, error, failure_count, start_time, command_name)
732749

@@ -1709,6 +1726,8 @@ def immediate_execute_command(self, *args, **options):
17091726
actual_retry_attempts = [0]
17101727

17111728
def failure_callback(error, failure_count):
1729+
if is_debug_log_enabled():
1730+
add_debug_log_for_operation_failure(conn)
17121731
actual_retry_attempts[0] = failure_count
17131732
self._disconnect_reset_raise_on_watching(
17141733
conn, error, failure_count, start_time, command_name
@@ -1946,6 +1965,8 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
19461965
actual_retry_attempts = [0]
19471966

19481967
def failure_callback(error, failure_count):
1968+
if is_debug_log_enabled():
1969+
add_debug_log_for_operation_failure(conn)
19491970
actual_retry_attempts[0] = failure_count
19501971
self._disconnect_raise_on_watching(
19511972
conn, error, failure_count, start_time, operation_name

redis/cluster.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3951,6 +3951,12 @@ def _reinitialize_on_error(self, error, failure_count):
39513951
or type(error) in self.CONNECTION_ERRORS
39523952
):
39533953
if self._transaction_connection:
3954+
if is_debug_log_enabled():
3955+
logger.debug(
3956+
f"Operation failed, "
3957+
f"with connection: {self._transaction_connection}, "
3958+
f"details: {self._transaction_connection.extract_connection_details()}",
3959+
)
39543960
# Disconnect and release back to pool
39553961
self._transaction_connection.disconnect()
39563962
node = self._nodes_manager.find_connection_owner(

redis/connection.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,10 @@ def reset_should_reconnect(self):
287287
"""
288288
pass
289289

290+
@abstractmethod
291+
def extract_connection_details(self) -> str:
292+
pass
293+
290294

291295
class MaintNotificationsAbstractConnection:
292296
"""
@@ -1442,6 +1446,18 @@ def socket_connect_timeout(self) -> Optional[Union[float, int]]:
14421446
def socket_connect_timeout(self, value: Optional[Union[float, int]]):
14431447
self._socket_connect_timeout = value
14441448

1449+
def extract_connection_details(self) -> str:
1450+
socket_address = None
1451+
if self._sock is None:
1452+
return "not connected"
1453+
try:
1454+
socket_address = self._sock.getsockname() if self._sock else None
1455+
socket_address = socket_address[1] if socket_address else None
1456+
except (AttributeError, OSError):
1457+
pass
1458+
1459+
return f"connected to ip {self.get_resolved_ip()}, local socket port: {socket_address}"
1460+
14451461

14461462
class Connection(AbstractConnection):
14471463
"Manages TCP communication to and from a Redis server"
@@ -1899,6 +1915,9 @@ def _on_invalidation_callback(self, data: List[Union[str, Optional[List[bytes]]]
18991915
reason=CSCReason.INVALIDATION,
19001916
)
19011917

1918+
def extract_connection_details(self) -> str:
1919+
return self._conn.extract_connection_details()
1920+
19021921

19031922
class SSLConnection(Connection):
19041923
"""Manages SSL connections to and from the Redis server(s).

redis/maint_notifications.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,7 +1006,12 @@ def handle_maintenance_completed_notification(self, **kwargs):
10061006
or not self.config.is_relaxed_timeouts_enabled()
10071007
):
10081008
return
1009-
add_debug_log_for_notification(self.connection, "MAINTENANCE_COMPLETED")
1009+
notification = None
1010+
if kwargs.get("notification"):
1011+
notification = kwargs["notification"]
1012+
add_debug_log_for_notification(
1013+
self.connection, notification if notification else "MAINTENANCE_COMPLETED"
1014+
)
10101015
self.connection.reset_tmp_settings(reset_relaxed_timeout=True)
10111016
# Maintenance completed - reset the connection
10121017
# timeouts by providing -1 as the relaxed timeout
@@ -1016,8 +1021,7 @@ def handle_maintenance_completed_notification(self, **kwargs):
10161021
# notifications and skipped end maint notifications
10171022
self.connection.reset_received_notifications()
10181023

1019-
if kwargs.get("notification"):
1020-
notification = kwargs["notification"]
1024+
if notification:
10211025
record_connection_relaxed_timeout(
10221026
connection_name=repr(self.connection),
10231027
maint_notification=notification.__class__.__name__,
@@ -1076,7 +1080,8 @@ def handle_oss_maintenance_completed_notification(
10761080
# process the same notification twice
10771081
return
10781082

1079-
logger.debug(f"Handling SMIGRATED notification: {notification}")
1083+
if logger.isEnabledFor(logging.DEBUG):
1084+
logger.debug(f"Handling SMIGRATED notification: {notification}")
10801085
self._in_progress.add(notification)
10811086

10821087
# Extract the information about the src and destination nodes that are affected
@@ -1130,7 +1135,16 @@ def handle_oss_maintenance_completed_notification(
11301135
# Some of them might be used by sub sub and we don't know which ones - so we disconnect
11311136
# all in flight connections after they are done with current command execution
11321137
for conn in current_node.redis_connection.connection_pool._get_in_use_connections():
1138+
add_debug_log_for_notification(
1139+
conn, "SMIGRATED - mark for reconnect"
1140+
)
11331141
conn.mark_for_reconnect()
1142+
else:
1143+
if logger.isEnabledFor(logging.DEBUG):
1144+
logger.debug(
1145+
f"SMIGRATED: Node {current_node.name} not affected by maintenance, "
1146+
f"skipping mark for reconnect"
1147+
)
11341148

11351149
if (
11361150
current_node

tests/test_scenario/conftest.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,39 @@ def extract_cluster_fqdn(url):
226226
return f"https://{cleaned_hostname}"
227227

228228

229+
def _prepare_ssl_certificates(cert_chain: bool) -> dict:
230+
"""
231+
Prepare SSL certificates for Redis cluster connection.
232+
233+
Args:
234+
cert_chain: PEM-encoded certificate chain containing client cert + intermediate + CA cert.
235+
This is the full certificate chain that will be used to validate the server.
236+
237+
Returns:
238+
dict: SSL configuration kwargs for RedisCluster
239+
"""
240+
certs_config_path = os.environ.get("MTLS_CONFIG_PATH", None)
241+
242+
if not cert_chain:
243+
return {
244+
"ssl_cert_reqs": "none",
245+
"ssl_check_hostname": False,
246+
}
247+
248+
if not certs_config_path:
249+
raise ValueError(
250+
"MTLS enabled test is triggered but MTLS_CONFIG_PATH environment variable not set"
251+
)
252+
253+
# The cert_chain contains the full chain (client cert + intermediate + root CA)
254+
# Use it as CA data for validating the server's certificate
255+
return {
256+
"ssl_cert_reqs": "none",
257+
"ssl_keyfile": os.path.join(certs_config_path, "client.key"),
258+
"ssl_certfile": os.path.join(certs_config_path, "client.crt"),
259+
}
260+
261+
229262
@pytest.fixture()
230263
def client_maint_notifications(endpoints_config):
231264
return _get_client_maint_notifications(endpoints_config)
@@ -307,8 +340,8 @@ def get_cluster_client_maint_notifications(
307340
enable_relaxed_timeout: bool = True,
308341
enable_proactive_reconnect: bool = True,
309342
disable_retries: bool = False,
343+
auth_ssl_client_certs: bool = False,
310344
socket_timeout: Optional[float] = None,
311-
host_config: Optional[str] = None,
312345
):
313346
"""Create Redis cluster client with maintenance notifications enabled."""
314347
# Get credentials from the configuration
@@ -337,6 +370,13 @@ def get_cluster_client_maint_notifications(
337370
tls_enabled = True if parsed.scheme == "rediss" else False
338371
logging.info(f"TLS enabled: {tls_enabled}")
339372

373+
tls_kwargs = {"ssl": tls_enabled}
374+
375+
if tls_enabled:
376+
# Prepare SSL certificate configuration
377+
ssl_config = _prepare_ssl_certificates(auth_ssl_client_certs)
378+
tls_kwargs.update(ssl_config)
379+
340380
# Configure maintenance notifications
341381
maintenance_config = MaintNotificationsConfig(
342382
enabled=enable_maintenance_notifications,
@@ -352,12 +392,10 @@ def get_cluster_client_maint_notifications(
352392
socket_timeout=CLIENT_TIMEOUT if socket_timeout is None else socket_timeout,
353393
username=username,
354394
password=password,
355-
ssl=tls_enabled,
356-
ssl_cert_reqs="none",
357-
ssl_check_hostname=False,
358395
protocol=protocol, # RESP3 required for push notifications
359396
maint_notifications_config=maintenance_config,
360397
retry=retry,
398+
**tls_kwargs,
361399
)
362400
logging.info("Redis cluster client created with maintenance notifications enabled")
363401
logging.info(

tests/test_scenario/test_maint_notifications.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,16 +1239,10 @@ def execute_commands(duration: int, errors: Queue):
12391239
assert errors.empty(), f"Errors occurred in threads: {errors.queue}"
12401240

12411241

1242-
# 5 minutes timeout for this test
1243-
# @pytest.mark.skipif(
1244-
# use_mock_proxy(),
1245-
# reason="Mock proxy doesn't support sending notifications to new connections.",
1246-
# )
1247-
1248-
12491242
def generate_params(
12501243
fault_injector_client: FaultInjectorClient,
12511244
effect_names: list[SlotMigrateEffects],
1245+
skip_combinations: list[tuple[SlotMigrateEffects, str]] = [],
12521246
):
12531247
# params should produce list of tuples: (effect_name, trigger_name, bdb_config, bdb_name)
12541248
params = []
@@ -1261,6 +1255,8 @@ def generate_params(
12611255

12621256
for trigger_info in triggers_data["triggers"]:
12631257
trigger = trigger_info["name"]
1258+
if (effect_name, trigger) in skip_combinations:
1259+
continue
12641260
if trigger == "maintenance_mode":
12651261
continue
12661262
trigger_requirements = trigger_info["requirements"]
@@ -1332,11 +1328,23 @@ def setup_env(
13321328
self._bdb_name = db_config["name"]
13331329
socket_timeout = DEFAULT_OSS_API_CLIENT_SOCKET_TIMEOUT
13341330

1331+
auth_ssl_client_certs_config_info = db_config.get(
1332+
"authentication_ssl_client_certs", None
1333+
)
1334+
1335+
auth_ssl_client_certs = (
1336+
True
1337+
if auth_ssl_client_certs_config_info
1338+
and auth_ssl_client_certs_config_info[0]["client_cert"] is not None
1339+
else False
1340+
)
1341+
13351342
cluster_client_maint_notifications = get_cluster_client_maint_notifications(
13361343
endpoints_config=cluster_endpoint_config,
13371344
disable_retries=True,
13381345
socket_timeout=socket_timeout,
13391346
enable_maintenance_notifications=True,
1347+
auth_ssl_client_certs=auth_ssl_client_certs,
13401348
)
13411349
return cluster_client_maint_notifications, cluster_endpoint_config
13421350

@@ -1741,9 +1749,12 @@ def test_notification_handling_with_node_remove(
17411749
SlotMigrateEffects.REMOVE,
17421750
SlotMigrateEffects.ADD,
17431751
],
1752+
skip_combinations=[
1753+
(SlotMigrateEffects.SLOT_SHUFFLE, "failover"),
1754+
], # maintenance ends too fast for the test to be reliable
17441755
),
17451756
)
1746-
def test_new_connections_receive_last_notification_with_migrating(
1757+
def test_new_connections_receive_last_smigrating_smigrated_notification(
17471758
self,
17481759
fault_injector_client_oss_api: FaultInjectorClient,
17491760
effect_name: SlotMigrateEffects,

0 commit comments

Comments
 (0)