Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1890,7 +1890,6 @@ def _disconnect_raise_on_watching(
failure_count: Optional[int] = None,
start_time: Optional[float] = None,
command_name: Optional[str] = None,
batch_size: Optional[int] = None,
) -> None:
"""
Close the connection, raise an exception if we were watching.
Expand All @@ -1910,7 +1909,6 @@ def _disconnect_raise_on_watching(
db_namespace=str(conn.db),
error=error,
retry_attempts=failure_count,
batch_size=batch_size,
)
conn.disconnect()
# if we were watching a variable, the watch is no longer valid
Expand Down Expand Up @@ -1946,12 +1944,11 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
start_time = time.monotonic()
# Track actual retry attempts for error reporting
actual_retry_attempts = [0]
stack_len = len(stack)

def failure_callback(error, failure_count):
actual_retry_attempts[0] = failure_count
self._disconnect_raise_on_watching(
conn, error, failure_count, start_time, operation_name, stack_len
conn, error, failure_count, start_time, operation_name
)

try:
Expand All @@ -1967,7 +1964,6 @@ def failure_callback(error, failure_count):
server_address=getattr(conn, "host", None),
server_port=getattr(conn, "port", None),
db_namespace=str(conn.db),
batch_size=stack_len,
)
return response
except Exception as e:
Expand Down
3 changes: 0 additions & 3 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3349,7 +3349,6 @@ def _raise_first_error(self, stack, start_time):
record_operation_duration(
command_name="PIPELINE",
duration_seconds=time.monotonic() - start_time,
batch_size=len(stack),
error=r,
)

Expand Down Expand Up @@ -3554,7 +3553,6 @@ def _send_cluster_commands(
server_address=n.connection.host,
server_port=n.connection.port,
db_namespace=str(n.connection.db),
batch_size=len(n.commands),
)
nodes_read += 1
finally:
Expand Down Expand Up @@ -4042,7 +4040,6 @@ def _execute_transaction(
server_address=connection.host,
server_port=connection.port,
db_namespace=str(connection.db),
batch_size=len(self._command_queue),
)

# EXEC clears any watched keys
Expand Down
1 change: 0 additions & 1 deletion redis/commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4519,7 +4519,6 @@ def xreadgroup(
record_streaming_lag_from_response(
response=response,
consumer_group=groupname,
consumer_name=consumername,
)

return response
Expand Down
8 changes: 0 additions & 8 deletions redis/observability/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
DB_SYSTEM = "db.system"
DB_NAMESPACE = "db.namespace"
DB_OPERATION_NAME = "db.operation.name"
DB_OPERATION_BATCH_SIZE = "db.operation.batch.size"
DB_RESPONSE_STATUS_CODE = "db.response.status_code"
DB_STORED_PROCEDURE_NAME = "db.stored_procedure.name"

Expand Down Expand Up @@ -60,7 +59,6 @@
REDIS_CLIENT_ERROR_CATEGORY = "redis.client.errors.category"
REDIS_CLIENT_STREAM_NAME = "redis.client.stream.name"
REDIS_CLIENT_CONSUMER_GROUP = "redis.client.consumer_group"
REDIS_CLIENT_CONSUMER_NAME = "redis.client.consumer_name"
REDIS_CLIENT_CSC_RESULT = "redis.client.csc.result"
REDIS_CLIENT_CSC_REASON = "redis.client.csc.reason"

Expand Down Expand Up @@ -158,9 +156,6 @@ def build_operation_attributes(
if command_name is not None:
attrs[DB_OPERATION_NAME] = command_name.upper()

if batch_size is not None:
attrs[DB_OPERATION_BATCH_SIZE] = batch_size

if network_peer_address is not None:
attrs[NETWORK_PEER_ADDRESS] = network_peer_address

Expand Down Expand Up @@ -304,9 +299,6 @@ def build_streaming_attributes(
if consumer_group is not None:
attrs[REDIS_CLIENT_CONSUMER_GROUP] = consumer_group

if consumer_name is not None:
attrs[REDIS_CLIENT_CONSUMER_NAME] = consumer_name

return attrs

@staticmethod
Expand Down
13 changes: 11 additions & 2 deletions redis/observability/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
get_pool_name,
)
from redis.observability.config import MetricGroup, OTelConfig
from redis.utils import deprecated_args

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -426,6 +427,11 @@ def record_connection_wait_time(

# Command execution metric recording methods

@deprecated_args(
args_to_warn=["batch_size"],
reason="The batch_size argument is no longer used and will be removed in the next major version.",
version="7.2.1",
)
def record_operation_duration(
self,
command_name: str,
Expand Down Expand Up @@ -473,7 +479,6 @@ def record_operation_duration(
attrs.update(
self.attr_builder.build_operation_attributes(
command_name=command_name,
batch_size=batch_size,
network_peer_address=network_peer_address,
network_peer_port=network_peer_port,
retry_attempts=retry_attempts,
Expand Down Expand Up @@ -582,6 +587,11 @@ def record_pubsub_message(

# Streaming metric recording methods

@deprecated_args(
args_to_warn=["consumer_name"],
reason="The consumer_name argument is no longer used and will be removed in the next major version.",
version="7.2.1",
)
def record_streaming_lag(
self,
lag_seconds: float,
Expand All @@ -604,7 +614,6 @@ def record_streaming_lag(
attrs = self.attr_builder.build_streaming_attributes(
stream_name=stream_name,
consumer_group=consumer_group,
consumer_name=consumer_name,
)
self.stream_lag.record(lag_seconds, attributes=attrs)

Expand Down
21 changes: 16 additions & 5 deletions redis/observability/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from redis.observability.metrics import CloseReason, RedisMetricsCollector
from redis.observability.providers import get_observability_instance
from redis.observability.registry import get_observables_registry_instance
from redis.utils import str_if_bytes
from redis.utils import deprecated_args, str_if_bytes

if TYPE_CHECKING:
from redis.connection import ConnectionPoolInterface
Expand All @@ -46,6 +46,11 @@
CSC_ITEMS_REGISTRY_KEY = "csc_items"


@deprecated_args(
args_to_warn=["batch_size"],
reason="The batch_size argument is no longer used and will be removed in the next major version.",
version="7.2.1",
)
def record_operation_duration(
command_name: str,
duration_seconds: float,
Expand Down Expand Up @@ -100,7 +105,6 @@ def record_operation_duration(
network_peer_address=server_address,
network_peer_port=server_port,
is_blocking=is_blocking,
batch_size=batch_size,
retry_attempts=retry_attempts,
)
except Exception:
Expand Down Expand Up @@ -436,6 +440,11 @@ def record_pubsub_message(
pass


@deprecated_args(
args_to_warn=["consumer_name"],
reason="The consumer_name argument is no longer used and will be removed in the next major version.",
version="7.2.1",
)
def record_streaming_lag(
lag_seconds: float,
stream_name: Optional[str] = None,
Expand Down Expand Up @@ -470,12 +479,16 @@ def record_streaming_lag(
lag_seconds=lag_seconds,
stream_name=effective_stream_name,
consumer_group=consumer_group,
consumer_name=consumer_name,
)
except Exception:
pass


@deprecated_args(
args_to_warn=["consumer_name"],
reason="The consumer_name argument is no longer used and will be removed in the next major version.",
version="7.2.1",
)
def record_streaming_lag_from_response(
response,
consumer_group: Optional[str] = None,
Expand Down Expand Up @@ -527,7 +540,6 @@ def record_streaming_lag_from_response(
lag_seconds=lag_seconds,
stream_name=effective_stream_name,
consumer_group=consumer_group,
consumer_name=consumer_name,
)
else:
# RESP2 format: list
Expand All @@ -546,7 +558,6 @@ def record_streaming_lag_from_response(
lag_seconds=lag_seconds,
stream_name=effective_stream_name,
consumer_group=consumer_group,
consumer_name=consumer_name,
)
except Exception:
pass
Expand Down
61 changes: 0 additions & 61 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4420,32 +4420,6 @@ def test_pipeline_execute_records_metric(self, cluster_pipeline_with_otel):
attrs = call_args[1]["attributes"]
assert attrs["db.operation.name"] == "PIPELINE"

def test_pipeline_batch_size_recorded(self, cluster_pipeline_with_otel):
"""
Test that pipeline batch_size is correctly recorded.
"""
cluster, operation_duration_mock = cluster_pipeline_with_otel

# Execute a pipeline with 3 commands
pipe = cluster.pipeline()
pipe.set("batch_key", "value1")
pipe.get("batch_key")
pipe.delete("batch_key")
pipe.execute()

# Find the PIPELINE event call
pipeline_call = None
for call_obj in operation_duration_mock.record.call_args_list:
attrs = call_obj[1]["attributes"]
if attrs.get("db.operation.name") == "PIPELINE":
pipeline_call = call_obj
break

assert pipeline_call is not None
attrs = pipeline_call[1]["attributes"]
assert "db.operation.batch.size" in attrs
assert attrs["db.operation.batch.size"] == 3

def test_pipeline_server_attributes_recorded(self, cluster_pipeline_with_otel):
"""
Test that server address, port, and db namespace are recorded for pipeline.
Expand Down Expand Up @@ -4643,41 +4617,6 @@ def test_pipeline_multi_node_records_multiple_metrics(
assert "server.port" in attrs
assert "db.namespace" in attrs

def test_pipeline_metric_contains_batch_size_per_node(
self, cluster_pipeline_with_otel
):
"""
Test that pipeline metrics contain correct batch_size for each node.
"""
cluster, operation_duration_mock = cluster_pipeline_with_otel

# Execute pipeline with commands
pipe = cluster.pipeline()
pipe.set("batch_node_key1", "value1")
pipe.set("batch_node_key2", "value2")
pipe.set("batch_node_key3", "value3")
pipe.execute()

# Find PIPELINE events
pipeline_calls = [
call_obj
for call_obj in operation_duration_mock.record.call_args_list
if call_obj[1]["attributes"].get("db.operation.name") == "PIPELINE"
]

# Should have at least one PIPELINE event
assert len(pipeline_calls) >= 1

# Each event should have batch_size
total_batch_size = 0
for call_obj in pipeline_calls:
attrs = call_obj[1]["attributes"]
assert "db.operation.batch.size" in attrs
total_batch_size += attrs["db.operation.batch.size"]

# Total batch size should equal number of commands
assert total_batch_size == 3

def test_pipeline_duration_recorded_per_node(self, cluster_pipeline_with_otel):
"""
Test that pipeline duration is recorded for each node's commands.
Expand Down
26 changes: 0 additions & 26 deletions tests/test_cluster_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,32 +532,6 @@ def test_transaction_server_attributes_recorded(

assert "db.namespace" in attrs

def test_transaction_batch_size_recorded(self, cluster_transaction_with_otel):
"""
Test that transaction batch_size is correctly recorded.
"""
cluster, operation_duration_mock = cluster_transaction_with_otel

# Execute a transaction with 3 commands
with cluster.pipeline(transaction=True) as tx:
tx.set("{batch}key1", "value1")
tx.get("{batch}key1")
tx.delete("{batch}key1")
tx.execute()

# Find the TRANSACTION event call
transaction_call = None
for call_obj in operation_duration_mock.record.call_args_list:
attrs = call_obj[1]["attributes"]
if attrs.get("db.operation.name") == "TRANSACTION":
transaction_call = call_obj
break

assert transaction_call is not None
attrs = transaction_call[1]["attributes"]
assert "db.operation.batch.size" in attrs
assert attrs["db.operation.batch.size"] == 3

def test_transaction_duration_is_positive(self, cluster_transaction_with_otel):
"""
Test that the recorded duration for transaction is a positive float.
Expand Down
Loading
Loading