Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4458](https://github.com/open-telemetry/opentelemetry-python/pull/4458))
- pylint-ci updated python version to 3.13
([#4450](https://github.com/open-telemetry/opentelemetry-python/pull/4450))
- Fix memory leak in Log & Trace exporter
([#4449](https://github.com/open-telemetry/opentelemetry-python/pull/4449))

## Version 1.30.0/0.51b0 (2025-02-03)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os
import sys
import threading
import weakref
from os import environ, linesep
from time import time_ns
from typing import IO, Callable, Deque, List, Optional, Sequence
Expand Down Expand Up @@ -216,7 +217,8 @@ def __init__(
self._log_records = [None] * self._max_export_batch_size
self._worker_thread.start()
if hasattr(os, "register_at_fork"):
os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
self._pid = os.getpid()

def _at_fork_reinit(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ def __init__(
weak_at_fork = weakref.WeakMethod(self._at_fork_reinit)

os.register_at_fork(
after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda, protected-access
after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda
)
elif self._export_interval_millis <= 0:
raise ValueError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import sys
import threading
import typing
import weakref
from enum import Enum
from os import environ, linesep
from time import time_ns
Expand Down Expand Up @@ -200,7 +201,8 @@ def __init__(
self.spans_list = [None] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]]
self.worker_thread.start()
if hasattr(os, "register_at_fork"):
os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
self._pid = os.getpid()

def on_start(
Expand Down
19 changes: 19 additions & 0 deletions opentelemetry-sdk/tests/logs/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
# limitations under the License.

# pylint: disable=protected-access
import gc
import logging
import multiprocessing
import os
import time
import unittest
import weakref
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import Mock, patch

Expand Down Expand Up @@ -619,6 +621,23 @@ def _target():

log_record_processor.shutdown()

def test_batch_log_record_processor_gc(self):
# Given a BatchLogRecordProcessor
exporter = InMemoryLogExporter()
processor = BatchLogRecordProcessor(exporter)
weak_ref = weakref.ref(processor)
processor.shutdown()

# When the processor is garbage collected
del processor
gc.collect()

# Then the reference to the processor should no longer exist
self.assertIsNone(
weak_ref(),
"The BatchLogRecordProcessor object created by this test wasn't garbage collected",
)


class TestConsoleLogExporter(unittest.TestCase):
def test_export(self): # pylint: disable=no-self-use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

# pylint: disable=protected-access,invalid-name,no-self-use

import gc
import math
import weakref
from logging import WARNING
from time import sleep, time_ns
from typing import Optional, Sequence
Expand Down Expand Up @@ -257,3 +259,24 @@ def test_metric_timeout_does_not_kill_worker_thread(self):
sleep(0.1)
self.assertTrue(pmr._daemon_thread.is_alive())
pmr.shutdown()

def test_metric_exporer_gc(self):
# Given a PeriodicExportingMetricReader
exporter = FakeMetricsExporter(
preferred_aggregation={
Counter: LastValueAggregation(),
},
)
processor = PeriodicExportingMetricReader(exporter)
weak_ref = weakref.ref(processor)
processor.shutdown()

# When we garbage collect the reader
del processor
gc.collect()

# Then the reference to the reader should no longer exist
self.assertIsNone(
weak_ref(),
"The PeriodicExportingMetricReader object created by this test wasn't garbage collected",
)
19 changes: 19 additions & 0 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import gc
import multiprocessing
import os
import threading
import time
import unittest
import weakref
from concurrent.futures import ThreadPoolExecutor
from logging import WARNING
from platform import python_implementation, system
Expand Down Expand Up @@ -585,6 +587,23 @@ def test_batch_span_processor_parameters(self):
max_export_batch_size=512,
)

def test_batch_span_processor_gc(self):
# Given a BatchSpanProcessor
exporter = MySpanExporter(destination=[])
processor = export.BatchSpanProcessor(exporter)
weak_ref = weakref.ref(processor)
processor.shutdown()

# When the processor is garbage collected
del processor
gc.collect()

# Then the reference to the processor should no longer exist
self.assertIsNone(
weak_ref(),
"The BatchSpanProcessor object created by this test wasn't garbage collected",
)


class TestConsoleSpanExporter(unittest.TestCase):
def test_export(self): # pylint: disable=no-self-use
Expand Down