Skip to content

Commit a4419bd

Browse files
committed
fix #4278: proxy all methods in ProxiedProducer/Consumer
Remove Producer/Consumer inheritance from ProxiedProducer and ProxiedConsumer. Add __getattr__ to delegate undefined methods to the wrapped client, fixing segfaults on calls like list_topics() or assignment(). Add regression tests to verify delegation of undefined methods on both proxy classes.
1 parent 822cd77 commit a4419bd

4 files changed

Lines changed: 59 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2828
([#4339](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4339))
2929
- `opentelemetry-instrumentation-confluent-kafka`: Skip `recv` span creation when `poll()` returns no message or `consume()` returns an empty list, avoiding empty spans on idle polls
3030
([#4349](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4349))
31+
- `opentelemetry-instrumentation-confluent-kafka`: Fix `ProxiedProducer` and `ProxiedConsumer` not delegating methods to the underlying producer/consumer instances
32+
([#4278](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4278))
3133
- Fix intermittent `Core Contrib Test` CI failures caused by GitHub git CDN SHA propagation lag by installing core packages from the already-checked-out local copy instead of a second git clone
3234
([#4305](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4305))
3335
- Don't import module in unwrap if not already imported

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ def close(self): # pylint: disable=useless-super-delegation
151151
return super().close()
152152

153153

154-
class ProxiedProducer(Producer):
154+
class ProxiedProducer:
155155
def __init__(self, producer: Producer, tracer: Tracer):
156156
self._producer = producer
157157
self._tracer = tracer
@@ -177,8 +177,11 @@ def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keywor
177177
def original_producer(self):
178178
return self._producer
179179

180+
def __getattr__(self, name):
181+
return getattr(self._producer, name)
180182

181-
class ProxiedConsumer(Consumer):
183+
184+
class ProxiedConsumer:
182185
def __init__(self, consumer: Consumer, tracer: Tracer):
183186
self._consumer = consumer
184187
self._tracer = tracer
@@ -224,6 +227,9 @@ def subscribe(self, topics, on_assign=lambda *args: None, *args, **kwargs): # p
224227
def original_consumer(self):
225228
return self._consumer
226229

230+
def __getattr__(self, name):
231+
return getattr(self._consumer, name)
232+
227233

228234
class ConfluentKafkaInstrumentor(BaseInstrumentor):
229235
"""An instrumentor for confluent kafka module

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,3 +447,43 @@ def test_producer_flush(self) -> None:
447447
span_list = self.memory_exporter.get_finished_spans()
448448
self._assert_span_count(span_list, 1)
449449
self._assert_topic(span_list[0], "topic-1")
450+
451+
def test_proxied_producer_delegates_undefined_methods(
452+
self,
453+
) -> None:
454+
"""Regression test for #4278: methods not defined on
455+
ProxiedProducer must delegate to the underlying
456+
producer instead of hitting an uninitialized handle."""
457+
instrumentation = ConfluentKafkaInstrumentor()
458+
message_queue = []
459+
460+
producer = MockedProducer(
461+
message_queue,
462+
{
463+
"bootstrap.servers": "localhost:29092",
464+
},
465+
)
466+
467+
proxied = instrumentation.instrument_producer(producer)
468+
self.assertEqual(proxied.list_topics(), "producer_topics")
469+
470+
def test_proxied_consumer_delegates_undefined_methods(
471+
self,
472+
) -> None:
473+
"""Regression test for #4278: methods not defined on
474+
ProxiedConsumer must delegate to the underlying
475+
consumer instead of hitting an uninitialized handle."""
476+
instrumentation = ConfluentKafkaInstrumentor()
477+
478+
consumer = MockConsumer(
479+
[],
480+
{
481+
"bootstrap.servers": "localhost:29092",
482+
"group.id": "mygroup",
483+
"auto.offset.reset": "earliest",
484+
},
485+
)
486+
487+
proxied = instrumentation.instrument_consumer(consumer)
488+
self.assertEqual(proxied.list_topics(), "consumer_topics")
489+
self.assertEqual(proxied.assignment(), [])

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ def poll(self, timeout=None):
1818
return self._queue.pop(0)
1919
return None
2020

21+
def list_topics(self, topic=None, timeout=-1): # pylint: disable=no-self-use
22+
return "consumer_topics"
23+
24+
def assignment(self): # pylint: disable=no-self-use
25+
return []
26+
2127

2228
class MockedMessage:
2329
def __init__(
@@ -77,3 +83,6 @@ def poll(self, *args, **kwargs):
7783

7884
def flush(self, *args, **kwargs):
7985
return len(self._queue)
86+
87+
def list_topics(self, topic=None, timeout=-1): # pylint: disable=no-self-use
88+
return "producer_topics"

0 commit comments

Comments
 (0)