Skip to content

Commit 523f6e6

Browse files
sightseekerxrmx
andcommitted
opentelemetry-instrumentation-confluent-kafka: Replace SpanAttributes with semconv constants (open-telemetry#4057)
* refactor: Replace SpanAttributes with semconv constants * update CHANGELOG.md * fix CHANGELOG.md * Update utils.py * Update __init__.py * Update utils.py * Update __init__.py --------- Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
1 parent 3fbaaa7 commit 523f6e6

4 files changed

Lines changed: 41 additions & 27 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4444
([#4058](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4058))
4545
- `opentelemetry-instrumentation-django`: Replace SpanAttributes with semconv constants where applicable
4646
([#4059](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4059))
47+
- `opentelemetry-instrumentation-confluent-kafka`: Replace SpanAttributes with semconv constants where applicable
48+
([#4057](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4057))
4749

4850
## Version 1.39.0/0.60b0 (2025-12-03)
4951

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
110110
from opentelemetry import context, propagate, trace
111111
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
112112
from opentelemetry.instrumentation.utils import unwrap
113-
from opentelemetry.semconv.trace import MessagingOperationValues
113+
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
114+
MessagingOperationTypeValues,
115+
)
114116
from opentelemetry.trace import Tracer
115117

116118
from .package import _instruments
@@ -363,7 +365,7 @@ def wrap_produce(func, instance, tracer, args, kwargs):
363365
_enrich_span(
364366
span,
365367
topic,
366-
operation=MessagingOperationValues.RECEIVE,
368+
operation=MessagingOperationTypeValues.RECEIVE,
367369
) # Replace
368370
propagate.inject(
369371
headers,
@@ -387,7 +389,7 @@ def wrap_poll(func, instance, tracer, args, kwargs):
387389
record.topic(),
388390
record.partition(),
389391
record.offset(),
390-
operation=MessagingOperationValues.PROCESS,
392+
operation=MessagingOperationTypeValues.PROCESS,
391393
)
392394
instance._current_context_token = context.attach(
393395
trace.set_span_in_context(instance._current_consume_span)
@@ -409,7 +411,7 @@ def wrap_consume(func, instance, tracer, args, kwargs):
409411
_enrich_span(
410412
instance._current_consume_span,
411413
records[0].topic(),
412-
operation=MessagingOperationValues.PROCESS,
414+
operation=MessagingOperationTypeValues.PROCESS,
413415
)
414416

415417
instance._current_context_token = context.attach(

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,14 @@
33

44
from opentelemetry import context, propagate
55
from opentelemetry.propagators import textmap
6+
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
7+
MESSAGING_MESSAGE_ID,
8+
MESSAGING_OPERATION,
9+
MESSAGING_SYSTEM,
10+
MessagingOperationTypeValues,
11+
)
612
from opentelemetry.semconv.trace import (
713
MessagingDestinationKindValues,
8-
MessagingOperationValues,
914
SpanAttributes,
1015
)
1116
from opentelemetry.trace import Link, SpanKind
@@ -115,12 +120,12 @@ def _enrich_span(
115120
topic,
116121
partition: Optional[int] = None,
117122
offset: Optional[int] = None,
118-
operation: Optional[MessagingOperationValues] = None,
123+
operation: Optional[MessagingOperationTypeValues] = None,
119124
):
120125
if not span.is_recording():
121126
return
122127

123-
span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka")
128+
span.set_attribute(MESSAGING_SYSTEM, "kafka")
124129
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic)
125130

126131
if partition is not None:
@@ -132,15 +137,15 @@ def _enrich_span(
132137
)
133138

134139
if operation:
135-
span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value)
140+
span.set_attribute(MESSAGING_OPERATION, operation.value)
136141
else:
137142
span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)
138143

139144
# https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
140145
# A message within Kafka is uniquely defined by its topic name, topic partition and offset.
141146
if partition is not None and offset is not None and topic:
142147
span.set_attribute(
143-
SpanAttributes.MESSAGING_MESSAGE_ID,
148+
MESSAGING_MESSAGE_ID,
144149
f"{topic}.{partition}.{offset}",
145150
)
146151

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@
2525
KafkaContextGetter,
2626
KafkaContextSetter,
2727
)
28+
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
29+
MESSAGING_MESSAGE_ID,
30+
MESSAGING_OPERATION,
31+
MESSAGING_SYSTEM,
32+
)
2833
from opentelemetry.semconv.trace import (
2934
MessagingDestinationKindValues,
3035
SpanAttributes,
@@ -122,36 +127,36 @@ def test_poll(self) -> None:
122127
{
123128
"name": "topic-10 process",
124129
"attributes": {
125-
SpanAttributes.MESSAGING_OPERATION: "process",
130+
MESSAGING_OPERATION: "process",
126131
SpanAttributes.MESSAGING_KAFKA_PARTITION: 0,
127-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
132+
MESSAGING_SYSTEM: "kafka",
128133
SpanAttributes.MESSAGING_DESTINATION: "topic-10",
129134
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
130-
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0",
135+
MESSAGING_MESSAGE_ID: "topic-10.0.0",
131136
},
132137
},
133138
{"name": "recv", "attributes": {}},
134139
{
135140
"name": "topic-20 process",
136141
"attributes": {
137-
SpanAttributes.MESSAGING_OPERATION: "process",
142+
MESSAGING_OPERATION: "process",
138143
SpanAttributes.MESSAGING_KAFKA_PARTITION: 2,
139-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
144+
MESSAGING_SYSTEM: "kafka",
140145
SpanAttributes.MESSAGING_DESTINATION: "topic-20",
141146
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
142-
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4",
147+
MESSAGING_MESSAGE_ID: "topic-20.2.4",
143148
},
144149
},
145150
{"name": "recv", "attributes": {}},
146151
{
147152
"name": "topic-30 process",
148153
"attributes": {
149-
SpanAttributes.MESSAGING_OPERATION: "process",
154+
MESSAGING_OPERATION: "process",
150155
SpanAttributes.MESSAGING_KAFKA_PARTITION: 1,
151-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
156+
MESSAGING_SYSTEM: "kafka",
152157
SpanAttributes.MESSAGING_DESTINATION: "topic-30",
153158
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
154-
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3",
159+
MESSAGING_MESSAGE_ID: "topic-30.1.3",
155160
},
156161
},
157162
{"name": "recv", "attributes": {}},
@@ -190,8 +195,8 @@ def test_consume(self) -> None:
190195
{
191196
"name": "topic-1 process",
192197
"attributes": {
193-
SpanAttributes.MESSAGING_OPERATION: "process",
194-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
198+
MESSAGING_OPERATION: "process",
199+
MESSAGING_SYSTEM: "kafka",
195200
SpanAttributes.MESSAGING_DESTINATION: "topic-1",
196201
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
197202
},
@@ -200,8 +205,8 @@ def test_consume(self) -> None:
200205
{
201206
"name": "topic-2 process",
202207
"attributes": {
203-
SpanAttributes.MESSAGING_OPERATION: "process",
204-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
208+
MESSAGING_OPERATION: "process",
209+
MESSAGING_SYSTEM: "kafka",
205210
SpanAttributes.MESSAGING_DESTINATION: "topic-2",
206211
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
207212
},
@@ -210,8 +215,8 @@ def test_consume(self) -> None:
210215
{
211216
"name": "topic-3 process",
212217
"attributes": {
213-
SpanAttributes.MESSAGING_OPERATION: "process",
214-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
218+
MESSAGING_OPERATION: "process",
219+
MESSAGING_SYSTEM: "kafka",
215220
SpanAttributes.MESSAGING_DESTINATION: "topic-3",
216221
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
217222
},
@@ -247,12 +252,12 @@ def test_close(self) -> None:
247252
{
248253
"name": "topic-a process",
249254
"attributes": {
250-
SpanAttributes.MESSAGING_OPERATION: "process",
255+
MESSAGING_OPERATION: "process",
251256
SpanAttributes.MESSAGING_KAFKA_PARTITION: 0,
252-
SpanAttributes.MESSAGING_SYSTEM: "kafka",
257+
MESSAGING_SYSTEM: "kafka",
253258
SpanAttributes.MESSAGING_DESTINATION: "topic-a",
254259
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
255-
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-a.0.0",
260+
MESSAGING_MESSAGE_ID: "topic-a.0.0",
256261
},
257262
},
258263
]

0 commit comments

Comments
 (0)