diff --git a/CHANGELOG.md b/CHANGELOG.md index 8af387c337..87a6fd6d38 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#5015](https://github.com/open-telemetry/opentelemetry-python/pull/5015)) - `opentelemetry-sdk`: cache TracerConfig into the tracer, this changes an internal interface. Only one Tracer with the same instrumentation scope will be created ([#5007](https://github.com/open-telemetry/opentelemetry-python/pull/5007)) +- Redo OTLPMetricExporter unit tests of `max_export_batch_size` to use real `export` + ([#5036](https://github.com/open-telemetry/opentelemetry-python/pull/5036)) ## Version 1.40.0/0.61b0 (2026-03-04) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 75b42e60b3..8fe57f3553 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -19,7 +19,7 @@ from os import environ from typing import List from unittest import TestCase -from unittest.mock import ANY, MagicMock, Mock, call, patch +from unittest.mock import ANY, MagicMock, Mock, patch import requests from requests import Session @@ -841,374 +841,204 @@ def test_get_split_resource_metrics_pb2_unsupported_metric_type(self): log.output[0], ) - @patch.object(OTLPMetricExporter, "_export") - @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.random") - @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.time") - @patch( - "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" - ) - def test_export_retries_with_batching_success( - self, - mock_encode_metrics, - mock_time, - mock_random, - mock_export, - ): - mock_time.return_value = 0 - mock_random.uniform.return_value = 1 - mock_export.side_effect = [ - # Success - MagicMock(ok=True), - MagicMock(ok=True), - ] - mock_encode_metrics.return_value = ExportMetricsServiceRequest( - resource_metrics=[ - _resource_metrics( - index=1, - scope_metrics=[ - _scope_metrics( - index=1, - metrics=[ - _gauge( - index=1, - data_points=[ - _number_data_point(11), - _number_data_point(12), - _number_data_point(13), - ], - ), - ], - ), - ], - ), - ] - ) - batch_1 = ExportMetricsServiceRequest( + @staticmethod + def _create_metrics_data_multiple_data_points( + num_data_points: int, + ) -> MetricsData: + """Helper to create MetricsData with specified number of data points for testing batch splitting.""" + metrics = [] + for idx in range(num_data_points): + metrics.append(_generate_sum(f"sum_int_{idx}", 33)) + + return MetricsData( resource_metrics=[ - _resource_metrics( - index=1, + ResourceMetrics( + resource=Resource( + attributes={"a": 1, "b": False}, + schema_url="resource_schema_url", + ), scope_metrics=[ - _scope_metrics( - index=1, - metrics=[ - _gauge( - index=1, - data_points=[ - _number_data_point(11), - _number_data_point(12), - ], - ), - ], - ), + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url", + ), + metrics=metrics, + schema_url="instrumentation_scope_schema_url", + ) ], - ), + schema_url="resource_schema_url", + ) ] ) - batch_2 = ExportMetricsServiceRequest( - resource_metrics=[ - _resource_metrics( - index=1, - scope_metrics=[ - _scope_metrics( - index=1, - metrics=[ - _gauge( - index=1, - data_points=[ - _number_data_point(13), - ], - ), - ], - ), - ], - ), - ] + + @patch.object(Session, "post") + def test_export_max_export_batch_size_single_batch_integration( + self, mock_post + ): + resp = Response() + resp.status_code = 200 + mock_post.return_value = resp + + # 2 data points, batch size of 3: fits in one batch + metrics_data = ( + TestOTLPMetricExporter._create_metrics_data_multiple_data_points(2) ) + exporter = OTLPMetricExporter(max_export_batch_size=3) + result = exporter.export(metrics_data) - exporter = OTLPMetricExporter(max_export_batch_size=2) - result = exporter.export("foo") self.assertEqual(result, MetricExportResult.SUCCESS) - self.assertEqual(mock_export.call_count, 2) - mock_export.assert_has_calls( - [ - call(batch_1.SerializeToString(), 10), - call(batch_2.SerializeToString(), 10), - ] + self.assertEqual(mock_post.call_count, 1) + mock_post.assert_called_once() + + call_args = mock_post.call_args + self.assertEqual(call_args.kwargs["url"], exporter._endpoint) + self.assertIsInstance(call_args.kwargs["data"], bytes) + self.assertEqual( + call_args.kwargs["verify"], exporter._certificate_file ) + batch_data = call_args.kwargs["data"] + request = ExportMetricsServiceRequest() + request.ParseFromString(batch_data) + self.assertEqual(len(request.resource_metrics), 1) + metrics = request.resource_metrics[0].scope_metrics[0].metrics + self.assertEqual(len(metrics), 2) + metric_names = {metric.name for metric in metrics} + self.assertEqual(metric_names, {"sum_int_0", "sum_int_1"}) - @patch.object(OTLPMetricExporter, "_export") - @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.random") - @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.time") - @patch( - "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" - ) - def test_export_retries_with_batching_failure_first( - self, - mock_encode_metrics, - mock_time, - mock_random, - mock_export, + @patch.object(Session, "post") + def test_export_max_export_batch_size_multiple_batches_integration( + self, mock_post ): - mock_time.return_value = 0 - mock_random.uniform.return_value = 1 - mock_export.side_effect = [ - # Non-retryable - MagicMock(ok=False, status_code=400, reason="bad request"), - MagicMock(ok=True), - MagicMock(ok=True), - ] - mock_encode_metrics.return_value = ExportMetricsServiceRequest( - resource_metrics=[ - _resource_metrics( - index=1, - scope_metrics=[ - _scope_metrics( - index=1, - metrics=[ - _gauge( - index=1, - data_points=[ - _number_data_point(11), - _number_data_point(12), - _number_data_point(13), - ], - ), - ], - ), - ], - ), - ] - ) - batch_1 = ExportMetricsServiceRequest( - resource_metrics=[ - _resource_metrics( - index=1, - scope_metrics=[ - _scope_metrics( - index=1, - metrics=[ - _gauge( - index=1, - data_points=[ - _number_data_point(11), - _number_data_point(12), - ], - ), - ], - ), - ], - ), - ] - ) + resp = Response() + resp.status_code = 200 + mock_post.return_value = resp - exporter = OTLPMetricExporter(max_export_batch_size=2) - result = exporter.export("foo") - # Return FAILURE when first batch fails (consistent with gRPC) - self.assertEqual(result, MetricExportResult.FAILURE) - # Only first batch is exported before failure - self.assertEqual(mock_export.call_count, 1) - mock_export.assert_has_calls( - [ - call(batch_1.SerializeToString(), 10), - ] + # 3 data points, batch size of 2: requires 2 batches + metrics_data = ( + TestOTLPMetricExporter._create_metrics_data_multiple_data_points(3) ) + exporter = OTLPMetricExporter(max_export_batch_size=2) + result = exporter.export(metrics_data) - @patch.object(OTLPMetricExporter, "_export") - @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.random") - @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.time") - @patch( - "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" - ) - def test_export_retries_with_batching_failure_last( - self, - mock_encode_metrics, - mock_time, - mock_random, - mock_export, - ): - mock_time.return_value = 0 - mock_random.uniform.return_value = 1 - mock_export.side_effect = [ - # Success - MagicMock(ok=True), - # Non-retryable - MagicMock(ok=False, status_code=400, reason="bad request"), - ] - mock_encode_metrics.return_value = ExportMetricsServiceRequest( - resource_metrics=[ - _resource_metrics( - index=1, - scope_metrics=[ - _scope_metrics( - index=1, - metrics=[ - _gauge( - index=1, - data_points=[ - _number_data_point(11), - _number_data_point(12), - _number_data_point(13), - ], - ), - ], - ), - ], - ), - ] - ) - batch_1 = ExportMetricsServiceRequest( - resource_metrics=[ - _resource_metrics( - index=1, - scope_metrics=[ - _scope_metrics( - index=1, - metrics=[ - _gauge( - index=1, - data_points=[ - _number_data_point(11), - _number_data_point(12), - ], - ), - ], - ), - ], - ), - ] + self.assertEqual(result, MetricExportResult.SUCCESS) + self.assertEqual(mock_post.call_count, 2) + + for call_args in mock_post.call_args_list: + self.assertEqual(call_args.kwargs["url"], exporter._endpoint) + self.assertIsInstance(call_args.kwargs["data"], bytes) + self.assertEqual( + call_args.kwargs["verify"], exporter._certificate_file + ) + self.assertEqual(len(mock_post.call_args_list), 2) + + # First batch should contain sum_int_0 and sum_int_1 + first_batch_data = mock_post.call_args_list[0].kwargs["data"] + first_request = ExportMetricsServiceRequest() + first_request.ParseFromString(first_batch_data) + self.assertEqual(len(first_request.resource_metrics), 1) + first_metrics = ( + first_request.resource_metrics[0].scope_metrics[0].metrics ) - batch_2 = ExportMetricsServiceRequest( - resource_metrics=[ - _resource_metrics( - index=1, - scope_metrics=[ - _scope_metrics( - index=1, - metrics=[ - _gauge( - index=1, - data_points=[ - _number_data_point(13), - ], - ), - ], - ), - ], - ), - ] + self.assertEqual(len(first_metrics), 2) + first_metric_names = {metric.name for metric in first_metrics} + self.assertEqual(first_metric_names, {"sum_int_0", "sum_int_1"}) + + # Second batch should contain sum_int_2 + second_batch_data = mock_post.call_args_list[1].kwargs["data"] + second_request = ExportMetricsServiceRequest() + second_request.ParseFromString(second_batch_data) + self.assertEqual(len(second_request.resource_metrics), 1) + second_metrics = ( + second_request.resource_metrics[0].scope_metrics[0].metrics ) + self.assertEqual(len(second_metrics), 1) + self.assertEqual(second_metrics[0].name, "sum_int_2") + @patch.object(Session, "post") + def test_export_max_export_batch_size_retry_scenarios_integration( + self, mock_post + ): + # Setup HTTP responses: first request succeeds, second fails non-retryable + success_resp = Response() + success_resp.status_code = 200 + failure_resp = Response() + failure_resp.status_code = 400 + failure_resp.reason = "Bad Request" + mock_post.side_effect = [success_resp, failure_resp] + + # 3 data points, batch size of 2: requires 2 batches + metrics_data = ( + TestOTLPMetricExporter._create_metrics_data_multiple_data_points(3) + ) exporter = OTLPMetricExporter(max_export_batch_size=2) - result = exporter.export("foo") + + # Export should fail when second batch fails + result = exporter.export(metrics_data) self.assertEqual(result, MetricExportResult.FAILURE) - self.assertEqual(mock_export.call_count, 2) - mock_export.assert_has_calls( - [ - call(batch_1.SerializeToString(), 10), - call(batch_2.SerializeToString(), 10), - ] + self.assertEqual(mock_post.call_count, 2) + + # Verify the content of successful first batch + first_batch_data = mock_post.call_args_list[0].kwargs["data"] + first_request = ExportMetricsServiceRequest() + first_request.ParseFromString(first_batch_data) + self.assertEqual(len(first_request.resource_metrics), 1) + first_metrics = ( + first_request.resource_metrics[0].scope_metrics[0].metrics ) + self.assertEqual(len(first_metrics), 2) + first_metric_names = {metric.name for metric in first_metrics} + self.assertEqual(first_metric_names, {"sum_int_0", "sum_int_1"}) - @patch.object(OTLPMetricExporter, "_export") - @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.random") - @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.time") - @patch( - "opentelemetry.exporter.otlp.proto.http.metric_exporter.encode_metrics" - ) - def test_export_retries_with_batching_failure_retryable( - self, - mock_encode_metrics, - mock_time, - mock_random, - mock_export, + @patch.object(Session, "post") + def test_export_max_export_batch_size_retryable_failure_integration( + self, mock_post ): - mock_time.return_value = 0 - mock_random.uniform.return_value = 1 - mock_export.side_effect = [ - # Success - MagicMock(ok=True), - # Retryable - MagicMock( - ok=False, status_code=500, reason="internal server error" - ), - # Then success - MagicMock(ok=True), + success_resp = Response() + success_resp.status_code = 200 + retryable_failure_resp = Response() + retryable_failure_resp.status_code = 503 + retryable_failure_resp.reason = "Service Unavailable" + mock_post.side_effect = [ + success_resp, + retryable_failure_resp, + success_resp, ] - mock_encode_metrics.return_value = ExportMetricsServiceRequest( - resource_metrics=[ - _resource_metrics( - index=1, - scope_metrics=[ - _scope_metrics( - index=1, - metrics=[ - _gauge( - index=1, - data_points=[ - _number_data_point(11), - _number_data_point(12), - _number_data_point(13), - ], - ), - ], - ), - ], - ), - ] - ) - batch_1 = ExportMetricsServiceRequest( - resource_metrics=[ - _resource_metrics( - index=1, - scope_metrics=[ - _scope_metrics( - index=1, - metrics=[ - _gauge( - index=1, - data_points=[ - _number_data_point(11), - _number_data_point(12), - ], - ), - ], - ), - ], - ), - ] - ) - batch_2 = ExportMetricsServiceRequest( - resource_metrics=[ - _resource_metrics( - index=1, - scope_metrics=[ - _scope_metrics( - index=1, - metrics=[ - _gauge( - index=1, - data_points=[ - _number_data_point(13), - ], - ), - ], - ), - ], - ), - ] + + # 3 data points, batch size of 2: requires 2 batches + metrics_data = ( + TestOTLPMetricExporter._create_metrics_data_multiple_data_points(3) ) + exporter = OTLPMetricExporter(max_export_batch_size=2, timeout=2.0) - exporter = OTLPMetricExporter(max_export_batch_size=2) - result = exporter.export("foo") + # Export should eventually succeed after retry + result = exporter.export(metrics_data) self.assertEqual(result, MetricExportResult.SUCCESS) - self.assertEqual(mock_export.call_count, 3) - mock_export.assert_has_calls( - [ - call(batch_1.SerializeToString(), 10), - call(batch_2.SerializeToString(), 10), - call(batch_2.SerializeToString(), 10), - ] + self.assertEqual( + mock_post.call_count, 3 + ) # First batch + retry of second batch + + first_batch_data = mock_post.call_args_list[0].kwargs["data"] + first_request = ExportMetricsServiceRequest() + first_request.ParseFromString(first_batch_data) + self.assertEqual(len(first_request.resource_metrics), 1) + first_metrics = ( + first_request.resource_metrics[0].scope_metrics[0].metrics + ) + self.assertEqual(len(first_metrics), 2) + first_metric_names = {metric.name for metric in first_metrics} + self.assertEqual(first_metric_names, {"sum_int_0", "sum_int_1"}) + # Second batch (retry) should contain sum_int_2 + second_batch_data = mock_post.call_args_list[2].kwargs["data"] + second_request = ExportMetricsServiceRequest() + second_request.ParseFromString(second_batch_data) + self.assertEqual(len(second_request.resource_metrics), 1) + second_metrics = ( + second_request.resource_metrics[0].scope_metrics[0].metrics ) + self.assertEqual(len(second_metrics), 1) + self.assertEqual(second_metrics[0].name, "sum_int_2") def test_aggregation_temporality(self): otlp_metric_exporter = OTLPMetricExporter()