Skip to content

Commit 56ce2ad

Browse files
committed
Merge branch 'main' into genai-utils-api-cleanup
2 parents baf3b95 + 5430c75 commit 56ce2ad

7 files changed

Lines changed: 91 additions & 41 deletions

File tree

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313

1414
### Added
1515

16+
- `opentelemetry-instrumentation-asgi`: Respect `suppress_http_instrumentation` context in ASGI middleware to skip server span creation when HTTP instrumentation is suppressed
17+
([#4375](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4375))
1618
- `opentelemetry-instrumentation-confluent-kafka`: Loosen confluent-kafka upper bound to <3.0.0
1719
([#4289](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4289))
1820
- `opentelemetry-instrumentation`: Add support for wrapt 2.x
@@ -40,6 +42,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4042
([#4302](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4302))
4143
- `opentelemetry-instrumentation-grpc`: Fix bidirectional streaming RPCs raising `AttributeError: 'generator' object has no attribute 'add_done_callback'`
4244
([#4259](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4259))
45+
- `opentelemetry-instrumentation-aiokafka`: fix `Unclosed AIOKafkaProducer` warning and `RuntimeWarning: coroutine was never awaited` in tests
46+
([#4384](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4384))
47+
- `opentelemetry-instrumentation-aiokafka`: Fix compatibility with aiokafka 0.13 by calling
48+
`_key_serializer`/`_value_serializer` directly instead of the internal `_serialize` method
49+
whose signature changed in 0.13 from `(topic, key, value)` to `(key, value, headers)`
50+
([#4379](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4379))
4351

4452
### Breaking changes
4553

instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,17 @@ async def _extract_send_partition(
162162
key = _extract_send_key(args, kwargs)
163163
value = _extract_send_value(args, kwargs)
164164
partition = _extract_argument("partition", 3, None, args, kwargs)
165-
key_bytes, value_bytes = cast(
166-
"tuple[bytes | None, bytes | None]",
167-
instance._serialize(topic, key, value), # type: ignore[reportUnknownMemberType]
165+
key_bytes = cast(
166+
"bytes | None",
167+
instance._key_serializer(key) # type: ignore[reportUnknownMemberType]
168+
if instance._key_serializer # type: ignore[reportUnknownMemberType]
169+
else key,
170+
)
171+
value_bytes = cast(
172+
"bytes | None",
173+
instance._value_serializer(value) # type: ignore[reportUnknownMemberType]
174+
if instance._value_serializer # type: ignore[reportUnknownMemberType]
175+
else value,
168176
)
169177
valid_types = (bytes, bytearray, memoryview, type(None))
170178
if (
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
aiokafka==0.11.0
1+
aiokafka==0.13.0; python_version >= "3.10"
2+
aiokafka==0.12.0; python_version < "3.10"
23
pytest==7.4.4
34
-e opentelemetry-instrumentation
45
-e instrumentation/opentelemetry-instrumentation-aiokafka

instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py

Lines changed: 44 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414

1515
from __future__ import annotations
1616

17+
import sys
1718
import uuid
1819
from typing import Any, Sequence, cast
19-
from unittest import IsolatedAsyncioTestCase, TestCase, mock
20+
from unittest import IsolatedAsyncioTestCase, TestCase, mock, skipIf
2021

2122
import aiokafka
2223
from aiokafka import (
@@ -63,6 +64,10 @@ def test_instrument_api(self) -> None:
6364
)
6465

6566

67+
@skipIf(
68+
sys.version_info < (3, 10),
69+
"aiokafka >= 0.13 requires Python 3.10+",
70+
)
6671
class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
6772
@staticmethod
6873
def consumer_record_factory(
@@ -126,7 +131,7 @@ async def consumer_factory(**consumer_kwargs: Any) -> AIOKafkaConsumer:
126131

127132
@staticmethod
128133
async def producer_factory() -> AIOKafkaProducer:
129-
producer = AIOKafkaProducer(api_version="1.0")
134+
producer = AIOKafkaProducer()
130135

131136
producer.client._wait_on_metadata = mock.AsyncMock()
132137
producer.client.bootstrap = mock.AsyncMock()
@@ -498,37 +503,43 @@ async def test_send_and_wait(self) -> None:
498503
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)
499504

500505
producer = await self.producer_factory()
501-
add_message_mock: mock.AsyncMock = (
502-
producer._message_accumulator.add_message
503-
)
504-
add_message_mock.side_effect = [mock.AsyncMock()(), mock.AsyncMock()()]
505-
506-
tracer = self.tracer_provider.get_tracer(__name__)
507-
with tracer.start_as_current_span("test_span") as span:
508-
await producer.send_and_wait("topic_1", b"value_1")
506+
try:
507+
add_message_mock: mock.AsyncMock = (
508+
producer._message_accumulator.add_message
509+
)
510+
add_message_mock.side_effect = [
511+
mock.AsyncMock()(),
512+
mock.AsyncMock()(),
513+
]
509514

510-
add_message_mock.assert_awaited_with(
511-
TopicPartition(topic="topic_1", partition=1),
512-
None,
513-
b"value_1",
514-
40.0,
515-
timestamp_ms=None,
516-
headers=[("traceparent", mock.ANY)],
517-
)
518-
assert (
519-
add_message_mock.call_args_list[0]
520-
.kwargs["headers"][0][1]
521-
.startswith(
522-
f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode()
515+
tracer = self.tracer_provider.get_tracer(__name__)
516+
with tracer.start_as_current_span("test_span") as span:
517+
await producer.send_and_wait("topic_1", b"value_1")
518+
519+
add_message_mock.assert_awaited_with(
520+
TopicPartition(topic="topic_1", partition=1),
521+
None,
522+
b"value_1",
523+
40.0,
524+
timestamp_ms=None,
525+
headers=[("traceparent", mock.ANY)],
526+
)
527+
assert (
528+
add_message_mock.call_args_list[0]
529+
.kwargs["headers"][0][1]
530+
.startswith(
531+
f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode()
532+
)
523533
)
524-
)
525534

526-
await producer.send_and_wait("topic_2", b"value_2")
527-
add_message_mock.assert_awaited_with(
528-
TopicPartition(topic="topic_2", partition=1),
529-
None,
530-
b"value_2",
531-
40.0,
532-
timestamp_ms=None,
533-
headers=[("traceparent", mock.ANY)],
534-
)
535+
await producer.send_and_wait("topic_2", b"value_2")
536+
add_message_mock.assert_awaited_with(
537+
TopicPartition(topic="topic_2", partition=1),
538+
None,
539+
b"value_2",
540+
40.0,
541+
timestamp_ms=None,
542+
headers=[("traceparent", mock.ANY)],
543+
)
544+
finally:
545+
await producer.stop()

instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ async def test_wrap_getmany(
256256
extract_bootstrap_servers: mock.MagicMock,
257257
_enrich_getmany_poll_span: mock.MagicMock,
258258
_enrich_getmany_topic_span: mock.MagicMock,
259-
_create_consumer_span: mock.MagicMock,
259+
_create_consumer_span: mock.AsyncMock,
260260
extract: mock.MagicMock,
261261
) -> None:
262262
tracer = mock.MagicMock()
@@ -270,6 +270,7 @@ async def test_wrap_getmany(
270270
}
271271
)
272272
kafka_consumer = mock.MagicMock()
273+
_create_consumer_span.return_value = mock.MagicMock()
273274

274275
wrapped_getmany = _wrap_getmany(tracer, consume_hook)
275276
records = await wrapped_getmany(
@@ -370,7 +371,8 @@ async def test_create_consumer_span(
370371

371372
async def test_kafka_properties_extractor(self):
372373
aiokafka_instance_mock = mock.Mock()
373-
aiokafka_instance_mock._serialize.return_value = None, None
374+
aiokafka_instance_mock._key_serializer = None
375+
aiokafka_instance_mock._value_serializer = None
374376
aiokafka_instance_mock._partition.return_value = "partition"
375377
aiokafka_instance_mock.client._wait_on_metadata = mock.AsyncMock()
376378
assert (

instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,10 @@ def client_response_hook(span: Span, scope: Scope, message: dict[str, Any]):
255255
from opentelemetry.instrumentation.propagators import (
256256
get_global_response_propagator,
257257
)
258-
from opentelemetry.instrumentation.utils import _start_internal_or_server_span
258+
from opentelemetry.instrumentation.utils import (
259+
_start_internal_or_server_span,
260+
is_http_instrumentation_enabled,
261+
)
259262
from opentelemetry.metrics import get_meter
260263
from opentelemetry.propagators.textmap import Getter, Setter
261264
from opentelemetry.semconv._incubating.attributes.http_attributes import (
@@ -745,7 +748,10 @@ async def __call__(
745748
send: An awaitable callable taking a single dictionary as argument.
746749
"""
747750
start = default_timer()
748-
if scope["type"] not in ("http", "websocket"):
751+
if not is_http_instrumentation_enabled() or scope["type"] not in (
752+
"http",
753+
"websocket",
754+
):
749755
return await self.app(scope, receive, send)
750756

751757
_, _, url = get_host_port_url_tuple(scope)

instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
get_global_response_propagator,
3838
set_global_response_propagator,
3939
)
40+
from opentelemetry.instrumentation.utils import suppress_http_instrumentation
4041
from opentelemetry.sdk import resources
4142
from opentelemetry.sdk.metrics.export import (
4243
HistogramDataPoint,
@@ -1880,6 +1881,19 @@ async def test_no_excluded_urls(self):
18801881
spans = self.get_finished_spans()
18811882
self.assertGreater(len(spans), 0)
18821883

1884+
async def test_suppress_http_instrumentation(self):
1885+
app = otel_asgi.OpenTelemetryMiddleware(simple_asgi)
1886+
1887+
async def suppression_wrapper(scope, receive, send):
1888+
with suppress_http_instrumentation():
1889+
await app(scope, receive, send)
1890+
1891+
self.seed_app(suppression_wrapper)
1892+
await self.send_default_request()
1893+
await self.get_all_output()
1894+
spans = self.get_finished_spans()
1895+
self.assertEqual(len(spans), 0)
1896+
18831897

18841898
class TestAsgiAttributes(unittest.TestCase):
18851899
def setUp(self):

0 commit comments

Comments
 (0)