Skip to content

Commit 8e3ff20

Browse files
authored
Merge branch 'main' into fix/explicit-bucket-histogram-advisory-guard
2 parents 5e58552 + d9a8c70 commit 8e3ff20

13 files changed

Lines changed: 730 additions & 25 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414

1515
- `opentelemetry-sdk`: Fix `AttributeError` in `ExplicitBucketHistogramAggregation` when applied to non-Histogram instruments without explicit boundaries
1616
([#5034](https://github.com/open-telemetry/opentelemetry-python/pull/5034))
17+
- Fix `BatchLogRecordProcessor` default `schedule_delay_millis` from 5000ms to 1000ms to comply with the OTel specification. Note: logs may be exported 5x more frequently by default (e.g. for users who don't explicitly set the `OTEL_BLRP_SCHEDULE_DELAY` env var).
18+
([#4998](https://github.com/open-telemetry/opentelemetry-python/pull/4998))
1719
- `opentelemetry-sdk`: Add `process` resource detector support to declarative file configuration via `detection_development.detectors[].process`
1820
([#5001](https://github.com/open-telemetry/opentelemetry-python/pull/5001))
1921
- `opentelemetry-sdk`: Add shared `_parse_headers` helper for declarative config OTLP exporters
@@ -33,6 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3335
([#4935](https://github.com/open-telemetry/opentelemetry-python/pull/4935))
3436
- `opentelemetry-sdk`: implement metric reader metrics
3537
([#4970](https://github.com/open-telemetry/opentelemetry-python/pull/4970))
38+
- `opentelemetry-sdk`: implement processor metrics
39+
([#5012](https://github.com/open-telemetry/opentelemetry-python/pull/5012))
3640
- `opentelemetry-sdk`: upgrade vendored OTel configuration schema from v1.0.0-rc.3 to v1.0.0
3741
([#4965](https://github.com/open-telemetry/opentelemetry-python/pull/4965))
3842
- improve check-links ci job

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,29 @@
3030
get_value,
3131
set_value,
3232
)
33+
from opentelemetry.metrics import MeterProvider, get_meter_provider
3334
from opentelemetry.sdk._logs import (
3435
LogRecordProcessor,
3536
ReadableLogRecord,
3637
ReadWriteLogRecord,
3738
)
38-
from opentelemetry.sdk._shared_internal import BatchProcessor, DuplicateFilter
39+
from opentelemetry.sdk._shared_internal import (
40+
BatchProcessor,
41+
DuplicateFilter,
42+
ProcessorMetrics,
43+
)
3944
from opentelemetry.sdk.environment_variables import (
4045
OTEL_BLRP_EXPORT_TIMEOUT,
4146
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
4247
OTEL_BLRP_MAX_QUEUE_SIZE,
4348
OTEL_BLRP_SCHEDULE_DELAY,
4449
)
4550
from opentelemetry.sdk.resources import Resource
51+
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
52+
OtelComponentTypeValues,
53+
)
4654

47-
_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
55+
_DEFAULT_SCHEDULE_DELAY_MILLIS = 1000
4856
_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
4957
_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000
5058
_DEFAULT_MAX_QUEUE_SIZE = 2048
@@ -170,9 +178,19 @@ class SimpleLogRecordProcessor(LogRecordProcessor):
170178
propagating to the application.
171179
"""
172180

173-
def __init__(self, exporter: LogRecordExporter):
181+
def __init__(
182+
self,
183+
exporter: LogRecordExporter,
184+
*,
185+
meter_provider: MeterProvider | None = None,
186+
):
174187
self._exporter = exporter
175188
self._shutdown = False
189+
self._metrics = ProcessorMetrics(
190+
"logs",
191+
OtelComponentTypeValues.SIMPLE_LOG_PROCESSOR,
192+
meter_provider or get_meter_provider(),
193+
)
176194

177195
def on_emit(self, log_record: ReadWriteLogRecord):
178196
# Prevent entering a recursive loop.
@@ -193,6 +211,7 @@ def on_emit(self, log_record: ReadWriteLogRecord):
193211
set_value(_ON_EMIT_RECURSION_COUNT_KEY, cnt + 1), # pyright: ignore[reportOperatorIssue]
194212
)
195213
)
214+
error: Exception | None = None
196215
try:
197216
if self._shutdown:
198217
_logger.warning("Processor is already shutdown, ignoring call")
@@ -211,9 +230,11 @@ def on_emit(self, log_record: ReadWriteLogRecord):
211230
limits=log_record.limits,
212231
)
213232
self._exporter.export((readable_log_record,))
214-
except Exception: # pylint: disable=broad-exception-caught
233+
except Exception as err: # pylint: disable=broad-exception-caught
234+
error = err
215235
_logger.exception("Exception while exporting logs.")
216236
finally:
237+
self._metrics.finish_items(1, error)
217238
detach(token)
218239

219240
def shutdown(self):
@@ -246,6 +267,8 @@ def __init__(
246267
max_export_batch_size: int | None = None,
247268
export_timeout_millis: float | None = None,
248269
max_queue_size: int | None = None,
270+
*,
271+
meter_provider: MeterProvider | None = None,
249272
):
250273
if max_queue_size is None:
251274
max_queue_size = BatchLogRecordProcessor._default_max_queue_size()
@@ -276,6 +299,12 @@ def __init__(
276299
export_timeout_millis,
277300
max_queue_size,
278301
"Log",
302+
ProcessorMetrics(
303+
"logs",
304+
OtelComponentTypeValues.BATCHING_LOG_PROCESSOR,
305+
meter_provider or get_meter_provider(),
306+
capacity=max_queue_size,
307+
),
279308
)
280309

281310
def on_emit(self, log_record: ReadWriteLogRecord) -> None:

opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
detach,
3737
set_value,
3838
)
39+
from opentelemetry.sdk._shared_internal._processor_metrics import (
40+
ProcessorMetrics,
41+
)
3942
from opentelemetry.util._once import Once
4043

4144

@@ -98,6 +101,7 @@ def __init__(
98101
export_timeout_millis: float,
99102
max_queue_size: int,
100103
exporting: str,
104+
metrics: ProcessorMetrics,
101105
):
102106
self._bsp_reset_once = Once()
103107
self._exporter = exporter
@@ -127,6 +131,9 @@ def __init__(
127131
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pyright: ignore[reportOptionalCall] pylint: disable=unnecessary-lambda
128132
self._pid = os.getpid()
129133

134+
metrics.register_queue_size(lambda: len(self._queue))
135+
self._metrics = metrics
136+
130137
def _should_export_batch(
131138
self, batch_strategy: BatchExportStrategy, num_iterations: int
132139
) -> bool:
@@ -177,23 +184,27 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
177184
while self._should_export_batch(batch_strategy, iteration):
178185
iteration += 1
179186
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
187+
error: Exception | None = None
188+
count = 0
180189
try:
190+
count = min(
191+
self._max_export_batch_size,
192+
len(self._queue),
193+
)
181194
self._exporter.export(
182195
[
183196
# Oldest records are at the back, so pop from there.
184197
self._queue.pop()
185-
for _ in range(
186-
min(
187-
self._max_export_batch_size,
188-
len(self._queue),
189-
)
190-
)
198+
for _ in range(count)
191199
]
192200
)
193-
except Exception: # pylint: disable=broad-exception-caught
201+
except Exception as err: # pylint: disable=broad-exception-caught
202+
error = err
194203
_logger.exception(
195204
"Exception while exporting %s.", self._exporting
196205
)
206+
finally:
207+
self._metrics.finish_items(count, error)
197208
detach(token)
198209

199210
def emit(self, data: Telemetry) -> None:
@@ -204,6 +215,7 @@ def emit(self, data: Telemetry) -> None:
204215
self._bsp_reset_once.do_once(self._at_fork_reinit)
205216
if len(self._queue) == self._max_queue_size:
206217
_logger.warning("Queue full, dropping %s.", self._exporting)
218+
self._metrics.drop_items(1)
207219
# This will drop a log from the right side if the queue is at _max_queue_size.
208220
self._queue.appendleft(data)
209221
if len(self._queue) >= self._max_export_batch_size:
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
from collections import Counter
18+
from collections.abc import Callable
19+
from typing import Literal
20+
21+
from opentelemetry.metrics import CallbackOptions, MeterProvider, Observation
22+
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
23+
OTEL_COMPONENT_NAME,
24+
OTEL_COMPONENT_TYPE,
25+
OtelComponentTypeValues,
26+
)
27+
from opentelemetry.semconv._incubating.metrics.otel_metrics import (
28+
OTEL_SDK_PROCESSOR_LOG_QUEUE_SIZE,
29+
OTEL_SDK_PROCESSOR_SPAN_QUEUE_SIZE,
30+
create_otel_sdk_processor_log_processed,
31+
create_otel_sdk_processor_log_queue_capacity,
32+
create_otel_sdk_processor_span_processed,
33+
create_otel_sdk_processor_span_queue_capacity,
34+
)
35+
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
36+
37+
_component_counter = Counter()
38+
39+
40+
class ProcessorMetrics:
41+
def __init__(
42+
self,
43+
signal: Literal["traces", "logs"],
44+
component_type: OtelComponentTypeValues,
45+
meter_provider: MeterProvider,
46+
*,
47+
capacity: int | None = None,
48+
) -> None:
49+
self._signal = signal
50+
meter = meter_provider.get_meter("opentelemetry-sdk")
51+
self._meter = meter
52+
53+
count = _component_counter[component_type.value]
54+
_component_counter[component_type.value] = count + 1
55+
56+
self._standard_attrs = {
57+
OTEL_COMPONENT_TYPE: component_type.value,
58+
OTEL_COMPONENT_NAME: f"{component_type.value}/{count}",
59+
}
60+
61+
self._dropped_attrs = {
62+
**self._standard_attrs,
63+
ERROR_TYPE: "queue_full",
64+
}
65+
66+
if signal == "traces":
67+
create_processed = create_otel_sdk_processor_span_processed
68+
create_queue_capacity = (
69+
create_otel_sdk_processor_span_queue_capacity
70+
)
71+
else:
72+
create_processed = create_otel_sdk_processor_log_processed
73+
create_queue_capacity = (
74+
create_otel_sdk_processor_log_queue_capacity
75+
)
76+
77+
self._processed = create_processed(meter)
78+
79+
if capacity is not None:
80+
self._queue_capacity = create_queue_capacity(meter)
81+
self._queue_capacity.add(capacity, self._standard_attrs)
82+
83+
def register_queue_size(self, get_queue_size: Callable[[], int]) -> None:
84+
def record_queue_size(
85+
_options: CallbackOptions,
86+
) -> tuple[Observation]:
87+
return (Observation(get_queue_size(), self._standard_attrs),)
88+
89+
if self._signal == "traces":
90+
queue_size_name = OTEL_SDK_PROCESSOR_SPAN_QUEUE_SIZE
91+
queue_size_description = "The number of spans in the queue of a given instance of an SDK span processor."
92+
queue_size_unit = "{span}"
93+
else:
94+
queue_size_name = OTEL_SDK_PROCESSOR_LOG_QUEUE_SIZE
95+
queue_size_description = "The number of logs in the queue of a given instance of an SDK log processor."
96+
queue_size_unit = "{log}"
97+
98+
self._meter.create_observable_up_down_counter(
99+
queue_size_name,
100+
callbacks=(record_queue_size,),
101+
description=queue_size_description,
102+
unit=queue_size_unit,
103+
)
104+
105+
def drop_items(self, count: int) -> None:
106+
self._processed.add(count, self._dropped_attrs)
107+
108+
def finish_items(self, count: int, error: Exception | None) -> None:
109+
if not error:
110+
self._processed.add(count, self._standard_attrs)
111+
return
112+
attrs = {
113+
**self._standard_attrs,
114+
ERROR_TYPE: type(error).__name__,
115+
}
116+
self._processed.add(count, attrs)

opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@
7979
.. envvar:: OTEL_BLRP_SCHEDULE_DELAY
8080
8181
The :envvar:`OTEL_BLRP_SCHEDULE_DELAY` represents the delay interval between two consecutive exports of the BatchLogRecordProcessor.
82-
Default: 5000
82+
Default: 1000
8383
"""
8484

8585
OTEL_BLRP_EXPORT_TIMEOUT = "OTEL_BLRP_EXPORT_TIMEOUT"

opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515

16+
from opentelemetry.sdk.metrics import export, view
1617
from opentelemetry.sdk.metrics._internal import Meter, MeterProvider
1718
from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError
1819
from opentelemetry.sdk.metrics._internal.exemplar import (
@@ -54,4 +55,6 @@
5455
"SimpleFixedSizeExemplarReservoir",
5556
"UpDownCounter",
5657
"TraceBasedExemplarFilter",
58+
"export",
59+
"view",
5760
]

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,7 @@ def get_meter(
680680
version: Optional[str] = None,
681681
schema_url: Optional[str] = None,
682682
attributes: Optional[Attributes] = None,
683-
) -> Meter:
683+
) -> APIMeter:
684684
if self._disabled:
685685
return NoOpMeter(name, version=version, schema_url=schema_url)
686686

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ def consume_measurement(self, measurement: Measurement) -> None:
4141
def register_asynchronous_instrument(
4242
self,
4343
instrument: (
44-
"opentelemetry.sdk.metrics._internal.instrument_Asynchronous"
44+
"opentelemetry.sdk.metrics._internal.instrument._Asynchronous"
4545
),
4646
):
4747
pass
4848

4949
@abstractmethod
5050
def collect(
5151
self,
52-
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
52+
metric_reader: "opentelemetry.sdk.metrics.export.MetricReader",
5353
timeout_millis: float = 10_000,
5454
) -> Optional[MetricsData]:
5555
pass
@@ -102,7 +102,7 @@ def register_asynchronous_instrument(
102102

103103
def collect(
104104
self,
105-
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
105+
metric_reader: "opentelemetry.sdk.metrics.export.MetricReader",
106106
timeout_millis: float = 10_000,
107107
) -> Optional[MetricsData]:
108108
with self._lock:

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,5 @@
2626
class SdkConfiguration:
2727
exemplar_filter: "opentelemetry.sdk.metrics.ExemplarFilter"
2828
resource: "opentelemetry.sdk.resources.Resource"
29-
metric_readers: Sequence["opentelemetry.sdk.metrics.MetricReader"]
30-
views: Sequence["opentelemetry.sdk.metrics.View"]
29+
metric_readers: Sequence["opentelemetry.sdk.metrics.export.MetricReader"]
30+
views: Sequence["opentelemetry.sdk.metrics.view.View"]

0 commit comments

Comments
 (0)