Skip to content

Commit 1d4d9eb

Browse files
committed
fix(aiokafka): close AIOKafkaProducer
1 parent 7f87154 commit 1d4d9eb

1 file changed

Lines changed: 35 additions & 32 deletions

File tree

instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -498,37 +498,40 @@ async def test_send_and_wait(self) -> None:
498498
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)
499499

500500
producer = await self.producer_factory()
501-
add_message_mock: mock.AsyncMock = (
502-
producer._message_accumulator.add_message
503-
)
504-
add_message_mock.side_effect = [mock.AsyncMock()(), mock.AsyncMock()()]
505-
506-
tracer = self.tracer_provider.get_tracer(__name__)
507-
with tracer.start_as_current_span("test_span") as span:
508-
await producer.send_and_wait("topic_1", b"value_1")
509-
510-
add_message_mock.assert_awaited_with(
511-
TopicPartition(topic="topic_1", partition=1),
512-
None,
513-
b"value_1",
514-
40.0,
515-
timestamp_ms=None,
516-
headers=[("traceparent", mock.ANY)],
517-
)
518-
assert (
519-
add_message_mock.call_args_list[0]
520-
.kwargs["headers"][0][1]
521-
.startswith(
522-
f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode()
501+
try:
502+
add_message_mock: mock.AsyncMock = (
503+
producer._message_accumulator.add_message
504+
)
505+
add_message_mock.side_effect = [mock.AsyncMock()(), mock.AsyncMock()()]
506+
507+
tracer = self.tracer_provider.get_tracer(__name__)
508+
with tracer.start_as_current_span("test_span") as span:
509+
await producer.send_and_wait("topic_1", b"value_1")
510+
511+
add_message_mock.assert_awaited_with(
512+
TopicPartition(topic="topic_1", partition=1),
513+
None,
514+
b"value_1",
515+
40.0,
516+
timestamp_ms=None,
517+
headers=[("traceparent", mock.ANY)],
518+
)
519+
assert (
520+
add_message_mock.call_args_list[0]
521+
.kwargs["headers"][0][1]
522+
.startswith(
523+
f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode()
524+
)
523525
)
524-
)
525526

526-
await producer.send_and_wait("topic_2", b"value_2")
527-
add_message_mock.assert_awaited_with(
528-
TopicPartition(topic="topic_2", partition=1),
529-
None,
530-
b"value_2",
531-
40.0,
532-
timestamp_ms=None,
533-
headers=[("traceparent", mock.ANY)],
534-
)
527+
await producer.send_and_wait("topic_2", b"value_2")
528+
add_message_mock.assert_awaited_with(
529+
TopicPartition(topic="topic_2", partition=1),
530+
None,
531+
b"value_2",
532+
40.0,
533+
timestamp_ms=None,
534+
headers=[("traceparent", mock.ANY)],
535+
)
536+
finally:
537+
await producer.stop()

0 commit comments

Comments
 (0)