Skip to content

Commit e689342

Browse files
committed
[async] Adding access to cluster client's nodes_manager and set_response_callback in ClusterPipeline objects
1 parent abf5bcb commit e689342

3 files changed

Lines changed: 67 additions & 0 deletions

File tree

redis/asyncio/cluster.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1989,6 +1989,15 @@ def __init__(
19891989
else TransactionStrategy(self)
19901990
)
19911991

1992+
@property
1993+
def nodes_manager(self) -> "NodesManager":
1994+
"""Get the nodes manager from the cluster client."""
1995+
return self.cluster_client.nodes_manager
1996+
1997+
def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
1998+
"""Set a custom response callback on the cluster client."""
1999+
self.cluster_client.set_response_callback(command, callback)
2000+
19922001
async def initialize(self) -> "ClusterPipeline":
19932002
await self._execution_strategy.initialize()
19942003
return self

tests/test_asyncio/test_cluster.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3162,6 +3162,44 @@ def cmd_init_mock(
31623162
class TestClusterPipeline:
31633163
"""Tests for the ClusterPipeline class."""
31643164

3165+
async def test_pipeline_nodes_manager_property(self) -> None:
3166+
"""
3167+
Test that ClusterPipeline exposes nodes_manager property
3168+
that delegates to the cluster client's nodes_manager.
3169+
"""
3170+
r = await get_mocked_redis_client(host=default_host, port=default_port)
3171+
try:
3172+
pipeline = r.pipeline()
3173+
# Verify that nodes_manager property exists and returns the same object
3174+
# as the cluster client's nodes_manager
3175+
assert pipeline.nodes_manager is r.nodes_manager
3176+
# Verify that we can access nodes_manager attributes
3177+
assert pipeline.nodes_manager.default_node is not None
3178+
finally:
3179+
await r.aclose()
3180+
3181+
async def test_pipeline_set_response_callback(self) -> None:
3182+
"""
3183+
Test that ClusterPipeline exposes set_response_callback method
3184+
that delegates to the cluster client's set_response_callback.
3185+
"""
3186+
r = await get_mocked_redis_client(host=default_host, port=default_port)
3187+
try:
3188+
pipeline = r.pipeline()
3189+
3190+
# Define a custom callback
3191+
def custom_callback(response):
3192+
return f"custom_{response}"
3193+
3194+
# Set the callback via the pipeline
3195+
pipeline.set_response_callback("CUSTOM_CMD", custom_callback)
3196+
3197+
# Verify that the callback was set on the cluster client
3198+
assert "CUSTOM_CMD" in r.response_callbacks
3199+
assert r.response_callbacks["CUSTOM_CMD"] is custom_callback
3200+
finally:
3201+
await r.aclose()
3202+
31653203
async def test_blocked_arguments(self, r: RedisCluster) -> None:
31663204
"""Test handling for blocked pipeline arguments."""
31673205

tests/test_asyncio/test_pipeline.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,26 @@ async def test_pipeline_with_msetex(self, r):
466466
{"key1_transaction": "value1", "key2_transaction": "value2"}, ex=10
467467
)
468468

469+
async def test_pipeline_json_module_access(self, r):
470+
"""
471+
Test that pipeline can access the json() module.
472+
The JSON module requires nodes_manager (for cluster) and set_response_callback
473+
on the client during initialization.
474+
475+
"""
476+
pipeline = r.pipeline()
477+
478+
# This should not raise an AttributeError
479+
json_pipeline = pipeline.json()
480+
481+
# Verify the JSON module was created successfully
482+
assert json_pipeline is not None
483+
assert json_pipeline.client is pipeline
484+
485+
# Verify that JSON callbacks were registered
486+
assert "JSON.SET" in r.response_callbacks
487+
assert "JSON.GET" in r.response_callbacks
488+
469489

470490
@pytest.mark.asyncio
471491
class TestAsyncPipelineOperationDurationMetricsRecording:

0 commit comments

Comments
 (0)