Skip to content

Commit a404705

Browse files
committed
fixes
1 parent f3ed3f3 commit a404705

5 files changed

Lines changed: 86 additions & 85 deletions

File tree

docs/examples/exemplars/README.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
OpenTelemetry Exemplars Example
22
===============================
33

4+
.. _Exemplars:
5+
46
Exemplars are example measurements for aggregations. While they are simple conceptually, exemplars can estimate any statistic about the input distribution, can provide links to sample traces for high latency requests, and much more.
57
For more information about exemplars and how they work in OpenTelemetry, see the `spec <https://github.com/open-telemetry/oteps/pull/113>`_
68

@@ -24,7 +26,7 @@ Statistical exemplars
2426
The opentelemetry SDK provides a way to sample exemplars statistically:
2527

2628
- Exemplars will be picked to represent the input distribution, without unquantifiable bias
27-
- A "sample_count" attribute will be set on each exemplar to quantify how many measurements each exemplar represents
29+
- A "sample_count" attribute will be set on each exemplar to quantify how many measurements each exemplar represents (for randomly sampled exemplars, this value will be N (total measurements) / num_samples. For histogram exemplars, this value will be specific to each bucket).
2830

2931
See 'statistical_exemplars.ipynb' for the example (TODO: how do I link this?)
3032

docs/examples/exemplars/statistical_exemplars.ipynb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@
122122
" random.seed(1)\n",
123123
"\n",
124124
" # customer 123 is a big user, and made 1000 requests in this timeframe\n",
125-
" requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, covariance 100\n",
125+
" requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, standard deviation 250\n",
126126
"\n",
127127
" for request in requests:\n",
128128
" bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": \"REST\", \"customer_id\": 123})\n",
@@ -205,7 +205,7 @@
205205
" customer_bytes_map[exemplar.dropped_labels] += exemplar.value\n",
206206
"\n",
207207
"\n",
208-
"customer_bytes_list = sorted(list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True)\n",
208+
"customer_bytes_list = sorted(customer_bytes_map.items(), key=lambda t: t[1], reverse=True)\n",
209209
"\n",
210210
"# Save our top 5 customers and sum all of the rest into \"Others\".\n",
211211
"top_3_customers = [(\"Customer {}\".format(dict(val[0])[\"customer_id\"]), val[1]) for val in customer_bytes_list[:3]] + [(\"Other Customers\", sum([val[1] for val in customer_bytes_list[3:]]))]\n",

docs/examples/exemplars/statistical_exemplars.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ def unknown_customer_calls():
6161

6262
# customer 123 is a big user, and made 1000 requests in this timeframe
6363
requests = np.random.normal(
64-
1000, 250, 1000
65-
) # 1000 requests with average 1000 bytes, covariance 100
64+
1000, 100, 1000
65+
) # 1000 requests with average 1000 bytes, standard deviation 100
6666

6767
for request in requests:
6868
bytes_counter.add(
@@ -123,7 +123,7 @@ def unknown_customer_calls():
123123

124124

125125
customer_bytes_list = sorted(
126-
list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True
126+
customer_bytes_map.items(), key=lambda t: t[1], reverse=True
127127
)
128128

129129
# Save our top 5 customers and sum all of the rest into "Others".
@@ -146,7 +146,6 @@ def unknown_customer_calls():
146146

147147
# Since the exemplars were randomly sampled, all sample_counts will be the same
148148
sample_count = exemplars[0].sample_count
149-
print("sample count", sample_count, "custmer", customer_123_bytes)
150149
full_customer_123_bytes = sample_count * customer_123_bytes
151150

152151
# With seed == 1 we get 1008612 - quite close to the statistical mean of 1000000! (more exemplars would make this estimation even more accurate)
@@ -160,13 +159,10 @@ def unknown_customer_calls():
160159
top_25_customers = customer_bytes_list[:25]
161160

162161
# out of those 25 customers, determine how many used grpc, and come up with a ratio
163-
percent_grpc = len(
164-
list(
165-
filter(
166-
lambda customer_value: customer_value[0][1][1] == "gRPC",
167-
top_25_customers,
168-
)
169-
)
162+
percent_grpc = sum(
163+
1
164+
for customer_value in top_25_customers
165+
if customer_value[0][1][1] == "gRPC"
170166
) / len(top_25_customers)
171167

172168
print(

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py

Lines changed: 62 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
1. A "trace" exemplar sampler, which only samples exemplars if they have a sampled trace context (and can pick exemplars with other biases, ie min + max).
2020
2. A "statistical" exemplar sampler, which samples exemplars without bias (ie no preferenced for traced exemplars)
2121
22-
To use an exemplar recorder, pass in two arguments to the aggregator config in views (see the "Exemplars" example for an example):
22+
To use an exemplar recorder, pass in two arguments to the aggregator config in views (see the :ref:`Exemplars` example for an example):
2323
"num_exemplars": The number of exemplars to record (if applicable, in each bucket). Note that in non-statistical mode the recorder may not use "num_exemplars"
2424
"statistical_exemplars": If exemplars should be recorded statistically
2525
@@ -29,6 +29,7 @@
2929
import abc
3030
import itertools
3131
import random
32+
from typing import List, Optional, Tuple, Union
3233

3334
from opentelemetry.context import get_current
3435
from opentelemetry.util import time_ns
@@ -41,12 +42,12 @@ class Exemplar:
4142

4243
def __init__(
4344
self,
44-
value,
45-
timestamp,
46-
dropped_labels=None,
47-
span_id=None,
48-
trace_id=None,
49-
sample_count=None,
45+
value: Union[int, float],
46+
timestamp: int,
47+
dropped_labels: Optional[Tuple[Tuple[str, str]]] = None,
48+
span_id: Optional[bytes] = None,
49+
trace_id: Optional[bytes] = None,
50+
sample_count: Optional[float] = None,
5051
):
5152
self._value = value
5253
self._timestamp = timestamp
@@ -94,22 +95,22 @@ def sample_count(self):
9495
"""For statistical exemplars, how many measurements a single exemplar represents"""
9596
return self._sample_count
9697

97-
def set_sample_count(self, count):
98+
def set_sample_count(self, count: float):
9899
self._sample_count = count
99100

100101

101-
class ExemplarSampler:
102+
class ExemplarSampler(abc.ABC):
102103
"""
103-
Abstract class to sample exemplars through a stream of incoming measurements
104+
Abstract class to sample `k` exemplars in some way through a stream of incoming measurements
104105
"""
105106

106-
def __init__(self, k, statistical=False):
107+
def __init__(self, k: int, statistical: bool = False):
107108
self._k = k
108109
self._statistical = statistical
109-
self._sample_set = list()
110+
self._sample_set = []
110111

111112
@abc.abstractmethod
112-
def sample(self, exemplar, **kwargs):
113+
def sample(self, exemplar: Exemplar, **kwargs):
113114
"""
114115
Given an exemplar, determine if it should be sampled or not
115116
"""
@@ -122,7 +123,7 @@ def sample_set(self):
122123
"""
123124

124125
@abc.abstractmethod
125-
def merge(self, set1, set2):
126+
def merge(self, set1: List[Exemplar], set2: List[Exemplar]):
126127
"""
127128
Given two lists of sampled exemplars, merge them while maintaining the invariants specified by this sampler
128129
"""
@@ -139,33 +140,35 @@ class RandomExemplarSampler(ExemplarSampler):
139140
Randomly sample a set of k exemplars from a stream. Each measurement in the stream
140141
will have an equal chance of being sampled.
141142
142-
If RandomExemplarSampler` is specified to be statistical, it will add a sample_count to every exemplar it records.
143+
If `RandomExemplarSampler` is specified to be statistical, it will add a sample_count to every exemplar it records.
143144
This value will be equal to the number of measurements recorded per every exemplar measured - all exemplars will have the same sample_count value.
144145
"""
145146

146-
def __init__(self, k, statistical=False):
147+
def __init__(self, k: int, statistical: bool = False):
147148
super().__init__(k, statistical=statistical)
148149
self.rand_count = 0
149150

150-
def sample(self, exemplar, **kwargs):
151+
def sample(self, exemplar: Exemplar, **kwargs):
151152
self.rand_count += 1
152153

153-
if len(self.sample_set) < self._k:
154-
self.sample_set.append(exemplar)
154+
if len(self._sample_set) < self._k:
155+
self._sample_set.append(exemplar)
155156
return
156157

157158
# We sample a random subset of a stream using "Algorithm R":
158159
# https://en.wikipedia.org/wiki/Reservoir_sampling#Simple_algorithm
159160
replace_index = random.randint(0, self.rand_count - 1)
160161

161162
if replace_index < self._k:
162-
self.sample_set[replace_index] = exemplar
163+
self._sample_set[replace_index] = exemplar
163164

164-
def merge(self, set1, set2):
165-
combined = set1 + set2
166-
if len(combined) <= self._k:
167-
return combined
168-
return random.sample(combined, self._k)
165+
def merge(self, set1: List[Exemplar], set2: List[Exemplar]):
166+
"""
167+
Assume that set2 is the latest set of exemplars.
168+
For simplicity, we will just keep set2 and assume set1 has already been exported.
169+
This may need to change with a different SDK implementation.
170+
"""
171+
return set2
169172

170173
@property
171174
def sample_set(self):
@@ -186,12 +189,12 @@ class MinMaxExemplarSampler(ExemplarSampler):
186189
Sample the minimum and maximum measurements recorded only
187190
"""
188191

189-
def __init__(self, k, statistical=False):
192+
def __init__(self, k: int, statistical: bool = False):
190193
# K will always be 2 (min and max), and selecting min and max can never be statistical
191194
super().__init__(2, statistical=False)
192195
self._sample_set = []
193196

194-
def sample(self, exemplar, **kwargs):
197+
def sample(self, exemplar: Exemplar, **kwargs):
195198
self._sample_set = [
196199
min(
197200
self._sample_set + [exemplar],
@@ -209,12 +212,13 @@ def sample(self, exemplar, **kwargs):
209212
def sample_set(self):
210213
return self._sample_set
211214

212-
def merge(self, set1, set2):
213-
merged_set = set1 + set2
214-
if len(merged_set) <= 2:
215-
return sorted(merged_set, key=lambda exemplar: exemplar.value)
216-
217-
return [min(merged_set), max(merged_set)]
215+
def merge(self, set1: List[Exemplar], set2: List[Exemplar]):
216+
"""
217+
Assume that set2 is the latest set of exemplars.
218+
For simplicity, we will just keep set2 and assume set1 has already been exported.
219+
This may need to change with a different SDK implementation.
220+
"""
221+
return set2
218222

219223
def reset(self):
220224
self._sample_set = []
@@ -228,15 +232,17 @@ class BucketedExemplarSampler(ExemplarSampler):
228232
This value will be equal to `len(bucket.exemplars) / bucket.count`, that is the number of measurements each exemplar represents.
229233
"""
230234

231-
def __init__(self, k, statistical=False, boundaries=None):
235+
def __init__(
236+
self, k: int, statistical: bool = False, boundaries: list = None
237+
):
232238
super().__init__(k)
233239
self._boundaries = boundaries
234240
self._sample_set = [
235241
RandomExemplarSampler(k, statistical=statistical)
236242
for _ in range(len(self._boundaries) + 1)
237243
]
238244

239-
def sample(self, exemplar, **kwargs):
245+
def sample(self, exemplar: Exemplar, **kwargs):
240246
bucket_index = kwargs.get("bucket_index")
241247
if bucket_index is None:
242248
return
@@ -251,25 +257,13 @@ def sample_set(self):
251257
)
252258
)
253259

254-
def merge(self, set1, set2):
255-
exemplar_set = [list() for _ in range(len(self._boundaries) + 1)]
256-
# Sort both sets back into buckets
257-
for setx in [set1, set2]:
258-
bucket_idx = 0
259-
for exemplar in setx:
260-
if exemplar.value >= self._boundaries[-1]:
261-
exemplar_set[-1].append(exemplar)
262-
continue
263-
264-
while exemplar.value >= self._boundaries[bucket_idx]:
265-
bucket_idx += 1
266-
exemplar_set[bucket_idx].append(exemplar)
267-
268-
# Pick only k exemplars for every bucket
269-
for index, inner_set in enumerate(exemplar_set):
270-
if len(inner_set) > self._k:
271-
exemplar_set[index] = random.sample(inner_set, self._k)
272-
return list(itertools.chain.from_iterable(exemplar_set))
260+
def merge(self, set1: List[Exemplar], set2: List[Exemplar]):
261+
"""
262+
Assume that set2 is the latest set of exemplars.
263+
For simplicity, we will just keep set2 and assume set1 has already been exported.
264+
This may need to change with a different SDK implementation.
265+
"""
266+
return set2
273267

274268
def reset(self):
275269
for sampler in self._sample_set:
@@ -285,9 +279,9 @@ class ExemplarManager:
285279

286280
def __init__(
287281
self,
288-
config,
289-
default_exemplar_sampler,
290-
statistical_exemplar_sampler,
282+
config: dict,
283+
default_exemplar_sampler: ExemplarSampler,
284+
statistical_exemplar_sampler: ExemplarSampler,
291285
**kwargs
292286
):
293287
if config:
@@ -311,7 +305,12 @@ def __init__(
311305
else:
312306
self.record_exemplars = False
313307

314-
def sample(self, value, dropped_labels, **kwargs):
308+
def sample(
309+
self,
310+
value: Union[int, float],
311+
dropped_labels: Tuple[Tuple[str, str]],
312+
**kwargs
313+
):
315314
context = get_current()
316315

317316
is_sampled = (
@@ -343,7 +342,11 @@ def take_checkpoint(self):
343342
return ret
344343
return []
345344

346-
def merge(self, checkpoint_exemplars, other_checkpoint_exemplars):
345+
def merge(
346+
self,
347+
checkpoint_exemplars: List[Exemplar],
348+
other_checkpoint_exemplars: List[Exemplar],
349+
):
347350
if self.record_exemplars:
348351
return self.exemplar_sampler.merge(
349352
checkpoint_exemplars, other_checkpoint_exemplars

opentelemetry-sdk/tests/metrics/export/test_exemplars.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,11 @@ def test_merge(self):
9797
set1 = [1, 2, 3]
9898
set2 = [4, 5, 6]
9999
sampler = RandomExemplarSampler(6)
100-
self.assertEqual(set1 + set2, sampler.merge(set1, set2))
100+
self.assertEqual(set2, sampler.merge(set1, set2))
101101
sampler = RandomExemplarSampler(8)
102-
self.assertEqual(set1 + set2, sampler.merge(set1, set2))
102+
self.assertEqual(set2, sampler.merge(set1, set2))
103103
sampler = RandomExemplarSampler(4)
104-
self.assertEqual(4, len(sampler.merge(set1, set2)))
104+
self.assertEqual(3, len(sampler.merge(set1, set2)))
105105

106106

107107
class TestMinMaxExemplarSampler(unittest.TestCase):
@@ -140,10 +140,10 @@ def test_reset(self):
140140
self.assertEqual(len(sampler.sample_set), 1)
141141

142142
def test_merge(self):
143-
set1 = [1, 2, 3]
144-
set2 = [4, 5, 6]
143+
set1 = [1, 3]
144+
set2 = [4, 6]
145145
sampler = MinMaxExemplarSampler(2)
146-
self.assertEqual([1, 6], sampler.merge(set1, set2))
146+
self.assertEqual([4, 6], sampler.merge(set1, set2))
147147

148148

149149
class TestBucketedExemplarSampler(unittest.TestCase):
@@ -195,7 +195,7 @@ def test_merge(self):
195195
[Exemplar(2, time())],
196196
)
197197
),
198-
2,
198+
1,
199199
)
200200

201201

@@ -339,7 +339,7 @@ def _merge_aggregators_test(self, aggregator):
339339

340340
agg1.merge(agg2)
341341

342-
self.assertEqual(len(agg1.checkpoint_exemplars), 2)
342+
self.assertEqual(len(agg1.checkpoint_exemplars), 1)
343343

344344
def test_sum_aggregator(self):
345345
self._no_exemplars_test(SumAggregator)
@@ -495,8 +495,8 @@ def test_histogram(self):
495495
# Since this is using the HistogramAggregator, the bucket counts will be reflected
496496
# with each record
497497
requests_size.record(25, {"environment": "staging", "test": "value"})
498-
requests_size.record(1, {"environment": "staging", "test": "value2"})
499-
requests_size.record(200, {"environment": "staging", "test": "value3"})
498+
requests_size.record(1, {"environment": "staging", "test": "value"})
499+
requests_size.record(200, {"environment": "staging", "test": "value"})
500500

501501
controller.tick()
502502
metrics_list = exporter.get_exported_metrics()
@@ -509,8 +509,8 @@ def test_histogram(self):
509509
for exemplar in exemplars
510510
],
511511
[
512-
(1, (("test", "value2"),)),
512+
(1, (("test", "value"),)),
513513
(25, (("test", "value"),)),
514-
(200, (("test", "value3"),)),
514+
(200, (("test", "value"),)),
515515
],
516516
)

0 commit comments

Comments
 (0)