Skip to content

Commit a21f768

Browse files
vladvildanov authored and petyaslavova committed
Removed batch_size and consumer_name attributes from OTel metrics (#3978)
* Removed batch_size and consumer_name attributes from OTel metrics * Exclude args from linters * Left only #noqa
1 parent 2098114 commit a21f768

10 files changed

Lines changed: 42 additions & 168 deletions

File tree

redis/client.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1890,7 +1890,6 @@ def _disconnect_raise_on_watching(
18901890
failure_count: Optional[int] = None,
18911891
start_time: Optional[float] = None,
18921892
command_name: Optional[str] = None,
1893-
batch_size: Optional[int] = None,
18941893
) -> None:
18951894
"""
18961895
Close the connection, raise an exception if we were watching.
@@ -1910,7 +1909,6 @@ def _disconnect_raise_on_watching(
19101909
db_namespace=str(conn.db),
19111910
error=error,
19121911
retry_attempts=failure_count,
1913-
batch_size=batch_size,
19141912
)
19151913
conn.disconnect()
19161914
# if we were watching a variable, the watch is no longer valid
@@ -1946,12 +1944,11 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
19461944
start_time = time.monotonic()
19471945
# Track actual retry attempts for error reporting
19481946
actual_retry_attempts = [0]
1949-
stack_len = len(stack)
19501947

19511948
def failure_callback(error, failure_count):
19521949
actual_retry_attempts[0] = failure_count
19531950
self._disconnect_raise_on_watching(
1954-
conn, error, failure_count, start_time, operation_name, stack_len
1951+
conn, error, failure_count, start_time, operation_name
19551952
)
19561953

19571954
try:
@@ -1967,7 +1964,6 @@ def failure_callback(error, failure_count):
19671964
server_address=getattr(conn, "host", None),
19681965
server_port=getattr(conn, "port", None),
19691966
db_namespace=str(conn.db),
1970-
batch_size=stack_len,
19711967
)
19721968
return response
19731969
except Exception as e:

redis/cluster.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3349,7 +3349,6 @@ def _raise_first_error(self, stack, start_time):
33493349
record_operation_duration(
33503350
command_name="PIPELINE",
33513351
duration_seconds=time.monotonic() - start_time,
3352-
batch_size=len(stack),
33533352
error=r,
33543353
)
33553354

@@ -3554,7 +3553,6 @@ def _send_cluster_commands(
35543553
server_address=n.connection.host,
35553554
server_port=n.connection.port,
35563555
db_namespace=str(n.connection.db),
3557-
batch_size=len(n.commands),
35583556
)
35593557
nodes_read += 1
35603558
finally:
@@ -4042,7 +4040,6 @@ def _execute_transaction(
40424040
server_address=connection.host,
40434041
server_port=connection.port,
40444042
db_namespace=str(connection.db),
4045-
batch_size=len(self._command_queue),
40464043
)
40474044

40484045
# EXEC clears any watched keys

redis/commands/core.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4519,7 +4519,6 @@ def xreadgroup(
45194519
record_streaming_lag_from_response(
45204520
response=response,
45214521
consumer_group=groupname,
4522-
consumer_name=consumername,
45234522
)
45244523

45254524
return response

redis/observability/attributes.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
DB_SYSTEM = "db.system"
2222
DB_NAMESPACE = "db.namespace"
2323
DB_OPERATION_NAME = "db.operation.name"
24-
DB_OPERATION_BATCH_SIZE = "db.operation.batch.size"
2524
DB_RESPONSE_STATUS_CODE = "db.response.status_code"
2625
DB_STORED_PROCEDURE_NAME = "db.stored_procedure.name"
2726

@@ -60,7 +59,6 @@
6059
REDIS_CLIENT_ERROR_CATEGORY = "redis.client.errors.category"
6160
REDIS_CLIENT_STREAM_NAME = "redis.client.stream.name"
6261
REDIS_CLIENT_CONSUMER_GROUP = "redis.client.consumer_group"
63-
REDIS_CLIENT_CONSUMER_NAME = "redis.client.consumer_name"
6462
REDIS_CLIENT_CSC_RESULT = "redis.client.csc.result"
6563
REDIS_CLIENT_CSC_REASON = "redis.client.csc.reason"
6664

@@ -131,7 +129,7 @@ def build_base_attributes(
131129
@staticmethod
132130
def build_operation_attributes(
133131
command_name: Optional[str] = None,
134-
batch_size: Optional[int] = None,
132+
batch_size: Optional[int] = None, # noqa
135133
network_peer_address: Optional[str] = None,
136134
network_peer_port: Optional[int] = None,
137135
stored_procedure_name: Optional[str] = None,
@@ -158,9 +156,6 @@ def build_operation_attributes(
158156
if command_name is not None:
159157
attrs[DB_OPERATION_NAME] = command_name.upper()
160158

161-
if batch_size is not None:
162-
attrs[DB_OPERATION_BATCH_SIZE] = batch_size
163-
164159
if network_peer_address is not None:
165160
attrs[NETWORK_PEER_ADDRESS] = network_peer_address
166161

@@ -283,7 +278,7 @@ def build_pubsub_message_attributes(
283278
def build_streaming_attributes(
284279
stream_name: Optional[str] = None,
285280
consumer_group: Optional[str] = None,
286-
consumer_name: Optional[str] = None,
281+
consumer_name: Optional[str] = None, # noqa
287282
) -> Dict[str, Any]:
288283
"""
289284
Build attributes for a streaming operation.
@@ -304,9 +299,6 @@ def build_streaming_attributes(
304299
if consumer_group is not None:
305300
attrs[REDIS_CLIENT_CONSUMER_GROUP] = consumer_group
306301

307-
if consumer_name is not None:
308-
attrs[REDIS_CLIENT_CONSUMER_NAME] = consumer_name
309-
310302
return attrs
311303

312304
@staticmethod

redis/observability/metrics.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
get_pool_name,
2626
)
2727
from redis.observability.config import MetricGroup, OTelConfig
28+
from redis.utils import deprecated_args
2829

2930
logger = logging.getLogger(__name__)
3031

@@ -426,14 +427,19 @@ def record_connection_wait_time(
426427

427428
# Command execution metric recording methods
428429

430+
@deprecated_args(
431+
args_to_warn=["batch_size"],
432+
reason="The batch_size argument is no longer used and will be removed in the next major version.",
433+
version="7.2.1",
434+
)
429435
def record_operation_duration(
430436
self,
431437
command_name: str,
432438
duration_seconds: float,
433439
server_address: Optional[str] = None,
434440
server_port: Optional[int] = None,
435441
db_namespace: Optional[int] = None,
436-
batch_size: Optional[int] = None,
442+
batch_size: Optional[int] = None, # noqa
437443
error_type: Optional[Exception] = None,
438444
network_peer_address: Optional[str] = None,
439445
network_peer_port: Optional[int] = None,
@@ -473,7 +479,6 @@ def record_operation_duration(
473479
attrs.update(
474480
self.attr_builder.build_operation_attributes(
475481
command_name=command_name,
476-
batch_size=batch_size,
477482
network_peer_address=network_peer_address,
478483
network_peer_port=network_peer_port,
479484
retry_attempts=retry_attempts,
@@ -582,12 +587,17 @@ def record_pubsub_message(
582587

583588
# Streaming metric recording methods
584589

590+
@deprecated_args(
591+
args_to_warn=["consumer_name"],
592+
reason="The consumer_name argument is no longer used and will be removed in the next major version.",
593+
version="7.2.1",
594+
)
585595
def record_streaming_lag(
586596
self,
587597
lag_seconds: float,
588598
stream_name: Optional[str] = None,
589599
consumer_group: Optional[str] = None,
590-
consumer_name: Optional[str] = None,
600+
consumer_name: Optional[str] = None, # noqa
591601
) -> None:
592602
"""
593603
Record the lag of a streaming message.
@@ -604,7 +614,6 @@ def record_streaming_lag(
604614
attrs = self.attr_builder.build_streaming_attributes(
605615
stream_name=stream_name,
606616
consumer_group=consumer_group,
607-
consumer_name=consumer_name,
608617
)
609618
self.stream_lag.record(lag_seconds, attributes=attrs)
610619

redis/observability/recorder.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from redis.observability.metrics import CloseReason, RedisMetricsCollector
3333
from redis.observability.providers import get_observability_instance
3434
from redis.observability.registry import get_observables_registry_instance
35-
from redis.utils import str_if_bytes
35+
from redis.utils import deprecated_args, str_if_bytes
3636

3737
if TYPE_CHECKING:
3838
from redis.connection import ConnectionPoolInterface
@@ -46,6 +46,11 @@
4646
CSC_ITEMS_REGISTRY_KEY = "csc_items"
4747

4848

49+
@deprecated_args(
50+
args_to_warn=["batch_size"],
51+
reason="The batch_size argument is no longer used and will be removed in the next major version.",
52+
version="7.2.1",
53+
)
4954
def record_operation_duration(
5055
command_name: str,
5156
duration_seconds: float,
@@ -54,7 +59,7 @@ def record_operation_duration(
5459
db_namespace: Optional[str] = None,
5560
error: Optional[Exception] = None,
5661
is_blocking: Optional[bool] = None,
57-
batch_size: Optional[int] = None,
62+
batch_size: Optional[int] = None, # noqa
5863
retry_attempts: Optional[int] = None,
5964
) -> None:
6065
"""
@@ -100,7 +105,6 @@ def record_operation_duration(
100105
network_peer_address=server_address,
101106
network_peer_port=server_port,
102107
is_blocking=is_blocking,
103-
batch_size=batch_size,
104108
retry_attempts=retry_attempts,
105109
)
106110
except Exception:
@@ -436,11 +440,16 @@ def record_pubsub_message(
436440
pass
437441

438442

443+
@deprecated_args(
444+
args_to_warn=["consumer_name"],
445+
reason="The consumer_name argument is no longer used and will be removed in the next major version.",
446+
version="7.2.1",
447+
)
439448
def record_streaming_lag(
440449
lag_seconds: float,
441450
stream_name: Optional[str] = None,
442451
consumer_group: Optional[str] = None,
443-
consumer_name: Optional[str] = None,
452+
consumer_name: Optional[str] = None, # noqa
444453
) -> None:
445454
"""
446455
Record the lag of a streaming message.
@@ -470,16 +479,20 @@ def record_streaming_lag(
470479
lag_seconds=lag_seconds,
471480
stream_name=effective_stream_name,
472481
consumer_group=consumer_group,
473-
consumer_name=consumer_name,
474482
)
475483
except Exception:
476484
pass
477485

478486

487+
@deprecated_args(
488+
args_to_warn=["consumer_name"],
489+
reason="The consumer_name argument is no longer used and will be removed in the next major version.",
490+
version="7.2.1",
491+
)
479492
def record_streaming_lag_from_response(
480493
response,
481494
consumer_group: Optional[str] = None,
482-
consumer_name: Optional[str] = None,
495+
consumer_name: Optional[str] = None, # noqa
483496
) -> None:
484497
"""
485498
Record streaming lag from XREAD/XREADGROUP response.
@@ -527,7 +540,6 @@ def record_streaming_lag_from_response(
527540
lag_seconds=lag_seconds,
528541
stream_name=effective_stream_name,
529542
consumer_group=consumer_group,
530-
consumer_name=consumer_name,
531543
)
532544
else:
533545
# RESP2 format: list
@@ -546,7 +558,6 @@ def record_streaming_lag_from_response(
546558
lag_seconds=lag_seconds,
547559
stream_name=effective_stream_name,
548560
consumer_group=consumer_group,
549-
consumer_name=consumer_name,
550561
)
551562
except Exception:
552563
pass

tests/test_cluster.py

Lines changed: 0 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -4420,32 +4420,6 @@ def test_pipeline_execute_records_metric(self, cluster_pipeline_with_otel):
44204420
attrs = call_args[1]["attributes"]
44214421
assert attrs["db.operation.name"] == "PIPELINE"
44224422

4423-
def test_pipeline_batch_size_recorded(self, cluster_pipeline_with_otel):
4424-
"""
4425-
Test that pipeline batch_size is correctly recorded.
4426-
"""
4427-
cluster, operation_duration_mock = cluster_pipeline_with_otel
4428-
4429-
# Execute a pipeline with 3 commands
4430-
pipe = cluster.pipeline()
4431-
pipe.set("batch_key", "value1")
4432-
pipe.get("batch_key")
4433-
pipe.delete("batch_key")
4434-
pipe.execute()
4435-
4436-
# Find the PIPELINE event call
4437-
pipeline_call = None
4438-
for call_obj in operation_duration_mock.record.call_args_list:
4439-
attrs = call_obj[1]["attributes"]
4440-
if attrs.get("db.operation.name") == "PIPELINE":
4441-
pipeline_call = call_obj
4442-
break
4443-
4444-
assert pipeline_call is not None
4445-
attrs = pipeline_call[1]["attributes"]
4446-
assert "db.operation.batch.size" in attrs
4447-
assert attrs["db.operation.batch.size"] == 3
4448-
44494423
def test_pipeline_server_attributes_recorded(self, cluster_pipeline_with_otel):
44504424
"""
44514425
Test that server address, port, and db namespace are recorded for pipeline.
@@ -4643,41 +4617,6 @@ def test_pipeline_multi_node_records_multiple_metrics(
46434617
assert "server.port" in attrs
46444618
assert "db.namespace" in attrs
46454619

4646-
def test_pipeline_metric_contains_batch_size_per_node(
4647-
self, cluster_pipeline_with_otel
4648-
):
4649-
"""
4650-
Test that pipeline metrics contain correct batch_size for each node.
4651-
"""
4652-
cluster, operation_duration_mock = cluster_pipeline_with_otel
4653-
4654-
# Execute pipeline with commands
4655-
pipe = cluster.pipeline()
4656-
pipe.set("batch_node_key1", "value1")
4657-
pipe.set("batch_node_key2", "value2")
4658-
pipe.set("batch_node_key3", "value3")
4659-
pipe.execute()
4660-
4661-
# Find PIPELINE events
4662-
pipeline_calls = [
4663-
call_obj
4664-
for call_obj in operation_duration_mock.record.call_args_list
4665-
if call_obj[1]["attributes"].get("db.operation.name") == "PIPELINE"
4666-
]
4667-
4668-
# Should have at least one PIPELINE event
4669-
assert len(pipeline_calls) >= 1
4670-
4671-
# Each event should have batch_size
4672-
total_batch_size = 0
4673-
for call_obj in pipeline_calls:
4674-
attrs = call_obj[1]["attributes"]
4675-
assert "db.operation.batch.size" in attrs
4676-
total_batch_size += attrs["db.operation.batch.size"]
4677-
4678-
# Total batch size should equal number of commands
4679-
assert total_batch_size == 3
4680-
46814620
def test_pipeline_duration_recorded_per_node(self, cluster_pipeline_with_otel):
46824621
"""
46834622
Test that pipeline duration is recorded for each node's commands.

tests/test_cluster_transaction.py

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -532,32 +532,6 @@ def test_transaction_server_attributes_recorded(
532532

533533
assert "db.namespace" in attrs
534534

535-
def test_transaction_batch_size_recorded(self, cluster_transaction_with_otel):
536-
"""
537-
Test that transaction batch_size is correctly recorded.
538-
"""
539-
cluster, operation_duration_mock = cluster_transaction_with_otel
540-
541-
# Execute a transaction with 3 commands
542-
with cluster.pipeline(transaction=True) as tx:
543-
tx.set("{batch}key1", "value1")
544-
tx.get("{batch}key1")
545-
tx.delete("{batch}key1")
546-
tx.execute()
547-
548-
# Find the TRANSACTION event call
549-
transaction_call = None
550-
for call_obj in operation_duration_mock.record.call_args_list:
551-
attrs = call_obj[1]["attributes"]
552-
if attrs.get("db.operation.name") == "TRANSACTION":
553-
transaction_call = call_obj
554-
break
555-
556-
assert transaction_call is not None
557-
attrs = transaction_call[1]["attributes"]
558-
assert "db.operation.batch.size" in attrs
559-
assert attrs["db.operation.batch.size"] == 3
560-
561535
def test_transaction_duration_is_positive(self, cluster_transaction_with_otel):
562536
"""
563537
Test that the recorded duration for transaction is a positive float.

0 commit comments

Comments (0)