Skip to content

Commit 32c19a0

Browse files
authored
Merge branch 'master' into ps_fix_read_response_timeout_for_pubsub
2 parents a5846a6 + e045654 commit 32c19a0

10 files changed

Lines changed: 196 additions & 20 deletions

File tree

redis/asyncio/cluster.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1989,6 +1989,15 @@ def __init__(
19891989
else TransactionStrategy(self)
19901990
)
19911991

1992+
@property
1993+
def nodes_manager(self) -> "NodesManager":
1994+
"""Get the nodes manager from the cluster client."""
1995+
return self.cluster_client.nodes_manager
1996+
1997+
def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
1998+
"""Set a custom response callback on the cluster client."""
1999+
self.cluster_client.set_response_callback(command, callback)
2000+
19922001
async def initialize(self) -> "ClusterPipeline":
19932002
await self._execution_strategy.initialize()
19942003
return self

redis/client.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import copy
2+
import logging
23
import re
34
import threading
45
import time
@@ -90,6 +91,20 @@
9091
NEVER_DECODE = "NEVER_DECODE"
9192

9293

94+
logger = logging.getLogger(__name__)
95+
96+
97+
def is_debug_log_enabled():
98+
return logger.isEnabledFor(logging.DEBUG)
99+
100+
101+
def add_debug_log_for_operation_failure(connection: "AbstractConnection"):
102+
logger.debug(
103+
f"Operation failed, "
104+
f"with connection: {connection}, details: {connection.extract_connection_details() if connection else 'no connection'}",
105+
)
106+
107+
93108
class CaseInsensitiveDict(dict):
94109
"Case insensitive dict implementation. Assumes string keys only."
95110

@@ -728,6 +743,8 @@ def _execute_command(self, *args, **options):
728743
actual_retry_attempts = [0]
729744

730745
def failure_callback(error, failure_count):
746+
if is_debug_log_enabled():
747+
add_debug_log_for_operation_failure(conn)
731748
actual_retry_attempts[0] = failure_count
732749
self._close_connection(conn, error, failure_count, start_time, command_name)
733750

@@ -1751,6 +1768,8 @@ def immediate_execute_command(self, *args, **options):
17511768
actual_retry_attempts = [0]
17521769

17531770
def failure_callback(error, failure_count):
1771+
if is_debug_log_enabled():
1772+
add_debug_log_for_operation_failure(conn)
17541773
actual_retry_attempts[0] = failure_count
17551774
self._disconnect_reset_raise_on_watching(
17561775
conn, error, failure_count, start_time, command_name
@@ -1988,6 +2007,8 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
19882007
actual_retry_attempts = [0]
19892008

19902009
def failure_callback(error, failure_count):
2010+
if is_debug_log_enabled():
2011+
add_debug_log_for_operation_failure(conn)
19912012
actual_retry_attempts[0] = failure_count
19922013
self._disconnect_raise_on_watching(
19932014
conn, error, failure_count, start_time, operation_name

redis/cluster.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3955,6 +3955,12 @@ def _reinitialize_on_error(self, error, failure_count):
39553955
or type(error) in self.CONNECTION_ERRORS
39563956
):
39573957
if self._transaction_connection:
3958+
if is_debug_log_enabled():
3959+
logger.debug(
3960+
f"Operation failed, "
3961+
f"with connection: {self._transaction_connection}, "
3962+
f"details: {self._transaction_connection.extract_connection_details()}",
3963+
)
39583964
# Disconnect and release back to pool
39593965
self._transaction_connection.disconnect()
39603966
node = self._nodes_manager.find_connection_owner(

redis/commands/core.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2900,11 +2900,11 @@ class ListCommands(CommandsProtocol):
29002900
"""
29012901

29022902
def blpop(
2903-
self, keys: List, timeout: Optional[Number] = 0
2903+
self, keys: KeysT, timeout: Optional[Number] = 0
29042904
) -> Union[Awaitable[list], list]:
29052905
"""
29062906
LPOP a value off of the first non-empty list
2907-
named in the ``keys`` list.
2907+
named in ``keys``.
29082908
29092909
If none of the lists in ``keys`` has a value to LPOP, then block
29102910
for ``timeout`` seconds, or until a value gets pushed on to one
@@ -2921,11 +2921,11 @@ def blpop(
29212921
return self.execute_command("BLPOP", *keys)
29222922

29232923
def brpop(
2924-
self, keys: List, timeout: Optional[Number] = 0
2924+
self, keys: KeysT, timeout: Optional[Number] = 0
29252925
) -> Union[Awaitable[list], list]:
29262926
"""
29272927
RPOP a value off of the first non-empty list
2928-
named in the ``keys`` list.
2928+
named in ``keys``.
29292929
29302930
If none of the lists in ``keys`` has a value to RPOP, then block
29312931
for ``timeout`` seconds, or until a value gets pushed on to one

redis/connection.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,10 @@ def reset_should_reconnect(self):
287287
"""
288288
pass
289289

290+
@abstractmethod
291+
def extract_connection_details(self) -> str:
292+
pass
293+
290294

291295
class MaintNotificationsAbstractConnection:
292296
"""
@@ -1448,6 +1452,18 @@ def socket_connect_timeout(self) -> Optional[Union[float, int]]:
14481452
def socket_connect_timeout(self, value: Optional[Union[float, int]]):
14491453
self._socket_connect_timeout = value
14501454

1455+
def extract_connection_details(self) -> str:
1456+
socket_address = None
1457+
if self._sock is None:
1458+
return "not connected"
1459+
try:
1460+
socket_address = self._sock.getsockname() if self._sock else None
1461+
socket_address = socket_address[1] if socket_address else None
1462+
except (AttributeError, OSError):
1463+
pass
1464+
1465+
return f"connected to ip {self.get_resolved_ip()}, local socket port: {socket_address}"
1466+
14511467

14521468
class Connection(AbstractConnection):
14531469
"Manages TCP communication to and from a Redis server"
@@ -1911,6 +1927,9 @@ def _on_invalidation_callback(self, data: List[Union[str, Optional[List[bytes]]]
19111927
reason=CSCReason.INVALIDATION,
19121928
)
19131929

1930+
def extract_connection_details(self) -> str:
1931+
return self._conn.extract_connection_details()
1932+
19141933

19151934
class SSLConnection(Connection):
19161935
"""Manages SSL connections to and from the Redis server(s).

redis/maint_notifications.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,7 +1006,12 @@ def handle_maintenance_completed_notification(self, **kwargs):
10061006
or not self.config.is_relaxed_timeouts_enabled()
10071007
):
10081008
return
1009-
add_debug_log_for_notification(self.connection, "MAINTENANCE_COMPLETED")
1009+
notification = None
1010+
if kwargs.get("notification"):
1011+
notification = kwargs["notification"]
1012+
add_debug_log_for_notification(
1013+
self.connection, notification if notification else "MAINTENANCE_COMPLETED"
1014+
)
10101015
self.connection.reset_tmp_settings(reset_relaxed_timeout=True)
10111016
# Maintenance completed - reset the connection
10121017
# timeouts by providing -1 as the relaxed timeout
@@ -1016,8 +1021,7 @@ def handle_maintenance_completed_notification(self, **kwargs):
10161021
# notifications and skipped end maint notifications
10171022
self.connection.reset_received_notifications()
10181023

1019-
if kwargs.get("notification"):
1020-
notification = kwargs["notification"]
1024+
if notification:
10211025
record_connection_relaxed_timeout(
10221026
connection_name=repr(self.connection),
10231027
maint_notification=notification.__class__.__name__,
@@ -1076,7 +1080,8 @@ def handle_oss_maintenance_completed_notification(
10761080
# process the same notification twice
10771081
return
10781082

1079-
logger.debug(f"Handling SMIGRATED notification: {notification}")
1083+
if logger.isEnabledFor(logging.DEBUG):
1084+
logger.debug(f"Handling SMIGRATED notification: {notification}")
10801085
self._in_progress.add(notification)
10811086

10821087
# Extract the information about the src and destination nodes that are affected
@@ -1130,7 +1135,16 @@ def handle_oss_maintenance_completed_notification(
11301135
# Some of them might be used by sub sub and we don't know which ones - so we disconnect
11311136
# all in flight connections after they are done with current command execution
11321137
for conn in current_node.redis_connection.connection_pool._get_in_use_connections():
1138+
add_debug_log_for_notification(
1139+
conn, "SMIGRATED - mark for reconnect"
1140+
)
11331141
conn.mark_for_reconnect()
1142+
else:
1143+
if logger.isEnabledFor(logging.DEBUG):
1144+
logger.debug(
1145+
f"SMIGRATED: Node {current_node.name} not affected by maintenance, "
1146+
f"skipping mark for reconnect"
1147+
)
11341148

11351149
if (
11361150
current_node

tests/test_asyncio/test_cluster.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3162,6 +3162,44 @@ def cmd_init_mock(
31623162
class TestClusterPipeline:
31633163
"""Tests for the ClusterPipeline class."""
31643164

3165+
async def test_pipeline_nodes_manager_property(self) -> None:
3166+
"""
3167+
Test that ClusterPipeline exposes nodes_manager property
3168+
that delegates to the cluster client's nodes_manager.
3169+
"""
3170+
r = await get_mocked_redis_client(host=default_host, port=default_port)
3171+
try:
3172+
pipeline = r.pipeline()
3173+
# Verify that nodes_manager property exists and returns the same object
3174+
# as the cluster client's nodes_manager
3175+
assert pipeline.nodes_manager is r.nodes_manager
3176+
# Verify that we can access nodes_manager attributes
3177+
assert pipeline.nodes_manager.default_node is not None
3178+
finally:
3179+
await r.aclose()
3180+
3181+
async def test_pipeline_set_response_callback(self) -> None:
3182+
"""
3183+
Test that ClusterPipeline exposes set_response_callback method
3184+
that delegates to the cluster client's set_response_callback.
3185+
"""
3186+
r = await get_mocked_redis_client(host=default_host, port=default_port)
3187+
try:
3188+
pipeline = r.pipeline()
3189+
3190+
# Define a custom callback
3191+
def custom_callback(response):
3192+
return f"custom_{response}"
3193+
3194+
# Set the callback via the pipeline
3195+
pipeline.set_response_callback("CUSTOM_CMD", custom_callback)
3196+
3197+
# Verify that the callback was set on the cluster client
3198+
assert "CUSTOM_CMD" in r.response_callbacks
3199+
assert r.response_callbacks["CUSTOM_CMD"] is custom_callback
3200+
finally:
3201+
await r.aclose()
3202+
31653203
async def test_blocked_arguments(self, r: RedisCluster) -> None:
31663204
"""Test handling for blocked pipeline arguments."""
31673205

tests/test_asyncio/test_pipeline.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,26 @@ async def test_pipeline_with_msetex(self, r):
466466
{"key1_transaction": "value1", "key2_transaction": "value2"}, ex=10
467467
)
468468

469+
async def test_pipeline_json_module_access(self, r):
470+
"""
471+
Test that pipeline can access the json() module.
472+
The JSON module requires nodes_manager (for cluster) and set_response_callback
473+
on the client during initialization.
474+
475+
"""
476+
pipeline = r.pipeline()
477+
478+
# This should not raise an AttributeError
479+
json_pipeline = pipeline.json()
480+
481+
# Verify the JSON module was created successfully
482+
assert json_pipeline is not None
483+
assert json_pipeline.client is pipeline
484+
485+
# Verify that JSON callbacks were registered
486+
assert "JSON.SET" in r.response_callbacks
487+
assert "JSON.GET" in r.response_callbacks
488+
469489

470490
@pytest.mark.asyncio
471491
class TestAsyncPipelineOperationDurationMetricsRecording:

tests/test_scenario/conftest.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,39 @@ def extract_cluster_fqdn(url):
226226
return f"https://{cleaned_hostname}"
227227

228228

229+
def _prepare_ssl_certificates(cert_chain: bool) -> dict:
230+
"""
231+
Prepare SSL certificates for Redis cluster connection.
232+
233+
Args:
234+
cert_chain: PEM-encoded certificate chain containing client cert + intermediate + CA cert.
235+
This is the full certificate chain that will be used to validate the server.
236+
237+
Returns:
238+
dict: SSL configuration kwargs for RedisCluster
239+
"""
240+
certs_config_path = os.environ.get("MTLS_CONFIG_PATH", None)
241+
242+
if not cert_chain:
243+
return {
244+
"ssl_cert_reqs": "none",
245+
"ssl_check_hostname": False,
246+
}
247+
248+
if not certs_config_path:
249+
raise ValueError(
250+
"MTLS enabled test is triggered but MTLS_CONFIG_PATH environment variable not set"
251+
)
252+
253+
# The cert_chain contains the full chain (client cert + intermediate + root CA)
254+
# Use it as CA data for validating the server's certificate
255+
return {
256+
"ssl_cert_reqs": "none",
257+
"ssl_keyfile": os.path.join(certs_config_path, "client.key"),
258+
"ssl_certfile": os.path.join(certs_config_path, "client.crt"),
259+
}
260+
261+
229262
@pytest.fixture()
230263
def client_maint_notifications(endpoints_config):
231264
return _get_client_maint_notifications(endpoints_config)
@@ -307,8 +340,8 @@ def get_cluster_client_maint_notifications(
307340
enable_relaxed_timeout: bool = True,
308341
enable_proactive_reconnect: bool = True,
309342
disable_retries: bool = False,
343+
auth_ssl_client_certs: bool = False,
310344
socket_timeout: Optional[float] = None,
311-
host_config: Optional[str] = None,
312345
):
313346
"""Create Redis cluster client with maintenance notifications enabled."""
314347
# Get credentials from the configuration
@@ -337,6 +370,13 @@ def get_cluster_client_maint_notifications(
337370
tls_enabled = True if parsed.scheme == "rediss" else False
338371
logging.info(f"TLS enabled: {tls_enabled}")
339372

373+
tls_kwargs = {"ssl": tls_enabled}
374+
375+
if tls_enabled:
376+
# Prepare SSL certificate configuration
377+
ssl_config = _prepare_ssl_certificates(auth_ssl_client_certs)
378+
tls_kwargs.update(ssl_config)
379+
340380
# Configure maintenance notifications
341381
maintenance_config = MaintNotificationsConfig(
342382
enabled=enable_maintenance_notifications,
@@ -352,12 +392,10 @@ def get_cluster_client_maint_notifications(
352392
socket_timeout=CLIENT_TIMEOUT if socket_timeout is None else socket_timeout,
353393
username=username,
354394
password=password,
355-
ssl=tls_enabled,
356-
ssl_cert_reqs="none",
357-
ssl_check_hostname=False,
358395
protocol=protocol, # RESP3 required for push notifications
359396
maint_notifications_config=maintenance_config,
360397
retry=retry,
398+
**tls_kwargs,
361399
)
362400
logging.info("Redis cluster client created with maintenance notifications enabled")
363401
logging.info(

0 commit comments

Comments
 (0)