forked from open-telemetry/opentelemetry-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__init__.py
More file actions
240 lines (215 loc) · 9.35 KB
/
__init__.py
File metadata and controls
240 lines (215 loc) · 9.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import collections
import enum
import inspect
import logging
import os
import threading
import time
import weakref
from abc import abstractmethod
from typing import (
Generic,
Optional,
Protocol,
TypeVar,
)
from opentelemetry.context import (
_SUPPRESS_INSTRUMENTATION_KEY,
attach,
detach,
set_value,
)
from opentelemetry.util._once import Once
class DuplicateFilter(logging.Filter):
"""Filter that can be applied to internal `logger`'s.
Currently applied to `logger`s on the export logs path that could otherwise cause endless logging of errors or a
recursion depth exceeded issue in cases where logging itself results in an exception."""
def filter(self, record):
current_log = (
record.module,
record.levelno,
record.msg,
time.time() // 60,
)
if current_log != getattr(self, "last_log", None):
self.last_log = current_log
return True
# False means python's `logging` module will no longer process this log.
return False
class BatchExportStrategy(enum.Enum):
EXPORT_ALL = 0
EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
EXPORT_AT_LEAST_ONE_BATCH = 2
Telemetry = TypeVar("Telemetry")
class Exporter(Protocol[Telemetry]):
@abstractmethod
def export(self, batch: list[Telemetry], /):
raise NotImplementedError
@abstractmethod
def shutdown(self):
raise NotImplementedError
class BatchProcessor(Generic[Telemetry]):
"""This class can be used with exporter's that implement the above
Exporter interface to buffer and send telemetry in batch through
the exporter."""
def __init__(
self,
exporter: Exporter[Telemetry],
schedule_delay_millis: float,
max_export_batch_size: int,
export_timeout_millis: float,
max_queue_size: int,
exporting: str,
):
self._bsp_reset_once = Once()
self._exporter = exporter
self._max_queue_size = max_queue_size
self._schedule_delay_millis = schedule_delay_millis
self._schedule_delay = schedule_delay_millis / 1e3
self._max_export_batch_size = max_export_batch_size
# Not used. No way currently to pass timeout to export.
# TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do.
self._export_timeout_millis = export_timeout_millis
# Deque is thread safe.
self._queue = collections.deque([], max_queue_size)
self._worker_thread = threading.Thread(
name=f"OtelBatch{exporting}RecordProcessor",
target=self.worker,
daemon=True,
)
self._logger = logging.getLogger(__name__)
self._logger.addFilter(DuplicateFilter())
self._exporting = exporting
self._shutdown = False
self._shutdown_timeout_exceeded = False
self._export_lock = threading.Lock()
self._worker_awaken = threading.Event()
self._worker_thread.start()
if hasattr(os, "register_at_fork"):
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pyright: ignore[reportOptionalCall] pylint: disable=unnecessary-lambda
self._pid = os.getpid()
def _should_export_batch(
self, batch_strategy: BatchExportStrategy, num_iterations: int
) -> bool:
if not self._queue or self._shutdown_timeout_exceeded:
return False
# Always continue to export while queue length exceeds max batch size.
if len(self._queue) >= self._max_export_batch_size:
return True
if batch_strategy is BatchExportStrategy.EXPORT_ALL:
return True
if batch_strategy is BatchExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
return num_iterations == 0
return False
def _at_fork_reinit(self):
self._export_lock = threading.Lock()
self._worker_awaken = threading.Event()
self._queue.clear()
self._worker_thread = threading.Thread(
name=f"OtelBatch{self._exporting}RecordProcessor",
target=self.worker,
daemon=True,
)
self._worker_thread.start()
self._pid = os.getpid()
def worker(self):
while not self._shutdown:
# Lots of strategies in the spec for setting next timeout.
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
# Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger then threshold.
sleep_interrupted = self._worker_awaken.wait(self._schedule_delay)
if self._shutdown:
break
self._export(
BatchExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
if sleep_interrupted
else BatchExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
)
self._worker_awaken.clear()
self._export(BatchExportStrategy.EXPORT_ALL)
def _export(self, batch_strategy: BatchExportStrategy) -> None:
with self._export_lock:
iteration = 0
# We could see concurrent export calls from worker and force_flush. We call _should_export_batch
# once the lock is obtained to see if we still need to make the requested export.
while self._should_export_batch(batch_strategy, iteration):
iteration += 1
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
self._exporter.export(
[
# Oldest records are at the back, so pop from there.
self._queue.pop()
for _ in range(
min(
self._max_export_batch_size,
len(self._queue),
)
)
]
)
except Exception: # pylint: disable=broad-exception-caught
self._logger.exception(
"Exception while exporting %s.", self._exporting
)
detach(token)
# Do not add any logging.log statements to this function, they can be being routed back to this `emit` function,
# resulting in endless recursive calls that crash the program.
# See https://github.com/open-telemetry/opentelemetry-python/issues/4261
def emit(self, data: Telemetry) -> None:
if self._shutdown:
return
if self._pid != os.getpid():
self._bsp_reset_once.do_once(self._at_fork_reinit)
# This will drop a log from the right side if the queue is at _max_queue_length.
self._queue.appendleft(data)
if len(self._queue) >= self._max_export_batch_size:
self._worker_awaken.set()
def shutdown(self, timeout_millis: int = 30000):
if self._shutdown:
return
shutdown_should_end = time.time() + (timeout_millis / 1000)
# Causes emit to reject telemetry and makes force_flush a no-op.
self._shutdown = True
# Interrupts sleep in the worker if it's sleeping.
self._worker_awaken.set()
self._worker_thread.join(timeout_millis / 1000)
# Stops worker thread from calling export again if queue is still not empty.
self._shutdown_timeout_exceeded = True
# We want to shutdown immediately only if we already waited `timeout_secs`.
# Otherwise we pass the remaining timeout to the exporter.
# Some exporter's shutdown support a timeout param.
if (
"timeout_millis"
in inspect.getfullargspec(self._exporter.shutdown).args
):
remaining_millis = (shutdown_should_end - time.time()) * 1000
self._exporter.shutdown(timeout_millis=max(0, remaining_millis)) # type: ignore
else:
self._exporter.shutdown()
# Worker thread **should** be finished at this point, because we called shutdown on the exporter,
# and set shutdown_is_occuring to prevent further export calls. It's possible that a single export
# call is ongoing and the thread isn't finished. In this case we will return instead of waiting on
# the thread to finish.
# TODO: Fix force flush so the timeout is used https://github.com/open-telemetry/opentelemetry-python/issues/4568.
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
if self._shutdown:
return False
# Blocking call to export.
self._export(BatchExportStrategy.EXPORT_ALL)
return True