diff --git a/redis/cluster.py b/redis/cluster.py index b6104be0dc..f7ac34ab3a 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1560,7 +1560,7 @@ def _execute_command(self, target_node, *args, **kwargs): self._record_command_metric( command_name=command, duration_seconds=time.monotonic() - start_time, - connection=connection, + connection=e.connection, error=e, ) raise @@ -1576,7 +1576,7 @@ def _execute_command(self, target_node, *args, **kwargs): self._record_command_metric( command_name=command, duration_seconds=time.monotonic() - start_time, - connection=connection, + connection=e.connection, error=e, ) raise @@ -1615,11 +1615,10 @@ def _execute_command(self, target_node, *args, **kwargs): # DON'T set redis_connection = None - keep the pool for reuse self.nodes_manager.initialize() - e.connection = connection self._record_command_metric( command_name=command, duration_seconds=time.monotonic() - start_time, - connection=connection, + connection=e.connection, error=e, ) raise e @@ -1723,17 +1722,19 @@ def _execute_command(self, target_node, *args, **kwargs): self._record_command_metric( command_name=command, duration_seconds=time.monotonic() - start_time, - connection=connection, + connection=e.connection, error=e, ) raise except ResponseError as e: # this is used to report the metrics based on host and port info - e.connection = connection + # ResponseError typically happens after get_connection() succeeds, + # so connection should be available + e.connection = connection if connection else target_node self._record_command_metric( command_name=command, duration_seconds=time.monotonic() - start_time, - connection=connection, + connection=e.connection, error=e, ) raise @@ -1748,7 +1749,7 @@ def _execute_command(self, target_node, *args, **kwargs): self._record_command_metric( command_name=command, duration_seconds=time.monotonic() - start_time, - connection=connection, + connection=e.connection, error=e, ) raise e @@ -1780,12 +1781,16 @@ def _record_command_metric( """ Records operation duration metric directly. """ + host = connection.host if connection else "unknown" + port = connection.port if connection else 0 + db = str(connection.db) if connection and hasattr(connection, "db") else "0" + record_operation_duration( command_name=command_name, duration_seconds=duration_seconds, - server_address=connection.host, - server_port=connection.port, - db_namespace=str(connection.db), + server_address=host, + server_port=port, + db_namespace=db, error=error, ) diff --git a/tests/test_asyncio/test_observability/test_cluster_metrics_error_handling.py b/tests/test_asyncio/test_observability/test_cluster_metrics_error_handling.py new file mode 100644 index 0000000000..381dfc1c21 --- /dev/null +++ b/tests/test_asyncio/test_observability/test_cluster_metrics_error_handling.py @@ -0,0 +1,416 @@ +""" +Unit tests for async cluster metrics recording during error handling. + +These tests verify that the async cluster error handling correctly records metrics +when exceptions occur, even when using ClusterNode objects instead of Connection objects. +""" + +import pytest +from unittest.mock import MagicMock, patch, AsyncMock +from redis.asyncio.cluster import RedisCluster, ClusterNode +from redis.exceptions import ( + AuthenticationError, + ConnectionError as RedisConnectionError, + TimeoutError as RedisTimeoutError, + ClusterDownError, + SlotNotCoveredError, + MaxConnectionsError, + ResponseError, +) + + +@pytest.mark.asyncio +class TestAsyncClusterMetricsRecordingDuringErrorHandling: + """ + Tests for async cluster metrics recording during error handling. + + These tests verify that when exceptions occur during command execution, + metrics are recorded correctly using either the Connection object (when + available) or the ClusterNode (as fallback when connection is None). + """ + + async def test_authentication_error_uses_target_node_for_metrics(self): + """ + Test that AuthenticationError uses target_node for metrics when connection is None. + + AuthenticationError typically occurs during connection establishment. + Since the error is raised before connection is established, we use + target_node for metrics. + """ + target_node = ClusterNode(host="127.0.0.1", port=7000, server_type="primary") + + with patch.object(RedisCluster, "__init__", return_value=None): + cluster = RedisCluster.__new__(RedisCluster) + cluster.nodes_manager = MagicMock() + cluster._initialize = False + cluster.RedisClusterRequestTTL = 3 + cluster.retry = MagicMock() + cluster.retry.get_retries.return_value = 0 + cluster._parse_target_nodes = MagicMock(return_value=[target_node]) + cluster._policy_resolver = MagicMock() + cluster._policy_resolver.resolve = AsyncMock(return_value=None) + cluster.command_flags = {} + + with patch.object( + ClusterNode, + "execute_command", + new_callable=AsyncMock, + side_effect=AuthenticationError("Auth failed"), + ): + with patch( + "redis.asyncio.cluster.record_operation_duration", + new_callable=AsyncMock, + ) as mock_record: + with pytest.raises(AuthenticationError) as exc_info: + await cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + assert "Auth failed" in str(exc_info.value) + + mock_record.assert_called_once() + call_kwargs = mock_record.call_args.kwargs + assert call_kwargs["command_name"] == "GET" + assert call_kwargs["server_address"] == "127.0.0.1" + assert call_kwargs["server_port"] == 7000 + assert call_kwargs["db_namespace"] == "0" + assert isinstance(call_kwargs["error"], AuthenticationError) + + async def test_connection_error_uses_target_node_when_no_connection(self): + """ + Test that ConnectionError records metrics with target_node. + + This validates the async implementation handles the case where connection + fails and metrics are recorded using ClusterNode data. + """ + target_node = ClusterNode(host="10.0.0.50", port=7001, server_type="primary") + + with patch.object(RedisCluster, "__init__", return_value=None): + cluster = RedisCluster.__new__(RedisCluster) + cluster.nodes_manager = MagicMock() + cluster.nodes_manager.move_node_to_end_of_cached_nodes = MagicMock() + cluster._initialize = False + cluster.RedisClusterRequestTTL = 3 + cluster.retry = MagicMock() + cluster.retry.get_retries.return_value = 0 + cluster._parse_target_nodes = MagicMock(return_value=[target_node]) + cluster._policy_resolver = MagicMock() + cluster._policy_resolver.resolve = AsyncMock(return_value=None) + cluster.command_flags = {} + + with patch.object( + ClusterNode, + "execute_command", + new_callable=AsyncMock, + side_effect=RedisConnectionError("Connection refused"), + ): + with patch.object( + ClusterNode, "update_active_connections_for_reconnect" + ): + with patch.object( + ClusterNode, + "disconnect_free_connections", + new_callable=AsyncMock, + ): + with patch( + "redis.asyncio.cluster.record_operation_duration", + new_callable=AsyncMock, + ) as mock_record: + with pytest.raises(RedisConnectionError) as exc_info: + await cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + assert "Connection refused" in str(exc_info.value) + + mock_record.assert_called_once() + call_kwargs = mock_record.call_args.kwargs + assert call_kwargs["command_name"] == "GET" + assert call_kwargs["server_address"] == "10.0.0.50" + assert call_kwargs["server_port"] == 7001 + assert call_kwargs["db_namespace"] == "0" + assert isinstance( + call_kwargs["error"], RedisConnectionError + ) + + async def test_response_error_uses_target_node(self): + """ + Test that ResponseError uses target_node for metrics. + + When a command succeeds in reaching the server but gets an error response, + we use the target_node for metrics since async cluster doesn't have a + persistent connection object in the same way sync does. + """ + target_node = ClusterNode(host="172.16.0.10", port=6380, server_type="primary") + + with patch.object(RedisCluster, "__init__", return_value=None): + cluster = RedisCluster.__new__(RedisCluster) + cluster.nodes_manager = MagicMock() + cluster._initialize = False + cluster.RedisClusterRequestTTL = 3 + cluster.retry = MagicMock() + cluster.retry.get_retries.return_value = 0 + cluster._parse_target_nodes = MagicMock(return_value=[target_node]) + cluster._policy_resolver = MagicMock() + cluster._policy_resolver.resolve = AsyncMock(return_value=None) + cluster.command_flags = {} + + with patch.object( + ClusterNode, + "execute_command", + new_callable=AsyncMock, + side_effect=ResponseError("WRONGTYPE Operation against a key"), + ): + with patch( + "redis.asyncio.cluster.record_operation_duration", + new_callable=AsyncMock, + ) as mock_record: + with pytest.raises(ResponseError) as exc_info: + await cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + assert "WRONGTYPE" in str(exc_info.value) + + mock_record.assert_called_once() + call_kwargs = mock_record.call_args.kwargs + assert call_kwargs["command_name"] == "GET" + assert call_kwargs["server_address"] == "172.16.0.10" + assert call_kwargs["server_port"] == 6380 + assert call_kwargs["db_namespace"] == "0" + assert isinstance(call_kwargs["error"], ResponseError) + + async def test_max_connections_error_records_metrics_with_cluster_node(self): + """ + Test that MaxConnectionsError records metrics using ClusterNode info. + + When MaxConnectionsError occurs, connection is None because we couldn't + get a connection from the pool. Metrics should be recorded using the + ClusterNode's host/port. + """ + target_node = ClusterNode( + host="192.168.1.100", port=7005, server_type="primary" + ) + + with patch.object(RedisCluster, "__init__", return_value=None): + cluster = RedisCluster.__new__(RedisCluster) + cluster.nodes_manager = MagicMock() + cluster._initialize = False + cluster.RedisClusterRequestTTL = 3 + cluster.retry = MagicMock() + cluster.retry.get_retries.return_value = 0 + cluster._parse_target_nodes = MagicMock(return_value=[target_node]) + cluster._policy_resolver = MagicMock() + cluster._policy_resolver.resolve = AsyncMock(return_value=None) + cluster.command_flags = {} + + with patch.object( + ClusterNode, + "execute_command", + new_callable=AsyncMock, + side_effect=MaxConnectionsError("Pool exhausted"), + ): + with patch( + "redis.asyncio.cluster.record_operation_duration", + new_callable=AsyncMock, + ) as mock_record: + with pytest.raises(MaxConnectionsError): + await cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + mock_record.assert_called_once() + call_kwargs = mock_record.call_args.kwargs + assert call_kwargs["server_address"] == "192.168.1.100" + assert call_kwargs["server_port"] == 7005 + assert call_kwargs["db_namespace"] == "0" + assert isinstance(call_kwargs["error"], MaxConnectionsError) + + async def test_timeout_error_uses_target_node_for_metrics(self): + """ + Test that TimeoutError records metrics with target_node data. + """ + target_node = ClusterNode(host="10.0.0.100", port=7003, server_type="primary") + + with patch.object(RedisCluster, "__init__", return_value=None): + cluster = RedisCluster.__new__(RedisCluster) + cluster.nodes_manager = MagicMock() + cluster.nodes_manager.move_node_to_end_of_cached_nodes = MagicMock() + cluster._initialize = False + cluster.RedisClusterRequestTTL = 3 + cluster.retry = MagicMock() + cluster.retry.get_retries.return_value = 0 + cluster._parse_target_nodes = MagicMock(return_value=[target_node]) + cluster._policy_resolver = MagicMock() + cluster._policy_resolver.resolve = AsyncMock(return_value=None) + cluster.command_flags = {} + + with patch.object( + ClusterNode, + "execute_command", + new_callable=AsyncMock, + side_effect=RedisTimeoutError("Timeout connecting"), + ): + with patch.object( + ClusterNode, "update_active_connections_for_reconnect" + ): + with patch.object( + ClusterNode, + "disconnect_free_connections", + new_callable=AsyncMock, + ): + with patch( + "redis.asyncio.cluster.record_operation_duration", + new_callable=AsyncMock, + ) as mock_record: + with pytest.raises(RedisTimeoutError) as exc_info: + await cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + assert "Timeout" in str(exc_info.value) + + mock_record.assert_called_once() + call_kwargs = mock_record.call_args.kwargs + assert call_kwargs["server_address"] == "10.0.0.100" + assert call_kwargs["server_port"] == 7003 + assert call_kwargs["db_namespace"] == "0" + assert isinstance(call_kwargs["error"], RedisTimeoutError) + + async def test_cluster_down_error_with_cluster_node_metrics(self): + """ + Test that ClusterDownError records metrics correctly with target_node data. + """ + target_node = ClusterNode(host="172.20.0.10", port=7006, server_type="primary") + + with patch.object(RedisCluster, "__init__", return_value=None): + cluster = RedisCluster.__new__(RedisCluster) + cluster.nodes_manager = MagicMock() + cluster._initialize = False + cluster.RedisClusterRequestTTL = 3 + cluster.aclose = AsyncMock() + cluster.retry = MagicMock() + cluster.retry.get_retries.return_value = 0 + cluster._parse_target_nodes = MagicMock(return_value=[target_node]) + cluster._policy_resolver = MagicMock() + cluster._policy_resolver.resolve = AsyncMock(return_value=None) + cluster.command_flags = {} + + with patch.object( + ClusterNode, + "execute_command", + new_callable=AsyncMock, + side_effect=ClusterDownError("CLUSTERDOWN"), + ): + with patch( + "redis.asyncio.cluster.record_operation_duration", + new_callable=AsyncMock, + ) as mock_record: + with patch("asyncio.sleep", new_callable=AsyncMock): + with pytest.raises(ClusterDownError): + await cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + mock_record.assert_called_once() + call_kwargs = mock_record.call_args.kwargs + assert call_kwargs["server_address"] == "172.20.0.10" + assert call_kwargs["server_port"] == 7006 + assert call_kwargs["db_namespace"] == "0" + assert isinstance(call_kwargs["error"], ClusterDownError) + + async def test_slot_not_covered_error_with_cluster_node_metrics(self): + """ + Test that SlotNotCoveredError records metrics correctly with target_node data. + """ + target_node = ClusterNode(host="172.20.0.20", port=7007, server_type="primary") + + with patch.object(RedisCluster, "__init__", return_value=None): + cluster = RedisCluster.__new__(RedisCluster) + cluster.nodes_manager = MagicMock() + cluster._initialize = False + cluster.RedisClusterRequestTTL = 3 + cluster.aclose = AsyncMock() + cluster.retry = MagicMock() + cluster.retry.get_retries.return_value = 0 + cluster._parse_target_nodes = MagicMock(return_value=[target_node]) + cluster._policy_resolver = MagicMock() + cluster._policy_resolver.resolve = AsyncMock(return_value=None) + cluster.command_flags = {} + + with patch.object( + ClusterNode, + "execute_command", + new_callable=AsyncMock, + side_effect=SlotNotCoveredError("Slot 1234 not covered"), + ): + with patch( + "redis.asyncio.cluster.record_operation_duration", + new_callable=AsyncMock, + ) as mock_record: + with patch("asyncio.sleep", new_callable=AsyncMock): + with pytest.raises(SlotNotCoveredError): + await cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + mock_record.assert_called_once() + call_kwargs = mock_record.call_args.kwargs + assert call_kwargs["server_address"] == "172.20.0.20" + assert call_kwargs["server_port"] == 7007 + assert call_kwargs["db_namespace"] == "0" + assert isinstance(call_kwargs["error"], SlotNotCoveredError) + + async def test_successful_command_records_metrics_with_connection_db(self): + """ + Test that successful command execution records metrics with proper db value. + + In async cluster, the execute_command is called on target_node directly, + so we use target_node's connection_kwargs for db lookup. + """ + from redis._parsers.commands import ResponsePolicy + + target_node = ClusterNode( + host="192.168.50.10", port=7008, server_type="primary", db=3 + ) + + with patch.object(RedisCluster, "__init__", return_value=None): + cluster = RedisCluster.__new__(RedisCluster) + cluster.nodes_manager = MagicMock() + cluster._initialize = False + cluster.RedisClusterRequestTTL = 3 + cluster.retry = MagicMock() + cluster.retry.get_retries.return_value = 0 + cluster._parse_target_nodes = MagicMock(return_value=[target_node]) + cluster._policy_resolver = MagicMock() + cluster._policy_resolver.resolve = AsyncMock(return_value=None) + cluster.command_flags = {} + cluster.result_callbacks = {} + cluster._policies_callback_mapping = { + ResponsePolicy.DEFAULT_KEYLESS: lambda x: x, + ResponsePolicy.DEFAULT_KEYED: lambda x: x, + } + + with patch.object( + ClusterNode, + "execute_command", + new_callable=AsyncMock, + return_value=b"value", + ): + with patch( + "redis.asyncio.cluster.record_operation_duration", + new_callable=AsyncMock, + ) as mock_record: + result = await cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + assert result == b"value" + + mock_record.assert_called_once() + call_kwargs = mock_record.call_args.kwargs + assert call_kwargs["command_name"] == "GET" + assert call_kwargs["server_address"] == "192.168.50.10" + assert call_kwargs["server_port"] == 7008 + assert call_kwargs["db_namespace"] == "3" + assert call_kwargs.get("error") is None diff --git a/tests/test_observability/test_cluster_metrics_error_handling.py b/tests/test_observability/test_cluster_metrics_error_handling.py index d8dec934f3..7d030395b9 100644 --- a/tests/test_observability/test_cluster_metrics_error_handling.py +++ b/tests/test_observability/test_cluster_metrics_error_handling.py @@ -14,41 +14,98 @@ MaxConnectionsError, ConnectionError as RedisConnectionError, ResponseError, + TimeoutError as RedisTimeoutError, + ClusterDownError, + SlotNotCoveredError, ) @pytest.mark.onlycluster -class TestClusterErrorHandlingMetrics: - """Tests for cluster error handling with metrics.""" +class TestClusterMetricsRecordingDuringErrorHandling: + """ + Tests for cluster metrics recording during error handling. - def test_authentication_error_uses_connection_when_available(self): + These tests verify that when exceptions occur during command execution, + metrics are recorded correctly using either the Connection object (when + available) or the ClusterNode (as fallback when connection is None). + """ + + def test_authentication_error_uses_target_node_for_metrics(self): + """ + Test that AuthenticationError uses target_node for metrics when connection is None. + + AuthenticationError typically occurs during get_connection() when the connection + is being established and authenticated. Since the error is raised before + get_connection() returns, the connection variable is still None, so we + fall back to target_node for metrics. + """ + target_node = ClusterNode(host="127.0.0.1", port=7000, server_type="primary") + + with patch("redis.cluster.NodesManager") as MockNodesManager: + mock_nodes_manager = MagicMock() + mock_nodes_manager.initialize.return_value = None + mock_nodes_manager.default_node = target_node + MockNodesManager.return_value = mock_nodes_manager + + with patch("redis.cluster.CommandsParser"): + cluster = RedisCluster(host="127.0.0.1", port=7000) + + mock_redis_conn = MagicMock() + + with patch.object( + cluster, "get_redis_connection", return_value=mock_redis_conn + ): + with patch( + "redis.cluster.get_connection", + side_effect=AuthenticationError("Auth failed"), + ): + with patch( + "redis.cluster.record_operation_duration" + ) as mock_record: + with pytest.raises(AuthenticationError) as exc_info: + cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + assert hasattr(exc_info.value, "connection") + assert exc_info.value.connection == target_node + + mock_record.assert_called_once() + call_kwargs = mock_record.call_args.kwargs + assert call_kwargs["command_name"] == "GET" + assert call_kwargs["server_address"] == "127.0.0.1" + assert call_kwargs["server_port"] == 7000 + assert call_kwargs["db_namespace"] == "0" + assert isinstance(call_kwargs["error"], AuthenticationError) + + def test_connection_error_uses_connection_when_available(self): """ - Test that AuthenticationError uses connection when available, otherwise target_node. + Test that ConnectionError uses actual connection if available. - This validates the error handling in cluster.py lines 1558-1564. - The code prefers the actual connection object when available. + When the error occurs AFTER get_connection() returns (e.g., during + parse_response), the connection object is available for metrics. """ - # Create a real ClusterNode target_node = ClusterNode(host="127.0.0.1", port=7000, server_type="primary") - # Create cluster with mocked NodesManager with patch("redis.cluster.NodesManager") as MockNodesManager: mock_nodes_manager = MagicMock() mock_nodes_manager.initialize.return_value = None mock_nodes_manager.default_node = target_node + mock_nodes_manager.move_node_to_end_of_cached_nodes = MagicMock() MockNodesManager.return_value = mock_nodes_manager with patch("redis.cluster.CommandsParser"): cluster = RedisCluster(host="127.0.0.1", port=7000) - # Mock get_redis_connection to return a redis connection mock_redis_conn = MagicMock() - # Make parse_response raise AuthenticationError (simulates auth failure) - mock_redis_conn.parse_response.side_effect = AuthenticationError( - "Auth failed" + mock_redis_conn.parse_response.side_effect = RedisConnectionError( + "Connection lost" ) mock_connection = MagicMock() + mock_connection.host = "192.168.1.100" + mock_connection.port = 6379 + mock_connection.db = 3 with patch.object( cluster, "get_redis_connection", return_value=mock_redis_conn @@ -56,70 +113,86 @@ def test_authentication_error_uses_connection_when_available(self): with patch( "redis.cluster.get_connection", return_value=mock_connection ): - # Execute command and expect AuthenticationError - with pytest.raises(AuthenticationError) as exc_info: - cluster._execute_command(target_node, "GET", "key") + with patch( + "redis.cluster.record_operation_duration" + ) as mock_record: + with pytest.raises(RedisConnectionError) as exc_info: + cluster.execute_command( + "GET", "key", target_nodes=target_node + ) - # Verify the library code set connection attribute to the connection - # (prefers connection over target_node when connection is available) - assert hasattr(exc_info.value, "connection") - assert exc_info.value.connection == mock_connection + assert hasattr(exc_info.value, "connection") + assert exc_info.value.connection == mock_connection + + mock_record.assert_called_once() + call_kwargs = mock_record.call_args.kwargs + assert call_kwargs["command_name"] == "GET" + assert call_kwargs["server_address"] == "192.168.1.100" + assert call_kwargs["server_port"] == 6379 + assert call_kwargs["db_namespace"] == "3" + assert isinstance( + call_kwargs["error"], RedisConnectionError + ) - def test_max_connections_error_uses_target_node_for_metrics(self): + def test_connection_error_uses_target_node_when_no_connection(self): """ - Test that MaxConnectionsError uses target_node for metrics when connection - pool is exhausted. + Test that ConnectionError uses target_node when connection is not available. - This validates the error handling in cluster.py lines 1565-1574. + When ConnectionError occurs DURING get_connection() (before it returns), + the connection variable is None. The code should fall back to using + target_node for metrics to provide valid host/port information. """ - # Create a real ClusterNode - target_node = ClusterNode(host="127.0.0.1", port=7000, server_type="primary") + target_node = ClusterNode(host="10.0.0.50", port=7001, server_type="primary") - # Create cluster with mocked NodesManager with patch("redis.cluster.NodesManager") as MockNodesManager: mock_nodes_manager = MagicMock() mock_nodes_manager.initialize.return_value = None mock_nodes_manager.default_node = target_node + mock_nodes_manager.move_node_to_end_of_cached_nodes = MagicMock() MockNodesManager.return_value = mock_nodes_manager with patch("redis.cluster.CommandsParser"): cluster = RedisCluster(host="127.0.0.1", port=7000) - # Mock get_redis_connection to return a redis connection mock_redis_conn = MagicMock() - # Make get_connection raise MaxConnectionsError (simulates pool exhaustion) + with patch.object( cluster, "get_redis_connection", return_value=mock_redis_conn ): with patch( "redis.cluster.get_connection", - side_effect=MaxConnectionsError("Pool exhausted"), + side_effect=RedisConnectionError("Cannot connect"), ): - # Mock _record_command_metric since connection is None - with patch.object( - cluster, "_record_command_metric" + with patch( + "redis.cluster.record_operation_duration" ) as mock_record: - # Execute command and expect MaxConnectionsError - with pytest.raises(MaxConnectionsError) as exc_info: - cluster._execute_command(target_node, "GET", "key") + with pytest.raises(RedisConnectionError) as exc_info: + cluster.execute_command( + "GET", "key", target_nodes=target_node + ) - # Verify the library code set connection attribute to target_node assert hasattr(exc_info.value, "connection") assert exc_info.value.connection == target_node - # Verify _record_command_metric was called - assert mock_record.called - def test_connection_error_uses_connection_if_available(self): + mock_record.assert_called_once() + call_kwargs = mock_record.call_args.kwargs + assert call_kwargs["command_name"] == "GET" + assert call_kwargs["server_address"] == "10.0.0.50" + assert call_kwargs["server_port"] == 7001 + assert call_kwargs["db_namespace"] == "0" + assert isinstance( + call_kwargs["error"], RedisConnectionError + ) + + def test_response_error_uses_connection(self): """ - Test that ConnectionError uses actual connection if available. + Test that ResponseError uses the actual connection for metrics. - This validates the error handling in cluster.py lines 1575-1605 where - ConnectionError is caught and e.connection is set to connection if available. + ResponseError typically occurs after get_connection() succeeds (during + parse_response), so we should have a valid connection for metrics. """ - # Create a real ClusterNode target_node = ClusterNode(host="127.0.0.1", port=7000, server_type="primary") - # Create cluster with mocked NodesManager with patch("redis.cluster.NodesManager") as MockNodesManager: mock_nodes_manager = MagicMock() mock_nodes_manager.initialize.return_value = None @@ -129,16 +202,13 @@ def test_connection_error_uses_connection_if_available(self): with patch("redis.cluster.CommandsParser"): cluster = RedisCluster(host="127.0.0.1", port=7000) - # Mock get_redis_connection to return a redis connection mock_redis_conn = MagicMock() - # Make parse_response raise ConnectionError after connection is obtained - mock_redis_conn.parse_response.side_effect = RedisConnectionError( - "Connection lost" - ) + mock_redis_conn.parse_response.side_effect = ResponseError("WRONGTYPE") mock_connection = MagicMock() - mock_connection.host = "127.0.0.1" - mock_connection.port = 7000 + mock_connection.host = "172.16.0.10" + mock_connection.port = 6380 + mock_connection.db = 2 with patch.object( cluster, "get_redis_connection", return_value=mock_redis_conn @@ -146,27 +216,39 @@ def test_connection_error_uses_connection_if_available(self): with patch( "redis.cluster.get_connection", return_value=mock_connection ): - # Execute command and expect ConnectionError - with pytest.raises(RedisConnectionError) as exc_info: - cluster._execute_command(target_node, "GET", "key") + with patch( + "redis.cluster.record_operation_duration" + ) as mock_record: + with pytest.raises(ResponseError) as exc_info: + cluster.execute_command( + "GET", "key", target_nodes=target_node + ) - # Verify the library code set connection attribute to the actual connection - assert hasattr(exc_info.value, "connection") - assert exc_info.value.connection == mock_connection + assert hasattr(exc_info.value, "connection") + assert exc_info.value.connection == mock_connection - def test_connection_error_uses_target_node_when_no_connection(self): + mock_record.assert_called_once() + call_kwargs = mock_record.call_args.kwargs + assert call_kwargs["command_name"] == "GET" + assert call_kwargs["server_address"] == "172.16.0.10" + assert call_kwargs["server_port"] == 6380 + assert call_kwargs["db_namespace"] == "2" + assert isinstance(call_kwargs["error"], ResponseError) + + def test_max_connections_error_records_metrics_with_cluster_node(self): """ - Test that ConnectionError uses target_node when connection is not available. + Test that MaxConnectionsError records metrics using ClusterNode info. - This validates the error handling in cluster.py lines 1575-1605 where - ConnectionError is caught and e.connection is set. - Note: The current implementation sets e.connection = connection (which may be None) - after the initial assignment of target_node. + When MaxConnectionsError occurs, connection is None because we couldn't + get a connection from the pool. The code sets e.connection = target_node + and metrics should be recorded using the ClusterNode's host/port. """ - # Create a real ClusterNode target_node = ClusterNode(host="127.0.0.1", port=7000, server_type="primary") - # Create cluster with mocked NodesManager + assert not hasattr(target_node, "db") + assert hasattr(target_node, "host") + assert hasattr(target_node, "port") + with patch("redis.cluster.NodesManager") as MockNodesManager: mock_nodes_manager = MagicMock() mock_nodes_manager.initialize.return_value = None @@ -176,40 +258,37 @@ def test_connection_error_uses_target_node_when_no_connection(self): with patch("redis.cluster.CommandsParser"): cluster = RedisCluster(host="127.0.0.1", port=7000) - # Mock get_redis_connection to return a redis connection mock_redis_conn = MagicMock() - - # Make get_connection raise ConnectionError before connection is obtained with patch.object( cluster, "get_redis_connection", return_value=mock_redis_conn ): with patch( "redis.cluster.get_connection", - side_effect=RedisConnectionError("Cannot connect"), + side_effect=MaxConnectionsError("Pool exhausted"), ): - # Mock _record_command_metric since connection is None - with patch.object( - cluster, "_record_command_metric" - ) as mock_record: - # Execute command and expect ConnectionError - with pytest.raises(RedisConnectionError) as exc_info: - cluster._execute_command(target_node, "GET", "key") - - # Verify the library code set connection attribute - assert hasattr(exc_info.value, "connection") - # Verify _record_command_metric was called - assert mock_record.called - - def test_response_error_uses_connection(self): + with patch( + "redis.cluster.record_operation_duration" + ) as mock_record_duration: + with pytest.raises(MaxConnectionsError): + cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + mock_record_duration.assert_called() + call_kwargs = mock_record_duration.call_args[1] + assert call_kwargs["server_address"] == "127.0.0.1" + assert call_kwargs["server_port"] == 7000 + assert call_kwargs["db_namespace"] == "0" + + def test_successful_command_records_metrics_with_connection_db(self): """ - Test that ResponseError uses the actual connection for metrics. + Test that successful command execution records metrics with Connection's db. - This validates the error handling in cluster.py lines 1704-1713. + When a command succeeds, we have an actual Connection object which has + a db attribute. Verify the metrics use the actual db value. """ - # Create a real ClusterNode target_node = ClusterNode(host="127.0.0.1", port=7000, server_type="primary") - # Create cluster with mocked NodesManager with patch("redis.cluster.NodesManager") as MockNodesManager: mock_nodes_manager = MagicMock() mock_nodes_manager.initialize.return_value = None @@ -219,14 +298,13 @@ def test_response_error_uses_connection(self): with patch("redis.cluster.CommandsParser"): cluster = RedisCluster(host="127.0.0.1", port=7000) - # Mock get_redis_connection to return a redis connection mock_redis_conn = MagicMock() - # Make parse_response raise ResponseError - mock_redis_conn.parse_response.side_effect = ResponseError("WRONGTYPE") + mock_redis_conn.parse_response.return_value = b"value" mock_connection = MagicMock() mock_connection.host = "127.0.0.1" mock_connection.port = 7000 + mock_connection.db = 5 with patch.object( cluster, "get_redis_connection", return_value=mock_redis_conn @@ -234,10 +312,139 @@ def test_response_error_uses_connection(self): with patch( "redis.cluster.get_connection", return_value=mock_connection ): - # Execute command and expect ResponseError - with pytest.raises(ResponseError) as exc_info: - cluster._execute_command(target_node, "GET", "key") + with patch( + "redis.cluster.record_operation_duration" + ) as mock_record_duration: + cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + call_kwargs = mock_record_duration.call_args[1] + assert call_kwargs["db_namespace"] == "5" + + def test_timeout_error_uses_target_node_for_metrics(self): + """ + Test that TimeoutError uses target_node for metrics when connection is None. + + When TimeoutError occurs during get_connection(), connection is None. + The code uses target_node for metrics to provide valid host/port info. + """ + target_node = ClusterNode(host="10.0.0.100", port=7003, server_type="primary") + + with patch("redis.cluster.NodesManager") as MockNodesManager: + mock_nodes_manager = MagicMock() + mock_nodes_manager.initialize.return_value = None + mock_nodes_manager.default_node = target_node + mock_nodes_manager.move_node_to_end_of_cached_nodes = MagicMock() + MockNodesManager.return_value = mock_nodes_manager + + with patch("redis.cluster.CommandsParser"): + cluster = RedisCluster(host="127.0.0.1", port=7000) + + mock_redis_conn = MagicMock() + with patch.object( + cluster, "get_redis_connection", return_value=mock_redis_conn + ): + with patch( + "redis.cluster.get_connection", + side_effect=RedisTimeoutError("Timeout connecting to server"), + ): + with patch( + "redis.cluster.record_operation_duration" + ) as mock_record: + with pytest.raises(RedisTimeoutError) as exc_info: + cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + assert "Timeout" in str(exc_info.value) + + mock_record.assert_called_once() + call_kwargs = mock_record.call_args[1] + assert call_kwargs["server_address"] == "10.0.0.100" + assert call_kwargs["server_port"] == 7003 + assert call_kwargs["db_namespace"] == "0" + + def test_cluster_down_error_with_cluster_node_metrics(self): + """ + Test that ClusterDownError records metrics correctly when connection is None. + + When ClusterDownError occurs before connection is established, + e.connection is set to target_node (ClusterNode), and metrics should + be recorded with valid host/port from target_node. + """ + target_node = ClusterNode(host="172.20.0.10", port=7004, server_type="primary") + + with patch("redis.cluster.NodesManager") as MockNodesManager: + mock_nodes_manager = MagicMock() + mock_nodes_manager.initialize.return_value = None + mock_nodes_manager.default_node = target_node + MockNodesManager.return_value = mock_nodes_manager + + with patch("redis.cluster.CommandsParser"): + cluster = RedisCluster(host="127.0.0.1", port=7000) + + mock_redis_conn = MagicMock() + with patch.object( + cluster, "get_redis_connection", return_value=mock_redis_conn + ): + with patch( + "redis.cluster.get_connection", + side_effect=ClusterDownError("CLUSTERDOWN"), + ): + with patch( + "redis.cluster.record_operation_duration" + ) as mock_record: + with pytest.raises(ClusterDownError): + cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + mock_record.assert_called_once() + call_kwargs = mock_record.call_args[1] + assert call_kwargs["server_address"] == "172.20.0.10" + assert call_kwargs["server_port"] == 7004 + assert call_kwargs["db_namespace"] == "0" + assert isinstance(call_kwargs["error"], ClusterDownError) + + def test_slot_not_covered_error_with_cluster_node_metrics(self): + """ + Test that SlotNotCoveredError records metrics correctly when connection is None. + + When SlotNotCoveredError occurs before connection is established, + e.connection is set to target_node, and metrics should be recorded + with valid host/port from target_node. + """ + target_node = ClusterNode(host="172.20.0.20", port=7005, server_type="primary") - # Verify the library code set connection attribute to the actual connection - assert hasattr(exc_info.value, "connection") - assert exc_info.value.connection == mock_connection + with patch("redis.cluster.NodesManager") as MockNodesManager: + mock_nodes_manager = MagicMock() + mock_nodes_manager.initialize.return_value = None + mock_nodes_manager.default_node = target_node + MockNodesManager.return_value = mock_nodes_manager + + with patch("redis.cluster.CommandsParser"): + cluster = RedisCluster(host="127.0.0.1", port=7000) + + mock_redis_conn = MagicMock() + with patch.object( + cluster, "get_redis_connection", return_value=mock_redis_conn + ): + with patch( + "redis.cluster.get_connection", + side_effect=SlotNotCoveredError("Slot 1234 not covered"), + ): + with patch( + "redis.cluster.record_operation_duration" + ) as mock_record: + with pytest.raises(SlotNotCoveredError): + cluster.execute_command( + "GET", "key", target_nodes=target_node + ) + + mock_record.assert_called_once() + call_kwargs = mock_record.call_args[1] + assert call_kwargs["server_address"] == "172.20.0.20" + assert call_kwargs["server_port"] == 7005 + assert call_kwargs["db_namespace"] == "0" + assert isinstance(call_kwargs["error"], SlotNotCoveredError)