forked from open-telemetry/opentelemetry-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_threads.py
More file actions
64 lines (52 loc) · 2.37 KB
/
test_threads.py
File metadata and controls
64 lines (52 loc) · 2.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent.futures import ( # pylint: disable=no-name-in-module
ThreadPoolExecutor,
)
# pylint: disable=import-error
from ..otel_ot_shim_tracer import MockTracer # noqa: TID252
from ..testcase import OpenTelemetryTestCase # noqa: TID252
class TestThreads(OpenTelemetryTestCase):
    """Scenario: a span started on the test thread is handed to a worker.

    The worker (run through a ``ThreadPoolExecutor``) activates the span it
    was given, closes that scope, and creates its own ``task`` / ``subtask``
    spans. The test then checks how the three finished spans relate.
    """

    def setUp(self):  # pylint: disable=invalid-name
        """Create a fresh tracer and worker pool for each test."""
        self.tracer = MockTracer()
        # use max_workers=3 as a general example even if only one would suffice
        self.executor = ThreadPoolExecutor(max_workers=3)
        # Make sure the worker threads are joined even if the test fails
        # before it reaches its own shutdown call.
        self.addCleanup(self.executor.shutdown)

    def test_main(self):
        """Check the spans produced when a span crosses a thread boundary."""
        # Start an isolated span and let another task/thread activate it
        # and finish it.
        span = self.tracer.start_span("initial")
        self.submit_another_task(span)

        # Wait for the submitted task to complete before inspecting spans.
        self.executor.shutdown(wait=True)

        spans = self.tracer.finished_spans()
        self.assertEqual(len(spans), 3)
        self.assertNamesEqual(spans, ["initial", "subtask", "task"])
        initial, subtask, task = spans

        # task/subtask are part of the same trace,
        # and subtask is a child of task
        self.assertSameTrace(subtask, task)
        self.assertIsChildOf(subtask, task)

        # initial task is not related in any way to those two tasks
        self.assertNotSameTrace(initial, subtask)
        self.assertIsNone(initial.parent)
        self.assertIsNone(task.parent)

    def task(self, span):
        """Body of the submitted job.

        Args:
            span: the span started by the test, to be activated here.
        """
        # Create a new Span for this task
        with self.tracer.start_active_span("task"):
            # The second positional argument is presumably finish_on_close
            # (the test expects "initial" among the finished spans).
            with self.tracer.scope_manager.activate(span, True):
                # Simulate work strictly related to the initial Span
                pass

            # Use the task span as parent of a new subtask
            with self.tracer.start_active_span("subtask"):
                pass

    def submit_another_task(self, span):
        """Schedule ``task(span)`` on the executor without waiting for it."""
        self.executor.submit(self.task, span)