Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4910](https://github.com/open-telemetry/opentelemetry-python/pull/4910))
- Add configurable `max_export_batch_size` to OTLP HTTP metrics exporter
([#4576](https://github.com/open-telemetry/opentelemetry-python/pull/4576))
- `opentelemetry-exporter-otlp-proto-http`: use consistent protobuf for export request
([#5015](https://github.com/open-telemetry/opentelemetry-python/pull/5015))

## Version 1.40.0/0.61b0 (2026-03-04)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
_is_retryable,
_load_session_from_envvar,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
ExportMetricsServiceRequest,
)
from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401
Comment thread
xrmx marked this conversation as resolved.
Expand All @@ -60,8 +60,7 @@
KeyValue,
KeyValueList,
)
from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 # noqa: F401
from opentelemetry.proto.resource.v1.resource_pb2 import Resource # noqa: F401
Comment thread
xrmx marked this conversation as resolved.
from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2
Comment thread
xrmx marked this conversation as resolved.
from opentelemetry.proto.resource.v1.resource_pb2 import (
Resource as PB2Resource,
)
Expand Down Expand Up @@ -243,18 +242,19 @@ def _export(

def _export_with_retries(
self,
serialized_data: bytes,
export_request: ExportMetricsServiceRequest,
deadline_sec: float,
) -> MetricExportResult:
"""Export serialized data with retry logic until success, non-transient error, or exponential backoff maxed out.

Args:
serialized_data: serialized metrics data to export
export_request: ExportMetricsServiceRequest object containing metrics data to export
deadline_sec: timestamp deadline for the export

Returns:
MetricExportResult: SUCCESS if export succeeded, FAILURE otherwise
"""
serialized_data = export_request.SerializeToString()
for retry_num in range(_MAX_RETRYS):
            # multiplying by a random number between .8 and 1.2 introduces a +/-20% jitter to each backoff.
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
Expand Down Expand Up @@ -310,23 +310,21 @@ def export(
_logger.warning("Exporter already shutdown, ignoring batch")
return MetricExportResult.FAILURE

serialized_data = encode_metrics(metrics_data)
export_request = encode_metrics(metrics_data)
deadline_sec = time() + self._timeout

# If no batch size configured, export as single batch with retries as configured
if self._max_export_batch_size is None:
return self._export_with_retries(
serialized_data.SerializeToString(), deadline_sec
)
return self._export_with_retries(export_request, deadline_sec)
Comment thread
xrmx marked this conversation as resolved.

# Else, export in batches of configured size
split_metrics_batches = list(
_split_metrics_data(serialized_data, self._max_export_batch_size)
batched_export_requests = _split_metrics_data(
export_request, self._max_export_batch_size
)

for split_metrics_data in split_metrics_batches:
for split_metrics_data in batched_export_requests:
export_result = self._export_with_retries(
split_metrics_data.SerializeToString(),
split_metrics_data,
deadline_sec,
)
if export_result != MetricExportResult.SUCCESS:
Expand All @@ -353,18 +351,18 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:


def _split_metrics_data(
metrics_data: pb2.MetricsData,
metrics_data: ExportMetricsServiceRequest,
max_export_batch_size: int | None = None,
) -> Iterable[pb2.MetricsData]:
"""Splits metrics data into several MetricsData (copies protobuf originals),
) -> Iterable[ExportMetricsServiceRequest]:
"""Splits metrics data into several ExportMetricsServiceRequest (copies protobuf originals),
based on configured data point max export batch size.

Args:
metrics_data: metrics object based on HTTP protocol buffer definition

Returns:
Iterable[pb2.MetricsData]: An iterable of pb2.MetricsData objects containing
pb2.ResourceMetrics, pb2.ScopeMetrics, pb2.Metrics, and data points
            Iterable[ExportMetricsServiceRequest]: An iterable of ExportMetricsServiceRequest objects, each
            containing pb2.ResourceMetrics, pb2.ScopeMetrics, pb2.Metric entries, and their data points
"""
if not max_export_batch_size:
return metrics_data
Expand Down Expand Up @@ -430,7 +428,7 @@ def _split_metrics_data(
batch_size += 1

if batch_size >= max_export_batch_size:
yield pb2.MetricsData(
yield ExportMetricsServiceRequest(
resource_metrics=_get_split_resource_metrics_pb2(
split_resource_metrics
)
Expand Down Expand Up @@ -491,7 +489,7 @@ def _split_metrics_data(
split_resource_metrics.pop()

if batch_size > 0:
yield pb2.MetricsData(
yield ExportMetricsServiceRequest(
resource_metrics=_get_split_resource_metrics_pb2(
split_resource_metrics
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
_split_metrics_data,
)
from opentelemetry.exporter.otlp.proto.http.version import __version__
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
ExportMetricsServiceRequest,
)
from opentelemetry.proto.common.v1.common_pb2 import (
InstrumentationScope,
KeyValue,
Expand Down Expand Up @@ -374,7 +377,7 @@ def test_serialization(self, mock_post):
)

def test_split_metrics_data_many_data_points(self):
metrics_data = pb2.MetricsData(
metrics_data = ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand All @@ -396,7 +399,7 @@ def test_split_metrics_data_many_data_points(self):
),
]
)
split_metrics_data: List[MetricsData] = list(
split_metrics_data: List[ExportMetricsServiceRequest] = list(
# pylint: disable=protected-access
_split_metrics_data(
metrics_data=metrics_data,
Expand All @@ -406,7 +409,7 @@ def test_split_metrics_data_many_data_points(self):

self.assertEqual(
[
pb2.MetricsData(
ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand All @@ -427,7 +430,7 @@ def test_split_metrics_data_many_data_points(self):
),
]
),
pb2.MetricsData(
ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand All @@ -452,7 +455,7 @@ def test_split_metrics_data_many_data_points(self):
)

def test_split_metrics_data_nb_data_points_equal_batch_size(self):
metrics_data = pb2.MetricsData(
metrics_data = ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand All @@ -475,7 +478,7 @@ def test_split_metrics_data_nb_data_points_equal_batch_size(self):
]
)

split_metrics_data: List[MetricsData] = list(
split_metrics_data: List[ExportMetricsServiceRequest] = list(
# pylint: disable=protected-access
_split_metrics_data(
metrics_data=metrics_data,
Expand All @@ -485,7 +488,7 @@ def test_split_metrics_data_nb_data_points_equal_batch_size(self):

self.assertEqual(
[
pb2.MetricsData(
ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand Down Expand Up @@ -513,7 +516,7 @@ def test_split_metrics_data_nb_data_points_equal_batch_size(self):

def test_split_metrics_data_many_resources_scopes_metrics(self):
# GIVEN
metrics_data = pb2.MetricsData(
metrics_data = ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand Down Expand Up @@ -567,7 +570,7 @@ def test_split_metrics_data_many_resources_scopes_metrics(self):
]
)

split_metrics_data: List[MetricsData] = list(
split_metrics_data: List[ExportMetricsServiceRequest] = list(
# pylint: disable=protected-access
_split_metrics_data(
metrics_data=metrics_data,
Expand All @@ -577,7 +580,7 @@ def test_split_metrics_data_many_resources_scopes_metrics(self):

self.assertEqual(
[
pb2.MetricsData(
ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand All @@ -603,7 +606,7 @@ def test_split_metrics_data_many_resources_scopes_metrics(self):
),
]
),
pb2.MetricsData(
ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand Down Expand Up @@ -858,7 +861,7 @@ def test_export_retries_with_batching_success(
MagicMock(ok=True),
MagicMock(ok=True),
]
mock_encode_metrics.return_value = pb2.MetricsData(
mock_encode_metrics.return_value = ExportMetricsServiceRequest(
Comment thread
xrmx marked this conversation as resolved.
resource_metrics=[
_resource_metrics(
index=1,
Expand All @@ -880,7 +883,7 @@ def test_export_retries_with_batching_success(
),
]
)
batch_1 = pb2.MetricsData(
batch_1 = ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand All @@ -901,7 +904,7 @@ def test_export_retries_with_batching_success(
),
]
)
batch_2 = pb2.MetricsData(
batch_2 = ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand Down Expand Up @@ -954,7 +957,7 @@ def test_export_retries_with_batching_failure_first(
MagicMock(ok=True),
MagicMock(ok=True),
]
mock_encode_metrics.return_value = pb2.MetricsData(
mock_encode_metrics.return_value = ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand All @@ -976,7 +979,7 @@ def test_export_retries_with_batching_failure_first(
),
]
)
batch_1 = pb2.MetricsData(
batch_1 = ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand Down Expand Up @@ -1031,7 +1034,7 @@ def test_export_retries_with_batching_failure_last(
# Non-retryable
MagicMock(ok=False, status_code=400, reason="bad request"),
]
mock_encode_metrics.return_value = pb2.MetricsData(
mock_encode_metrics.return_value = ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand All @@ -1053,7 +1056,7 @@ def test_export_retries_with_batching_failure_last(
),
]
)
batch_1 = pb2.MetricsData(
batch_1 = ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand All @@ -1074,7 +1077,7 @@ def test_export_retries_with_batching_failure_last(
),
]
)
batch_2 = pb2.MetricsData(
batch_2 = ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand Down Expand Up @@ -1131,7 +1134,7 @@ def test_export_retries_with_batching_failure_retryable(
# Then success
MagicMock(ok=True),
]
mock_encode_metrics.return_value = pb2.MetricsData(
mock_encode_metrics.return_value = ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand All @@ -1153,7 +1156,7 @@ def test_export_retries_with_batching_failure_retryable(
),
]
)
batch_1 = pb2.MetricsData(
batch_1 = ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand All @@ -1174,7 +1177,7 @@ def test_export_retries_with_batching_failure_retryable(
),
]
)
batch_2 = pb2.MetricsData(
batch_2 = ExportMetricsServiceRequest(
resource_metrics=[
_resource_metrics(
index=1,
Expand Down
Loading