|
13 | 13 | # limitations under the License. |
14 | 14 |
|
15 | 15 | """ |
16 | | -The integration with MongoDB supports the `pymongo`_ library and is specified |
17 | | -to ``trace_integration`` using ``'pymongo'``. |
| 16 | +The integration with MongoDB supports the `pymongo`_ library, it can be |
| 17 | +enabled using the ``PymongoInstrumentor``. |
18 | 18 |
|
19 | 19 | .. _pymongo: https://pypi.org/project/pymongo |
20 | 20 |
|
|
26 | 26 | from pymongo import MongoClient |
27 | 27 | from opentelemetry import trace |
28 | 28 | from opentelemetry.trace import TracerProvider |
29 | | - from opentelemetry.trace.ext.pymongo import trace_integration |
| 29 | + from opentelemetry.trace.ext.pymongo import PymongoInstrumentor |
30 | 30 |
|
31 | 31 | trace.set_tracer_provider(TracerProvider()) |
32 | 32 |
|
33 | | - trace_integration() |
| 33 | + PymongoInstrumentor().instrument() |
34 | 34 | client = MongoClient() |
35 | 35 | db = client["MongoDB_Database"] |
36 | 36 | collection = db["MongoDB_Collection"] |
|
42 | 42 |
|
43 | 43 | from pymongo import monitoring |
44 | 44 |
|
| 45 | +from opentelemetry import trace |
| 46 | +from opentelemetry.auto_instrumentation.instrumentor import BaseInstrumentor |
45 | 47 | from opentelemetry.ext.pymongo.version import __version__ |
46 | 48 | from opentelemetry.trace import SpanKind, get_tracer |
47 | 49 | from opentelemetry.trace.status import Status, StatusCanonicalCode |
|
50 | 52 | COMMAND_ATTRIBUTES = ["filter", "sort", "skip", "limit", "pipeline"] |
51 | 53 |
|
52 | 54 |
|
53 | | -def trace_integration(tracer_provider=None): |
54 | | - """Integrate with pymongo to trace it using event listener. |
55 | | - https://api.mongodb.com/python/current/api/pymongo/monitoring.html |
56 | | -
|
57 | | - Args: |
58 | | - tracer_provider: The `TracerProvider` to use. If none is passed the |
59 | | - current configured one is used. |
60 | | - """ |
61 | | - |
62 | | - tracer = get_tracer(__name__, __version__, tracer_provider) |
63 | | - |
64 | | - monitoring.register(CommandTracer(tracer)) |
65 | | - |
66 | | - |
67 | 55 | class CommandTracer(monitoring.CommandListener): |
68 | 56 | def __init__(self, tracer): |
69 | 57 | self._tracer = tracer |
70 | 58 | self._span_dict = {} |
| 59 | + self.is_enabled = True |
71 | 60 |
|
72 | 61 | def started(self, event: monitoring.CommandStartedEvent): |
73 | 62 | """ Method to handle a pymongo CommandStartedEvent """ |
| 63 | + if not self.is_enabled: |
| 64 | + return |
74 | 65 | command = event.command.get(event.command_name, "") |
75 | 66 | name = DATABASE_TYPE + "." + event.command_name |
76 | 67 | statement = event.command_name |
@@ -103,38 +94,70 @@ def started(self, event: monitoring.CommandStartedEvent): |
103 | 94 | if span is not None: |
104 | 95 | span.set_status(Status(StatusCanonicalCode.INTERNAL, str(ex))) |
105 | 96 | span.end() |
106 | | - self._remove_span(event) |
| 97 | + self._pop_span(event) |
107 | 98 |
|
108 | 99 | def succeeded(self, event: monitoring.CommandSucceededEvent): |
109 | 100 | """ Method to handle a pymongo CommandSucceededEvent """ |
110 | | - span = self._get_span(event) |
111 | | - if span is not None: |
112 | | - span.set_attribute( |
113 | | - "db.mongo.duration_micros", event.duration_micros |
114 | | - ) |
115 | | - span.set_status(Status(StatusCanonicalCode.OK, event.reply)) |
116 | | - span.end() |
117 | | - self._remove_span(event) |
| 101 | + if not self.is_enabled: |
| 102 | + return |
| 103 | + span = self._pop_span(event) |
| 104 | + if span is None: |
| 105 | + return |
| 106 | + span.set_attribute("db.mongo.duration_micros", event.duration_micros) |
| 107 | + span.set_status(Status(StatusCanonicalCode.OK, event.reply)) |
| 108 | + span.end() |
118 | 109 |
|
119 | 110 | def failed(self, event: monitoring.CommandFailedEvent): |
120 | 111 | """ Method to handle a pymongo CommandFailedEvent """ |
121 | | - span = self._get_span(event) |
122 | | - if span is not None: |
123 | | - span.set_attribute( |
124 | | - "db.mongo.duration_micros", event.duration_micros |
125 | | - ) |
126 | | - span.set_status(Status(StatusCanonicalCode.UNKNOWN, event.failure)) |
127 | | - span.end() |
128 | | - self._remove_span(event) |
129 | | - |
130 | | - def _get_span(self, event): |
131 | | - return self._span_dict.get(_get_span_dict_key(event)) |
| 112 | + if not self.is_enabled: |
| 113 | + return |
| 114 | + span = self._pop_span(event) |
| 115 | + if span is None: |
| 116 | + return |
| 117 | + span.set_attribute("db.mongo.duration_micros", event.duration_micros) |
| 118 | + span.set_status(Status(StatusCanonicalCode.UNKNOWN, event.failure)) |
| 119 | + span.end() |
132 | 120 |
|
133 | | - def _remove_span(self, event): |
134 | | - self._span_dict.pop(_get_span_dict_key(event)) |
| 121 | + def _pop_span(self, event): |
| 122 | + return self._span_dict.pop(_get_span_dict_key(event), None) |
135 | 123 |
|
136 | 124 |
|
137 | 125 | def _get_span_dict_key(event): |
138 | 126 | if event.connection_id is not None: |
139 | 127 | return (event.request_id, event.connection_id) |
140 | 128 | return event.request_id |
| 129 | + |
| 130 | + |
| 131 | +class PymongoInstrumentor(BaseInstrumentor): |
| 132 | + _commandtracer_instance = None # type CommandTracer |
| 133 | + # The instrumentation for PyMongo is based on the event listener interface |
| 134 | + # https://api.mongodb.com/python/current/api/pymongo/monitoring.html. |
| 135 | + # This interface only allows to register listeners and does not provide |
| 136 | + # an unregister API. In order to provide a mechanishm to disable |
| 137 | + # instrumentation an enabled flag is implemented in CommandTracer, |
| 138 | + # it's checked in the different listeners. |
| 139 | + |
| 140 | + def _instrument(self, **kwargs): |
| 141 | + """Integrate with pymongo to trace it using event listener. |
| 142 | + https://api.mongodb.com/python/current/api/pymongo/monitoring.html |
| 143 | +
|
| 144 | + Args: |
| 145 | + tracer_provider: The `TracerProvider` to use. If none is passed the |
| 146 | + current configured one is used. |
| 147 | + """ |
| 148 | + |
| 149 | + tracer_provider = kwargs.get("tracer_provider") |
| 150 | + |
| 151 | + # Create and register a CommandTracer only the first time |
| 152 | + if self._commandtracer_instance is None: |
| 153 | + tracer = get_tracer(__name__, __version__, tracer_provider) |
| 154 | + |
| 155 | + self._commandtracer_instance = CommandTracer(tracer) |
| 156 | + monitoring.register(self._commandtracer_instance) |
| 157 | + |
| 158 | + # If already created, just enable it |
| 159 | + self._commandtracer_instance.is_enabled = True |
| 160 | + |
| 161 | + def _uninstrument(self, **kwargs): |
| 162 | + if self._commandtracer_instance is not None: |
| 163 | + self._commandtracer_instance.is_enabled = False |
0 commit comments