Skip to content

Commit 4d266a0

Browse files
dorlibemdnetoxrmx
authored
Fix/aiokafka tests fixes (#4384)
* fix(aiokafka): close AIOKafkaProducer * fix(aiokafka): use AsyncMock to fix unawaited coroutine warning * docs(aiokafka): add CHANGELOG entry for test warning fixes * fix formatting * Apply suggestion from @xrmx --------- Co-authored-by: Emídio Neto <9735060+emdneto@users.noreply.github.com> Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
1 parent 58b554e commit 4d266a0

3 files changed

Lines changed: 41 additions & 32 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4040
([#4302](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4302))
4141
- `opentelemetry-instrumentation-grpc`: Fix bidirectional streaming RPCs raising `AttributeError: 'generator' object has no attribute 'add_done_callback'`
4242
([#4259](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4259))
43+
- `opentelemetry-instrumentation-aiokafka`: fix `Unclosed AIOKafkaProducer` warning and `RuntimeWarning: coroutine was never awaited` in tests
44+
([#4384](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4384))
4345
- `opentelemetry-instrumentation-aiokafka`: Fix compatibility with aiokafka 0.13 by calling
4446
`_key_serializer`/`_value_serializer` directly instead of the internal `_serialize` method
4547
whose signature changed in 0.13 from `(topic, key, value)` to `(key, value, headers)`

instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -503,37 +503,43 @@ async def test_send_and_wait(self) -> None:
503503
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)
504504

505505
producer = await self.producer_factory()
506-
add_message_mock: mock.AsyncMock = (
507-
producer._message_accumulator.add_message
508-
)
509-
add_message_mock.side_effect = [mock.AsyncMock()(), mock.AsyncMock()()]
510-
511-
tracer = self.tracer_provider.get_tracer(__name__)
512-
with tracer.start_as_current_span("test_span") as span:
513-
await producer.send_and_wait("topic_1", b"value_1")
506+
try:
507+
add_message_mock: mock.AsyncMock = (
508+
producer._message_accumulator.add_message
509+
)
510+
add_message_mock.side_effect = [
511+
mock.AsyncMock()(),
512+
mock.AsyncMock()(),
513+
]
514514

515-
add_message_mock.assert_awaited_with(
516-
TopicPartition(topic="topic_1", partition=1),
517-
None,
518-
b"value_1",
519-
40.0,
520-
timestamp_ms=None,
521-
headers=[("traceparent", mock.ANY)],
522-
)
523-
assert (
524-
add_message_mock.call_args_list[0]
525-
.kwargs["headers"][0][1]
526-
.startswith(
527-
f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode()
515+
tracer = self.tracer_provider.get_tracer(__name__)
516+
with tracer.start_as_current_span("test_span") as span:
517+
await producer.send_and_wait("topic_1", b"value_1")
518+
519+
add_message_mock.assert_awaited_with(
520+
TopicPartition(topic="topic_1", partition=1),
521+
None,
522+
b"value_1",
523+
40.0,
524+
timestamp_ms=None,
525+
headers=[("traceparent", mock.ANY)],
526+
)
527+
assert (
528+
add_message_mock.call_args_list[0]
529+
.kwargs["headers"][0][1]
530+
.startswith(
531+
f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode()
532+
)
528533
)
529-
)
530534

531-
await producer.send_and_wait("topic_2", b"value_2")
532-
add_message_mock.assert_awaited_with(
533-
TopicPartition(topic="topic_2", partition=1),
534-
None,
535-
b"value_2",
536-
40.0,
537-
timestamp_ms=None,
538-
headers=[("traceparent", mock.ANY)],
539-
)
535+
await producer.send_and_wait("topic_2", b"value_2")
536+
add_message_mock.assert_awaited_with(
537+
TopicPartition(topic="topic_2", partition=1),
538+
None,
539+
b"value_2",
540+
40.0,
541+
timestamp_ms=None,
542+
headers=[("traceparent", mock.ANY)],
543+
)
544+
finally:
545+
await producer.stop()

instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ async def test_wrap_getmany(
256256
extract_bootstrap_servers: mock.MagicMock,
257257
_enrich_getmany_poll_span: mock.MagicMock,
258258
_enrich_getmany_topic_span: mock.MagicMock,
259-
_create_consumer_span: mock.MagicMock,
259+
_create_consumer_span: mock.AsyncMock,
260260
extract: mock.MagicMock,
261261
) -> None:
262262
tracer = mock.MagicMock()
@@ -270,6 +270,7 @@ async def test_wrap_getmany(
270270
}
271271
)
272272
kafka_consumer = mock.MagicMock()
273+
_create_consumer_span.return_value = mock.MagicMock()
273274

274275
wrapped_getmany = _wrap_getmany(tracer, consume_hook)
275276
records = await wrapped_getmany(

0 commit comments

Comments
 (0)