Skip to content

Commit 640e4d2

Browse files
authored
Merge pull request #1327 from newrelic/add-kombu-support
Add support for kombu
2 parents 5908463 + 1789b98 commit 640e4d2

9 files changed

Lines changed: 717 additions & 2 deletions

File tree

newrelic/api/transaction.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@
6666
_logger = logging.getLogger(__name__)
6767

6868
DISTRIBUTED_TRACE_KEYS_REQUIRED = ("ty", "ac", "ap", "tr", "ti")
69-
DISTRIBUTED_TRACE_TRANSPORT_TYPES = set(("HTTP", "HTTPS", "Kafka", "JMS", "IronMQ", "AMQP", "Queue", "Other"))
69+
DISTRIBUTED_TRACE_TRANSPORT_TYPES = set(
70+
("HTTP", "HTTPS", "Kafka", "JMS", "IronMQ", "AMQP", "Queue", "SQS", "REDIS", "ZooKeeper", "Other")
71+
)
7072
DELIMITER_FORMAT_RE = re.compile("[ \t]*,[ \t]*")
7173
ACCEPTED_DISTRIBUTED_TRACE = 1
7274
CREATED_DISTRIBUTED_TRACE = 2

newrelic/config.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2846,7 +2846,10 @@ def _process_module_builtin_defaults():
28462846
_process_module_definition(
28472847
"kafka.coordinator.heartbeat", "newrelic.hooks.messagebroker_kafkapython", "instrument_kafka_heartbeat"
28482848
)
2849-
2849+
_process_module_definition("kombu.messaging", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_messaging")
2850+
_process_module_definition(
2851+
"kombu.serialization", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_serializaion"
2852+
)
28502853
_process_module_definition("logging", "newrelic.hooks.logger_logging", "instrument_logging")
28512854

28522855
_process_module_definition("loguru", "newrelic.hooks.logger_loguru", "instrument_loguru")

newrelic/core/attribute.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@
7575
"host.displayName",
7676
"http.statusCode",
7777
"http.url",
78+
"kafka.consume.channel_id",
79+
"kafka.consume.byteCount",
80+
"kombu.consume.channel_id",
81+
"kombu.consume.byteCount",
7882
"llm",
7983
"message.queueName",
8084
"message.routingKey",

newrelic/core/config.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ def emit(self, record):
7171
_logger.addHandler(_NullHandler())
7272

7373

74+
def parse_space_separated_into_list(string):
75+
return string.split()
76+
77+
7478
def _map_aws_account_id(s, logger):
7579
# The AWS account id must be a 12 digit number.
7680
# See https://docs.aws.amazon.com/accounts/latest/reference/manage-acct-identifiers.html#awsaccountid.
@@ -425,6 +429,18 @@ class InstrumentationGraphQLSettings(Settings):
425429
pass
426430

427431

432+
class InstrumentationKombuSettings(Settings):
433+
pass
434+
435+
436+
class InstrumentationKombuIgnoredExchangesSettings(Settings):
437+
pass
438+
439+
440+
class InstrumentationKombuConsumerSettings(Settings):
441+
enabled = False
442+
443+
428444
class EventHarvestConfigSettings(Settings):
429445
nested = True
430446
_lock = threading.Lock()
@@ -488,6 +504,9 @@ class EventHarvestConfigHarvestLimitSettings(Settings):
488504
_settings.infinite_tracing = InfiniteTracingSettings()
489505
_settings.instrumentation = InstrumentationSettings()
490506
_settings.instrumentation.graphql = InstrumentationGraphQLSettings()
507+
_settings.instrumentation.kombu = InstrumentationKombuSettings()
508+
_settings.instrumentation.kombu.ignored_exchanges = InstrumentationKombuIgnoredExchangesSettings()
509+
_settings.instrumentation.kombu.consumer = InstrumentationKombuConsumerSettings()
491510
_settings.message_tracer = MessageTracerSettings()
492511
_settings.process_host = ProcessHostSettings()
493512
_settings.rum = RumSettings()
@@ -877,6 +896,14 @@ def default_otlp_host(host):
877896
"NEW_RELIC_INSTRUMENTATION_GRAPHQL_CAPTURE_INTROSPECTION_QUERIES", False
878897
)
879898

899+
# celeryev is the monitoring queue for rabbitmq which we do not need to monitor-it just makes a lot of noise.
900+
_settings.instrumentation.kombu.ignored_exchanges = parse_space_separated_into_list(
901+
os.environ.get("NEW_RELIC_INSTRUMENTATION_KOMBU_IGNORED_EXCHANGES", "celeryev")
902+
)
903+
_settings.instrumentation.kombu.consumer.enabled = _environ_as_bool(
904+
"NEW_RELIC_INSTRUMENTATION_KOMBU_CONSUMER_ENABLED", default=False
905+
)
906+
880907
_settings.event_harvest_config.harvest_limits.analytic_event_data = _environ_as_int(
881908
"NEW_RELIC_ANALYTICS_EVENTS_MAX_SAMPLES_STORED", DEFAULT_RESERVOIR_SIZE
882909
)
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
# Copyright 2010 New Relic, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import logging
15+
import sys
16+
17+
from newrelic.api.application import application_instance, application_settings
18+
from newrelic.api.function_trace import FunctionTrace
19+
from newrelic.api.message_trace import MessageTrace
20+
from newrelic.api.message_transaction import MessageTransaction
21+
from newrelic.api.time_trace import current_trace, notice_error
22+
from newrelic.api.transaction import current_transaction
23+
from newrelic.common.object_wrapper import ObjectProxy, function_wrapper, wrap_function_wrapper
24+
from newrelic.common.package_version_utils import get_package_version
25+
from newrelic.common.signature import bind_args
26+
27+
_logger = logging.getLogger(__name__)
28+
29+
"""
30+
The following are unsupported transport types since the libraries are too old:
31+
* librabbitmq
32+
* qpid
33+
* amqp uses librabbitmq or py-amqp
34+
"""
35+
AVAILABLE_TRANSPORTS = {
36+
"py-amqp": "AMQP",
37+
"sqs": "SQS",
38+
"redis": "REDIS",
39+
"zookeeper": "ZooKeeper",
40+
"confluentkafka": "Kafka",
41+
}
42+
43+
44+
def wrap_Producer_publish(wrapped, instance, args, kwargs):
45+
transaction = current_transaction()
46+
47+
if transaction is None:
48+
return wrapped(*args, **kwargs)
49+
50+
bound_args = bind_args(wrapped, args, kwargs)
51+
headers = bound_args["headers"]
52+
headers = headers if headers else {}
53+
value = bound_args["body"]
54+
key = bound_args["routing_key"]
55+
exchange = getattr(bound_args["exchange"], "name", None) or "Default"
56+
57+
transaction.add_messagebroker_info("Kombu", get_package_version("kombu"))
58+
59+
with MessageTrace(
60+
library="Kombu",
61+
operation="Produce",
62+
destination_type="Exchange",
63+
destination_name=exchange,
64+
source=wrapped,
65+
terminal=False,
66+
):
67+
dt_headers = {k: v.encode("utf-8") for k, v in MessageTrace.generate_request_headers(transaction)}
68+
if headers:
69+
dt_headers.update(headers)
70+
71+
try:
72+
bound_args["headers"] = dt_headers
73+
return wrapped(**bound_args)
74+
except Exception:
75+
notice_error()
76+
raise
77+
78+
79+
def wrap_consumer_recieve_callback(wrapped, instance, args, kwargs):
80+
# In cases where Kombu is being used to talk to the queue via Celery (aka Celery
81+
# is the toplevel api) a transaction will be created for Kombu and a separate
82+
# transaction will be created for Celery. If instrumentation.kombu.consumer.enabled
83+
# is disabled, do not create the duplicate Kombu transaction.
84+
settings = application_settings() or global_settings()
85+
if not settings.instrumentation.kombu.consumer.enabled:
86+
return wrapped(*args, **kwargs)
87+
88+
# This will be the transaction if any that is created by this wrapper.
89+
created_transaction = None
90+
91+
bound_args = bind_args(wrapped, args, kwargs)
92+
message = bound_args["message"]
93+
if message:
94+
# In Kombu there is not iterator, instead there is a callback that
95+
# is called inside wrapped.
96+
# This callback can be called either outside of a transaction, or
97+
# within the context of an existing transaction. There are 3
98+
# possibilities we need to handle: (Note that this is similar to
99+
# our Pika and Celery instrumentation)
100+
#
101+
# 1. In an inactive transaction
102+
#
103+
# If the end_of_transaction() or ignore_transaction() API
104+
# calls have been invoked, this iterator may be called in the
105+
# context of an inactive transaction. In this case, don't wrap
106+
# the callback in any way.
107+
#
108+
# 2. In an active transaction
109+
#
110+
# Do nothing.
111+
#
112+
# 3. Outside of a transaction
113+
#
114+
# Since it's not running inside of an existing transaction, we
115+
# want to create a new background transaction for it.
116+
body = getattr(message, "body", None)
117+
key = getattr(message, "delivery_info", {}).get("routing_key")
118+
library = "Kombu"
119+
destination_type = "Exchange"
120+
destination_name = getattr(message, "delivery_info", {}).get("exchange") or "Default"
121+
received_bytes = len(str(body).encode("utf-8"))
122+
message_count = 1
123+
transaction = current_transaction(active_only=False)
124+
if not transaction and destination_name not in settings.instrumentation.kombu.ignored_exchanges:
125+
# Try to get the transport type. The default for kombu is py-amqp.
126+
# If not in the known transport type list, fallback to "Other".
127+
try:
128+
transport_name = getattr(
129+
getattr(getattr(instance, "connection", None), "transport", None), "driver_name", "py-amqp"
130+
)
131+
transport_type = AVAILABLE_TRANSPORTS.get(transport_name.lower(), "Other")
132+
except Exception:
133+
_logger.debug("Failed to determine transport type.", exc_info=True)
134+
transport_type = "Other"
135+
created_transaction = MessageTransaction(
136+
application=application_instance(),
137+
library=library,
138+
destination_type=destination_type,
139+
destination_name=destination_name,
140+
headers=dict(getattr(message, "headers", {})),
141+
transport_type=transport_type,
142+
routing_key=key,
143+
source=wrapped,
144+
)
145+
created_transaction.__enter__() # pylint: disable=C2801
146+
created_transaction.destination_name = destination_name
147+
148+
# Obtain consumer client_id to send up as agent attribute
149+
if hasattr(message, "channel") and hasattr(message.channel, "channel_id"):
150+
channel_id = message.channel.channel_id
151+
created_transaction._add_agent_attribute("kombu.consume.channel_id", channel_id)
152+
if received_bytes:
153+
created_transaction._add_agent_attribute("kombu.consume.byteCount", received_bytes)
154+
155+
transaction = current_transaction()
156+
if transaction: # If there is an active transaction now.
157+
# Add metrics whether or not a transaction was already active, or one was just started.
158+
# Don't add metrics if there was an inactive transaction.
159+
# Name the metrics using the same format as the transaction, but in case the active transaction
160+
# was an existing one and not a message transaction, reproduce the naming logic here.
161+
group = f"Message/{library}/{destination_type}"
162+
name = f"Named/{destination_name}"
163+
if received_bytes:
164+
transaction.record_custom_metric(f"{group}/{name}/Received/Bytes", received_bytes)
165+
if message_count:
166+
transaction.record_custom_metric(f"{group}/{name}/Received/Messages", message_count)
167+
transaction.add_messagebroker_info("Kombu", get_package_version("kombu"))
168+
169+
try:
170+
return_val = wrapped(*args, **kwargs)
171+
except Exception:
172+
if created_transaction:
173+
created_transaction.__exit__(*sys.exc_info())
174+
elif current_transaction():
175+
# Report error on existing transaction if there is one
176+
notice_error()
177+
else:
178+
# Report error on application
179+
notice_error(application=application_instance(activate=False))
180+
raise
181+
182+
if created_transaction and not created_transaction.stopped:
183+
created_transaction.__exit__(*sys.exc_info())
184+
185+
return return_val
186+
187+
188+
def wrap_serialize(wrapped, instance, args, kwargs):
189+
transaction = current_transaction()
190+
if not transaction:
191+
return wrapped(*args, **kwargs)
192+
193+
exchange = "Unknown"
194+
if isinstance(transaction, MessageTransaction):
195+
exchange = transaction.destination_name
196+
else:
197+
# Find parent message trace to retrieve topic
198+
message_trace = current_trace()
199+
while message_trace is not None and not isinstance(message_trace, MessageTrace):
200+
message_trace = message_trace.parent
201+
if message_trace:
202+
exchange = message_trace.destination_name
203+
204+
group = f"MessageBroker/Kombu/Exchange"
205+
name = f"Named/{exchange}/Serialization/Value"
206+
207+
with FunctionTrace(name=name, group=group) as ft:
208+
return wrapped(*args, **kwargs)
209+
210+
211+
def instrument_kombu_messaging(module):
212+
if hasattr(module, "Producer"):
213+
wrap_function_wrapper(module, "Producer.publish", wrap_Producer_publish)
214+
if hasattr(module, "Consumer"):
215+
wrap_function_wrapper(module, "Consumer._receive_callback", wrap_consumer_recieve_callback)
216+
# This is a little unorthodox but because Kombu creates an object on import we
217+
# have to instrument it where it's used/imported as opposed to where the class is
218+
# defined.
219+
if hasattr(module, "dumps"):
220+
wrap_function_wrapper(module, "dumps", wrap_serialize)
221+
222+
223+
def instrument_kombu_serializaion(module):
224+
# This is a little unorthodox but because Kombu creates an object on import we
225+
# have to instrument it where it's used/imported as opposed to where the class is
226+
# defined.
227+
if hasattr(module, "loads"):
228+
wrap_function_wrapper(module, "loads", wrap_serialize)

0 commit comments

Comments
 (0)