Skip to content

Commit da333f8

Browse files
authored
[Metrics SDK] Performance improvement in measurement processing (#1993)
1 parent 649829f commit da333f8

12 files changed

Lines changed: 300 additions & 65 deletions

sdk/include/opentelemetry/sdk/common/attributemap_hash.h

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
#pragma once
55

6-
#include <iostream>
76
#include <string>
87
#include "opentelemetry/sdk/common/attribute_utils.h"
98

@@ -14,7 +13,7 @@ namespace common
1413
{
1514

1615
template <class T>
17-
inline void GetHashForAttributeValue(size_t &seed, const T arg)
16+
inline void GetHash(size_t &seed, const T &arg)
1817
{
1918
std::hash<T> hasher;
2019
// reference -
@@ -23,11 +22,11 @@ inline void GetHashForAttributeValue(size_t &seed, const T arg)
2322
}
2423

2524
template <class T>
26-
inline void GetHashForAttributeValue(size_t &seed, const std::vector<T> &arg)
25+
inline void GetHash(size_t &seed, const std::vector<T> &arg)
2726
{
2827
for (auto v : arg)
2928
{
30-
GetHashForAttributeValue<T>(seed, v);
29+
GetHash<T>(seed, v);
3130
}
3231
}
3332

@@ -37,7 +36,7 @@ struct GetHashForAttributeValueVisitor
3736
template <class T>
3837
void operator()(T &v)
3938
{
40-
GetHashForAttributeValue(seed_, v);
39+
GetHash(seed_, v);
4140
}
4241
size_t &seed_;
4342
};
@@ -48,15 +47,40 @@ inline size_t GetHashForAttributeMap(const OrderedAttributeMap &attribute_map)
4847
size_t seed = 0UL;
4948
for (auto &kv : attribute_map)
5049
{
51-
std::hash<std::string> hasher;
52-
// reference -
53-
// https://www.boost.org/doc/libs/1_37_0/doc/html/hash/reference.html#boost.hash_combine
54-
seed ^= hasher(kv.first) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
50+
GetHash(seed, kv.first);
5551
nostd::visit(GetHashForAttributeValueVisitor(seed), kv.second);
5652
}
5753
return seed;
5854
}
5955

56+
// Calculate hash of keys and values of KeyValueIterable, filtered using callback.
57+
inline size_t GetHashForAttributeMap(
58+
const opentelemetry::common::KeyValueIterable &attributes,
59+
nostd::function_ref<bool(nostd::string_view)> is_key_present_callback)
60+
{
61+
AttributeConverter converter;
62+
size_t seed = 0UL;
63+
attributes.ForEachKeyValue(
64+
[&](nostd::string_view key, opentelemetry::common::AttributeValue value) noexcept {
65+
if (!is_key_present_callback(key))
66+
{
67+
return true;
68+
}
69+
GetHash(seed, key.data());
70+
auto attr_val = nostd::visit(converter, value);
71+
nostd::visit(GetHashForAttributeValueVisitor(seed), attr_val);
72+
return true;
73+
});
74+
return seed;
75+
}
76+
77+
template <class T>
78+
inline size_t GetHash(T value)
79+
{
80+
std::hash<T> hasher;
81+
return hasher(value);
82+
}
83+
6084
} // namespace common
6185
} // namespace sdk
6286
OPENTELEMETRY_END_NAMESPACE

sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,22 +47,24 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
4747
{
4848
auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_);
4949
aggr->Aggregate(measurement.second);
50-
auto prev = cumulative_hash_map_->Get(measurement.first);
50+
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(measurement.first);
51+
auto prev = cumulative_hash_map_->Get(hash);
5152
if (prev)
5253
{
5354
auto delta = prev->Diff(*aggr);
5455
// store received value in cumulative map, and the diff in delta map (to pass it to temporal
5556
// storage)
56-
cumulative_hash_map_->Set(measurement.first, std::move(aggr));
57-
delta_hash_map_->Set(measurement.first, std::move(delta));
57+
cumulative_hash_map_->Set(measurement.first, std::move(aggr), hash);
58+
delta_hash_map_->Set(measurement.first, std::move(delta), hash);
5859
}
5960
else
6061
{
6162
// store received value in cumulative and delta map.
6263
cumulative_hash_map_->Set(
6364
measurement.first,
64-
DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr));
65-
delta_hash_map_->Set(measurement.first, std::move(aggr));
65+
DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr),
66+
hash);
67+
delta_hash_map_->Set(measurement.first, std::move(aggr), hash);
6668
}
6769
}
6870
}

sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h

Lines changed: 72 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "opentelemetry/sdk/common/attributemap_hash.h"
99
#include "opentelemetry/sdk/metrics/aggregation/aggregation.h"
1010
#include "opentelemetry/sdk/metrics/instruments.h"
11+
#include "opentelemetry/sdk/metrics/view/attributes_processor.h"
1112
#include "opentelemetry/version.h"
1213

1314
#include <functional>
@@ -19,6 +20,7 @@ namespace sdk
1920
{
2021
namespace metrics
2122
{
23+
2224
using opentelemetry::sdk::common::OrderedAttributeMap;
2325

2426
class AttributeHashGenerator
@@ -33,12 +35,12 @@ class AttributeHashGenerator
3335
class AttributesHashMap
3436
{
3537
public:
36-
Aggregation *Get(const MetricAttributes &attributes) const
38+
Aggregation *Get(size_t hash) const
3739
{
38-
auto it = hash_map_.find(attributes);
40+
auto it = hash_map_.find(hash);
3941
if (it != hash_map_.end())
4042
{
41-
return it->second.get();
43+
return it->second.second.get();
4244
}
4345
return nullptr;
4446
}
@@ -47,35 +49,89 @@ class AttributesHashMap
4749
* @return check if key is present in hash
4850
*
4951
*/
50-
bool Has(const MetricAttributes &attributes) const
51-
{
52-
return (hash_map_.find(attributes) == hash_map_.end()) ? false : true;
53-
}
52+
bool Has(size_t hash) const { return hash_map_.find(hash) != hash_map_.end(); }
5453

5554
/**
5655
* @return the pointer to value for given key if present.
5756
* If not present, it uses the provided callback to generate
5857
* value and store in the hash
5958
*/
59+
Aggregation *GetOrSetDefault(const opentelemetry::common::KeyValueIterable &attributes,
60+
std::function<std::unique_ptr<Aggregation>()> aggregation_callback,
61+
size_t hash)
62+
{
63+
auto it = hash_map_.find(hash);
64+
if (it != hash_map_.end())
65+
{
66+
return it->second.second.get();
67+
}
68+
69+
MetricAttributes attr{attributes};
70+
71+
hash_map_[hash] = {attr, aggregation_callback()};
72+
return hash_map_[hash].second.get();
73+
}
74+
75+
Aggregation *GetOrSetDefault(std::function<std::unique_ptr<Aggregation>()> aggregation_callback,
76+
size_t hash)
77+
{
78+
auto it = hash_map_.find(hash);
79+
if (it != hash_map_.end())
80+
{
81+
return it->second.second.get();
82+
}
83+
MetricAttributes attr{};
84+
hash_map_[hash] = {attr, aggregation_callback()};
85+
return hash_map_[hash].second.get();
86+
}
87+
6088
Aggregation *GetOrSetDefault(const MetricAttributes &attributes,
61-
std::function<std::unique_ptr<Aggregation>()> aggregation_callback)
89+
std::function<std::unique_ptr<Aggregation>()> aggregation_callback,
90+
size_t hash)
6291
{
63-
auto it = hash_map_.find(attributes);
92+
auto it = hash_map_.find(hash);
6493
if (it != hash_map_.end())
6594
{
66-
return it->second.get();
95+
return it->second.second.get();
6796
}
6897

69-
hash_map_[attributes] = aggregation_callback();
70-
return hash_map_[attributes].get();
98+
MetricAttributes attr{attributes};
99+
100+
hash_map_[hash] = {attr, aggregation_callback()};
101+
return hash_map_[hash].second.get();
71102
}
72103

73104
/**
74105
* Set the value for given key, overwriting the value if already present
75106
*/
76-
void Set(const MetricAttributes &attributes, std::unique_ptr<Aggregation> value)
107+
void Set(const opentelemetry::common::KeyValueIterable &attributes,
108+
std::unique_ptr<Aggregation> aggr,
109+
size_t hash)
77110
{
78-
hash_map_[attributes] = std::move(value);
111+
auto it = hash_map_.find(hash);
112+
if (it != hash_map_.end())
113+
{
114+
it->second.second = std::move(aggr);
115+
}
116+
else
117+
{
118+
MetricAttributes attr{attributes};
119+
hash_map_[hash] = {attr, std::move(aggr)};
120+
}
121+
}
122+
123+
void Set(const MetricAttributes &attributes, std::unique_ptr<Aggregation> aggr, size_t hash)
124+
{
125+
auto it = hash_map_.find(hash);
126+
if (it != hash_map_.end())
127+
{
128+
it->second.second = std::move(aggr);
129+
}
130+
else
131+
{
132+
MetricAttributes attr{attributes};
133+
hash_map_[hash] = {attr, std::move(aggr)};
134+
}
79135
}
80136

81137
/**
@@ -86,7 +142,7 @@ class AttributesHashMap
86142
{
87143
for (auto &kv : hash_map_)
88144
{
89-
if (!callback(kv.first, *(kv.second.get())))
145+
if (!callback(kv.second.first, *(kv.second.second.get())))
90146
{
91147
return false; // callback is not prepared to consume data
92148
}
@@ -100,8 +156,7 @@ class AttributesHashMap
100156
size_t Size() { return hash_map_.size(); }
101157

102158
private:
103-
std::unordered_map<MetricAttributes, std::unique_ptr<Aggregation>, AttributeHashGenerator>
104-
hash_map_;
159+
std::unordered_map<size_t, std::pair<MetricAttributes, std::unique_ptr<Aggregation>>> hash_map_;
105160
};
106161
} // namespace metrics
107162

sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,11 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
3535
const AggregationConfig *aggregation_config)
3636
: instrument_descriptor_(instrument_descriptor),
3737
attributes_hashmap_(new AttributesHashMap()),
38-
attributes_processor_{attributes_processor},
38+
attributes_processor_(attributes_processor),
3939
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
4040
exemplar_reservoir_(exemplar_reservoir),
4141
#endif
4242
temporal_metric_storage_(instrument_descriptor, aggregation_type, aggregation_config)
43-
4443
{
4544
create_default_aggregation_ = [&, aggregation_type,
4645
aggregation_config]() -> std::unique_ptr<Aggregation> {
@@ -60,8 +59,9 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
6059
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
6160
exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now());
6261
#endif
62+
static size_t hash = opentelemetry::sdk::common::GetHash("");
6363
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
64-
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
64+
attributes_hashmap_->GetOrSetDefault(create_default_aggregation_, hash)->Aggregate(value);
6565
}
6666

6767
void RecordLong(int64_t value,
@@ -77,9 +77,21 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
7777
exemplar_reservoir_->OfferMeasurement(value, attributes, context,
7878
std::chrono::system_clock::now());
7979
#endif
80-
auto attr = attributes_processor_->process(attributes);
80+
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(
81+
attributes, [this](nostd::string_view key) {
82+
if (attributes_processor_)
83+
{
84+
return attributes_processor_->isPresent(key);
85+
}
86+
else
87+
{
88+
return true;
89+
}
90+
});
91+
8192
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
82-
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
93+
attributes_hashmap_->GetOrSetDefault(attributes, create_default_aggregation_, hash)
94+
->Aggregate(value);
8395
}
8496

8597
void RecordDouble(double value,
@@ -93,8 +105,9 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
93105
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
94106
exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now());
95107
#endif
108+
static size_t hash = opentelemetry::sdk::common::GetHash("");
96109
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
97-
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
110+
attributes_hashmap_->GetOrSetDefault(create_default_aggregation_, hash)->Aggregate(value);
98111
}
99112

100113
void RecordDouble(double value,
@@ -114,9 +127,20 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
114127
exemplar_reservoir_->OfferMeasurement(value, attributes, context,
115128
std::chrono::system_clock::now());
116129
#endif
117-
auto attr = attributes_processor_->process(attributes);
130+
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(
131+
attributes, [this](nostd::string_view key) {
132+
if (attributes_processor_)
133+
{
134+
return attributes_processor_->isPresent(key);
135+
}
136+
else
137+
{
138+
return true;
139+
}
140+
});
118141
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
119-
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
142+
attributes_hashmap_->GetOrSetDefault(attributes, create_default_aggregation_, hash)
143+
->Aggregate(value);
120144
}
121145

122146
bool Collect(CollectorHandle *collector,
@@ -127,16 +151,15 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
127151

128152
private:
129153
InstrumentDescriptor instrument_descriptor_;
130-
131154
// hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call)
132155
std::unique_ptr<AttributesHashMap> attributes_hashmap_;
133156
// unreported metrics stash for all the collectors
134157
std::unordered_map<CollectorHandle *, std::list<std::shared_ptr<AttributesHashMap>>>
135158
unreported_metrics_;
136159
// last reported metrics stash for all the collectors.
137160
std::unordered_map<CollectorHandle *, LastReportedMetrics> last_reported_metrics_;
138-
const AttributesProcessor *attributes_processor_;
139161
std::function<std::unique_ptr<Aggregation>()> create_default_aggregation_;
162+
const AttributesProcessor *attributes_processor_;
140163
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
141164
nostd::shared_ptr<ExemplarReservoir> exemplar_reservoir_;
142165
#endif

0 commit comments

Comments
 (0)