Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
producer.send('my-topic', b'raw_bytes')

def process_msg(message):
print(message)

# report a span of type consumer with the default settings
consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
for message in consumer:
# process message
# process message
process_msg(message)

The _instrument() method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
Expand All @@ -53,6 +57,7 @@ def consume_hook(span: Span, record: kafka.record.ABCRecord, args, kwargs)
def produce_hook(span, args, kwargs):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_produce_hook", "some-value")

def consume_hook(span, record, args, kwargs):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_consume_hook", "some-value")
Expand All @@ -65,6 +70,14 @@ def consume_hook(span, record, args, kwargs):
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
producer.send('my-topic', b'raw_bytes')

def process_msg(message):
print(message)

consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
for message in consumer:
# process message
process_msg(message)

API
___
"""
Expand Down