diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/__init__.py index 1e0dbf79e1..c32e786b3f 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/__init__.py @@ -30,10 +30,14 @@ producer = KafkaProducer(bootstrap_servers=['localhost:9092']) producer.send('my-topic', b'raw_bytes') + def process_msg(message): + print(message) + # report a span of type consumer with the default settings consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092']) for message in consumer: - # process message + # process message + process_msg(message) The _instrument() method accepts the following keyword args: tracer_provider (TracerProvider) - an optional tracer provider @@ -53,6 +57,7 @@ def consume_hook(span: Span, record: kafka.record.ABCRecord, args, kwargs) def produce_hook(span, args, kwargs): if span and span.is_recording(): span.set_attribute("custom_user_attribute_from_produce_hook", "some-value") + def consume_hook(span, record, args, kwargs): if span and span.is_recording(): span.set_attribute("custom_user_attribute_from_consume_hook", "some-value") @@ -65,6 +70,14 @@ def consume_hook(span, record, args, kwargs): producer = KafkaProducer(bootstrap_servers=['localhost:9092']) producer.send('my-topic', b'raw_bytes') + def process_msg(message): + print(message) + + consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092']) + for message in consumer: + # process message + process_msg(message) + API ___ """