Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- `opentelemetry-sdk`: fix type annotations on `MetricReader` and related types ([#4938](https://github.com/open-telemetry/opentelemetry-python/pull/4938/))
- `opentelemetry-sdk`: deprecate `LoggingHandler` in favor of `opentelemetry-instrumentation-logging`, see `opentelemetry-instrumentation-logging` documentation
([#4919](https://github.com/open-telemetry/opentelemetry-python/pull/4919))
- `opentelemetry-sdk`: Clarify log processor error handling expectations in documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ class ConsoleLogRecordExporter(LogRecordExporter):
def __init__(
self,
out: IO = sys.stdout,
formatter: Callable[
[ReadableLogRecord], str
] = lambda record: record.to_json() + linesep,
formatter: Callable[[ReadableLogRecord], str] = lambda record: (
record.to_json() + linesep
),
):
self.out = out
self.formatter = formatter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ class ConsoleMetricExporter(MetricExporter):
def __init__(
self,
out: IO = stdout,
formatter: Callable[
[MetricsData], str
] = lambda metrics_data: metrics_data.to_json() + linesep,
formatter: Callable[[MetricsData], str] = lambda metrics_data: (
metrics_data.to_json() + linesep
),
preferred_temporality: dict[type, AggregationTemporality]
| None = None,
preferred_aggregation: dict[
Expand Down Expand Up @@ -353,7 +353,7 @@ def _set_collect_callback(
"opentelemetry.sdk.metrics.export.MetricReader",
AggregationTemporality,
],
Iterable["opentelemetry.sdk.metrics.export.Metric"],
MetricsData,
],
) -> None:
"""This function is internal to the SDK. It should not be called or overridden by users"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from abc import ABC, abstractmethod
from threading import Lock
from time import time_ns
from typing import Iterable, List, Mapping, Optional
from typing import List, Mapping, Optional

# This kind of import is needed to avoid Sphinx errors.
import opentelemetry.sdk.metrics
Expand All @@ -29,7 +29,7 @@
from opentelemetry.sdk.metrics._internal.metric_reader_storage import (
MetricReaderStorage,
)
from opentelemetry.sdk.metrics._internal.point import Metric
from opentelemetry.sdk.metrics._internal.point import MetricsData


class MeasurementConsumer(ABC):
Expand All @@ -51,7 +51,7 @@ def collect(
self,
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
timeout_millis: float = 10_000,
) -> Optional[Iterable[Metric]]:
) -> Optional[MetricsData]:
pass


Expand Down Expand Up @@ -104,7 +104,7 @@ def collect(
self,
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
timeout_millis: float = 10_000,
) -> Optional[Iterable[Metric]]:
) -> Optional[MetricsData]:
with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
# for now, just use the defaults
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,9 @@ def __init__(
self,
service_name: str | None = None,
out: typing.IO = sys.stdout,
formatter: typing.Callable[
[ReadableSpan], str
] = lambda span: span.to_json() + linesep,
formatter: typing.Callable[[ReadableSpan], str] = lambda span: (
span.to_json() + linesep
),
):
self.out = out
self.formatter = formatter
Expand Down
40 changes: 34 additions & 6 deletions opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,32 @@

from opentelemetry.metrics import Observation
from opentelemetry.sdk.metrics import Counter, MeterProvider
from opentelemetry.sdk.metrics._internal.point import (
MetricsData,
ResourceMetrics,
ScopeMetrics,
)
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
InMemoryMetricReader,
Metric,
NumberDataPoint,
Sum,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope


class TestInMemoryMetricReader(TestCase):
def test_no_metrics(self):
mock_collect_callback = Mock(return_value=[])
mock_collect_callback = Mock(
return_value=MetricsData(resource_metrics=[])
)
reader = InMemoryMetricReader()
reader._set_collect_callback(mock_collect_callback)
self.assertEqual(reader.get_metrics_data(), [])
self.assertEqual(
reader.get_metrics_data(), MetricsData(resource_metrics=[])
)
mock_collect_callback.assert_called_once()

def test_converts_metrics_to_list(self):
Expand All @@ -55,15 +66,32 @@ def test_converts_metrics_to_list(self):
is_monotonic=True,
),
)
mock_collect_callback = Mock(return_value=(metric,))
metric_data = MetricsData(
resource_metrics=[
ResourceMetrics(
scope_metrics=[
ScopeMetrics(
metrics=[metric],
scope=InstrumentationScope(name="test"),
schema_url="",
)
],
resource=Resource.create(),
schema_url="",
)
]
)
mock_collect_callback = Mock(return_value=metric_data)
reader = InMemoryMetricReader()
reader._set_collect_callback(mock_collect_callback)

returned_metrics = reader.get_metrics_data()
mock_collect_callback.assert_called_once()
self.assertIsInstance(returned_metrics, tuple)
self.assertEqual(len(returned_metrics), 1)
self.assertIs(returned_metrics[0], metric)
self.assertIsNotNone(returned_metrics)
self.assertIs(
returned_metrics.resource_metrics[0].scope_metrics[0].metrics[0],
metric,
)

def test_shutdown(self):
# shutdown should always be successful
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
import weakref
from logging import WARNING
from time import sleep, time_ns
from typing import Optional, Sequence
from typing import Optional
from unittest.mock import Mock

import pytest

from opentelemetry.sdk.metrics import Counter, MetricsTimeoutError
from opentelemetry.sdk.metrics._internal import _Counter
from opentelemetry.sdk.metrics._internal.point import (
MetricsData,
ResourceMetrics,
ScopeMetrics,
)
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
Gauge,
Expand All @@ -40,6 +45,8 @@
DefaultAggregation,
LastValueAggregation,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.test.concurrency_test import ConcurrencyTestBase


Expand All @@ -48,7 +55,7 @@ def __init__(
self, wait=0, preferred_temporality=None, preferred_aggregation=None
):
self.wait = wait
self.metrics = []
self.metrics: list[MetricsData] = []
self._shutdown = False
super().__init__(
preferred_temporality=preferred_temporality,
Expand All @@ -57,13 +64,13 @@ def __init__(

def export(
self,
metrics_data: Sequence[Metric],
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
sleep(self.wait)
self.metrics.extend(metrics_data)
return True
self.metrics.append(metrics_data)
return MetricExportResult.SUCCESS

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
self._shutdown = True
Expand Down Expand Up @@ -126,6 +133,21 @@ def collect(self, timeout_millis: float = 10_000) -> None:
),
),
]
metrics = MetricsData(
resource_metrics=[
ResourceMetrics(
scope_metrics=[
ScopeMetrics(
metrics=metrics_list,
scope=InstrumentationScope(name="test"),
schema_url="",
)
],
resource=Resource.create(),
schema_url="",
)
]
)


class TestPeriodicExportingMetricReader(ConcurrencyTestBase):
Expand All @@ -137,7 +159,12 @@ def test_defaults(self):
pmr.shutdown()

def _create_periodic_reader(
self, metrics, exporter, collect_wait=0, interval=60000, timeout=30000
self,
metrics_data: MetricsData,
exporter,
collect_wait=0,
interval=60000,
timeout=30000,
):
pmr = PeriodicExportingMetricReader(
exporter,
Expand All @@ -147,7 +174,7 @@ def _create_periodic_reader(

def _collect(reader, timeout_millis):
sleep(collect_wait)
pmr._receive_metrics(metrics, timeout_millis)
return metrics_data

pmr._set_collect_callback(_collect)
return pmr
Expand Down Expand Up @@ -198,24 +225,26 @@ def test_ticker_value_exception_on_negative(self):
def test_ticker_collects_metrics(self):
exporter = FakeMetricsExporter()

pmr = self._create_periodic_reader(
metrics_list, exporter, interval=100
)
pmr = self._create_periodic_reader(metrics, exporter, interval=100)
sleep(0.15)
self.assertEqual(exporter.metrics, metrics_list)
self.assertEqual(exporter.metrics[0], metrics)
pmr.shutdown()

def test_shutdown(self):
exporter = FakeMetricsExporter()

pmr = self._create_periodic_reader([], exporter)
pmr = self._create_periodic_reader(
MetricsData(resource_metrics=[]), exporter
)
pmr.shutdown()
self.assertEqual(exporter.metrics, [])
self.assertEqual(exporter.metrics[0], MetricsData(resource_metrics=[]))
self.assertTrue(pmr._shutdown)
self.assertTrue(exporter._shutdown)

def test_shutdown_multiple_times(self):
pmr = self._create_periodic_reader([], FakeMetricsExporter())
pmr = self._create_periodic_reader(
MetricsData(resource_metrics=[]), FakeMetricsExporter()
)
with self.assertLogs(level="WARNING") as w:
self.run_with_many_threads(pmr.shutdown)
self.assertTrue("Can't shutdown multiple times" in w.output[0])
Expand Down
Loading