Skip to content

Commit 58b554e

Browse files
dorlibemdnetoxrmx
authored
Fix/aiokafka 0.13 compatibility (#4379)
* tests(aiokafka): bump aiokafka 0.13 * tests(aiokafka): remove removed api_version argument from AIOKafkaProducer * fix(aiokafka): call serializers directly instead of _serialize * chore: add CHANGELOG entry for aiokafka 0.13 compatibility fix * tests(aiokafka): align tests to support python 3.9 * Apply suggestion from @xrmx --------- Co-authored-by: Emídio Neto <9735060+emdneto@users.noreply.github.com> Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
1 parent 048b4ad commit 58b554e

5 files changed

Lines changed: 26 additions & 7 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4040
([#4302](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4302))
4141
- `opentelemetry-instrumentation-grpc`: Fix bidirectional streaming RPCs raising `AttributeError: 'generator' object has no attribute 'add_done_callback'`
4242
([#4259](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4259))
43+
- `opentelemetry-instrumentation-aiokafka`: Fix compatibility with aiokafka 0.13 by calling
44+
`_key_serializer`/`_value_serializer` directly instead of the internal `_serialize` method
45+
whose signature changed in 0.13 from `(topic, key, value)` to `(key, value, headers)`
46+
([#4379](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4379))
4347

4448
### Breaking changes
4549

instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,17 @@ async def _extract_send_partition(
162162
key = _extract_send_key(args, kwargs)
163163
value = _extract_send_value(args, kwargs)
164164
partition = _extract_argument("partition", 3, None, args, kwargs)
165-
key_bytes, value_bytes = cast(
166-
"tuple[bytes | None, bytes | None]",
167-
instance._serialize(topic, key, value), # type: ignore[reportUnknownMemberType]
165+
key_bytes = cast(
166+
"bytes | None",
167+
instance._key_serializer(key) # type: ignore[reportUnknownMemberType]
168+
if instance._key_serializer # type: ignore[reportUnknownMemberType]
169+
else key,
170+
)
171+
value_bytes = cast(
172+
"bytes | None",
173+
instance._value_serializer(value) # type: ignore[reportUnknownMemberType]
174+
if instance._value_serializer # type: ignore[reportUnknownMemberType]
175+
else value,
168176
)
169177
valid_types = (bytes, bytearray, memoryview, type(None))
170178
if (
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
aiokafka==0.11.0
1+
aiokafka==0.13.0; python_version >= "3.10"
2+
aiokafka==0.12.0; python_version < "3.10"
23
pytest==7.4.4
34
-e opentelemetry-instrumentation
45
-e instrumentation/opentelemetry-instrumentation-aiokafka

instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414

1515
from __future__ import annotations
1616

17+
import sys
1718
import uuid
1819
from typing import Any, Sequence, cast
19-
from unittest import IsolatedAsyncioTestCase, TestCase, mock
20+
from unittest import IsolatedAsyncioTestCase, TestCase, mock, skipIf
2021

2122
import aiokafka
2223
from aiokafka import (
@@ -63,6 +64,10 @@ def test_instrument_api(self) -> None:
6364
)
6465

6566

67+
@skipIf(
68+
sys.version_info < (3, 10),
69+
"aiokafka >= 0.13 requires Python 3.10+",
70+
)
6671
class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
6772
@staticmethod
6873
def consumer_record_factory(
@@ -126,7 +131,7 @@ async def consumer_factory(**consumer_kwargs: Any) -> AIOKafkaConsumer:
126131

127132
@staticmethod
128133
async def producer_factory() -> AIOKafkaProducer:
129-
producer = AIOKafkaProducer(api_version="1.0")
134+
producer = AIOKafkaProducer()
130135

131136
producer.client._wait_on_metadata = mock.AsyncMock()
132137
producer.client.bootstrap = mock.AsyncMock()

instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,8 @@ async def test_create_consumer_span(
370370

371371
async def test_kafka_properties_extractor(self):
372372
aiokafka_instance_mock = mock.Mock()
373-
aiokafka_instance_mock._serialize.return_value = None, None
373+
aiokafka_instance_mock._key_serializer = None
374+
aiokafka_instance_mock._value_serializer = None
374375
aiokafka_instance_mock._partition.return_value = "partition"
375376
aiokafka_instance_mock.client._wait_on_metadata = mock.AsyncMock()
376377
assert (

0 commit comments

Comments
 (0)