Skip to content

Commit c724f4b

Browse files
Kielekrajkumar-rangarajmartincostello
authored
[Exporter.Zipkin] Harden memory usage for endpoint caching and array tag serialization (#7081)
Co-authored-by: Rajkumar Rangaraj <rajrang@microsoft.com> Co-authored-by: Martin Costello <martin@martincostello.com>
1 parent 3d1580d commit c724f4b

File tree

8 files changed

+280
-6
lines changed

8 files changed

+280
-6
lines changed

src/OpenTelemetry.Exporter.Zipkin/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ Notes](../../RELEASENOTES.md).
66

77
## Unreleased
88

9+
* Harden memory usage for endpoint caching and array tag serialization.
10+
([#7081](https://github.com/open-telemetry/opentelemetry-dotnet/pull/7081))
11+
912
## 1.15.2
1013

1114
Released 2026-Apr-08

src/OpenTelemetry.Exporter.Zipkin/Implementation/ZipkinActivityConversionExtensions.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
using System.Collections.Concurrent;
54
using System.Diagnostics;
65
using OpenTelemetry.Internal;
76
using OpenTelemetry.Trace;
@@ -11,11 +10,12 @@ namespace OpenTelemetry.Exporter.Zipkin.Implementation;
1110
internal static class ZipkinActivityConversionExtensions
1211
{
1312
internal const string ZipkinErrorFlagTagName = "error";
13+
internal const int MaxRemoteEndpointCacheSize = 1024;
1414
private const long TicksPerMicrosecond = TimeSpan.TicksPerMillisecond / 1000;
1515
private const long UnixEpochTicks = 621355968000000000L; // = DateTimeOffset.FromUnixTimeMilliseconds(0).Ticks
1616
private const long UnixEpochMicroseconds = UnixEpochTicks / TicksPerMicrosecond;
1717

18-
private static readonly ConcurrentDictionary<(string, int), ZipkinEndpoint> RemoteEndpointCache = new();
18+
private static readonly ZipkinEndpointLruCache RemoteEndpointCache = new(MaxRemoteEndpointCacheSize);
1919

2020
internal static ZipkinSpan ToZipkinSpan(this Activity activity, ZipkinEndpoint localEndpoint, bool useShortTraceIds = false)
2121
{
@@ -60,6 +60,12 @@ internal static long ToEpochMicroseconds(this DateTimeOffset dateTimeOffset)
6060
internal static long ToEpochMicroseconds(this TimeSpan timeSpan)
6161
=> timeSpan.Ticks / TicksPerMicrosecond;
6262

63+
internal static int GetRemoteEndpointCacheCount()
64+
=> RemoteEndpointCache.Count;
65+
66+
internal static void ClearRemoteEndpointCache()
67+
=> RemoteEndpointCache.Clear();
68+
6369
internal static long ToEpochMicroseconds(this DateTime utcDateTime)
6470
{
6571
// Truncate sub-microsecond precision before offsetting by the Unix Epoch to avoid
@@ -204,13 +210,12 @@ private static void ExtractActivitySource(Activity activity, ref PooledList<KeyV
204210

205211
static ZipkinEndpoint? TryCreateEndpoint(string? remoteEndpoint)
206212
{
207-
if (remoteEndpoint != null)
213+
if (remoteEndpoint == null)
208214
{
209-
var endpoint = RemoteEndpointCache.GetOrAdd((remoteEndpoint, default), ZipkinEndpoint.Create);
210-
return endpoint;
215+
return null;
211216
}
212217

213-
return null;
218+
return RemoteEndpointCache.GetOrAdd(remoteEndpoint);
214219
}
215220

216221
var remoteEndpoint = activity.GetTagItem(SemanticConventions.AttributePeerService) as string;
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using OpenTelemetry.Internal;
5+
6+
namespace OpenTelemetry.Exporter.Zipkin.Implementation;
7+
8+
internal sealed class ZipkinEndpointLruCache
9+
{
10+
private readonly int capacity;
11+
private readonly Dictionary<string, LinkedListNode<CacheEntry>> cache;
12+
private readonly LinkedList<CacheEntry> lruList = new();
13+
private readonly Lock sync = new();
14+
15+
public ZipkinEndpointLruCache(int capacity)
16+
{
17+
Guard.ThrowIfOutOfRange(capacity, min: 1);
18+
19+
this.capacity = capacity;
20+
this.cache = new Dictionary<string, LinkedListNode<CacheEntry>>(StringComparer.Ordinal);
21+
}
22+
23+
public int Count
24+
{
25+
get
26+
{
27+
lock (this.sync)
28+
{
29+
return this.cache.Count;
30+
}
31+
}
32+
}
33+
34+
public ZipkinEndpoint GetOrAdd(string serviceName)
35+
{
36+
Guard.ThrowIfNullOrWhitespace(serviceName);
37+
38+
lock (this.sync)
39+
{
40+
if (this.cache.TryGetValue(serviceName, out var existingNode))
41+
{
42+
this.lruList.Remove(existingNode);
43+
this.lruList.AddFirst(existingNode);
44+
return existingNode.Value.Endpoint;
45+
}
46+
47+
var endpoint = ZipkinEndpoint.Create(serviceName);
48+
var createdNode = new LinkedListNode<CacheEntry>(new CacheEntry(serviceName, endpoint));
49+
50+
this.lruList.AddFirst(createdNode);
51+
this.cache[serviceName] = createdNode;
52+
53+
if (this.cache.Count > this.capacity)
54+
{
55+
var nodeToEvict = this.lruList.Last;
56+
if (nodeToEvict != null)
57+
{
58+
this.lruList.RemoveLast();
59+
this.cache.Remove(nodeToEvict.Value.ServiceName);
60+
}
61+
}
62+
63+
return endpoint;
64+
}
65+
}
66+
67+
public void Clear()
68+
{
69+
lock (this.sync)
70+
{
71+
this.cache.Clear();
72+
this.lruList.Clear();
73+
}
74+
}
75+
76+
private readonly record struct CacheEntry(string ServiceName, ZipkinEndpoint Endpoint)
77+
{
78+
public string ServiceName { get; } = ServiceName;
79+
80+
public ZipkinEndpoint Endpoint { get; } = Endpoint;
81+
}
82+
}

src/OpenTelemetry.Exporter.Zipkin/OpenTelemetry.Exporter.Zipkin.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
<Compile Include="$(RepoRoot)\src\Shared\ActivityHelperExtensions.cs" Link="Includes\ActivityHelperExtensions.cs" />
1818
<Compile Include="$(RepoRoot)\src\Shared\EnvironmentVariables\*.cs" Link="Includes\EnvironmentVariables\%(Filename).cs" />
1919
<Compile Include="$(RepoRoot)\src\Shared\ExceptionExtensions.cs" Link="Includes\ExceptionExtensions.cs" />
20+
<Compile Include="$(RepoRoot)\src\Shared\Shims\Lock.cs" Link="Includes\Shims\Lock.cs" />
2021
<Compile Include="$(RepoRoot)\src\Shared\Options\*.cs" Link="Includes\Options\%(Filename).cs" />
2122
<Compile Include="$(RepoRoot)\src\Shared\SemanticConventions.cs" Link="Includes\SemanticConventions.cs" />
2223
<Compile Include="$(RepoRoot)\src\Shared\Shims\NullableAttributes.cs" Link="Includes\Shims\NullableAttributes.cs" />

src/Shared/TagWriter/JsonStringArrayTagWriter.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ internal readonly struct JsonArrayTagWriterState(MemoryStream stream, Utf8JsonWr
3636

3737
internal sealed class JsonArrayTagWriter : ArrayTagWriter<JsonArrayTagWriterState>
3838
{
39+
private const int MaxThreadStaticStreamCapacity = 64 * 1024;
40+
3941
[ThreadStatic]
4042
private static MemoryStream? threadStream;
4143

@@ -91,6 +93,11 @@ private static JsonArrayTagWriterState EnsureWriter()
9193
else
9294
{
9395
threadStream.SetLength(0);
96+
if (threadStream.Capacity > MaxThreadStaticStreamCapacity)
97+
{
98+
threadStream.Capacity = 0;
99+
}
100+
94101
threadWriter!.Reset(threadStream);
95102
return new(threadStream, threadWriter);
96103
}

test/OpenTelemetry.Exporter.Zipkin.Tests/Implementation/ZipkinActivityExporterRemoteEndpointTests.cs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
using OpenTelemetry.Exporter.Zipkin.Tests;
5+
using OpenTelemetry.Trace;
56
using Xunit;
67

78
namespace OpenTelemetry.Exporter.Zipkin.Implementation.Tests;
@@ -33,4 +34,70 @@ public void GenerateActivity_RemoteEndpointResolutionPriority(RemoteEndpointPrio
3334
Assert.NotNull(zipkinSpan.RemoteEndpoint);
3435
Assert.Equal(testCase.ExpectedResult, zipkinSpan.RemoteEndpoint.ServiceName);
3536
}
37+
38+
[Fact]
39+
public void GenerateActivity_RemoteEndpointCacheIsBounded()
40+
{
41+
ZipkinActivityConversionExtensions.ClearRemoteEndpointCache();
42+
43+
for (var i = 0; i < ZipkinActivityConversionExtensions.MaxRemoteEndpointCacheSize + 200; i++)
44+
{
45+
using var activity = ZipkinActivitySource.CreateTestActivity(
46+
additionalAttributes: new Dictionary<string, object>
47+
{
48+
[SemanticConventions.AttributePeerService] = $"service-{i}",
49+
});
50+
51+
var zipkinSpan = activity.ToZipkinSpan(DefaultZipkinEndpoint);
52+
Assert.Equal($"service-{i}", zipkinSpan.RemoteEndpoint?.ServiceName);
53+
}
54+
55+
Assert.Equal(
56+
ZipkinActivityConversionExtensions.MaxRemoteEndpointCacheSize,
57+
ZipkinActivityConversionExtensions.GetRemoteEndpointCacheCount());
58+
}
59+
60+
[Fact]
61+
public void GenerateActivity_RemoteEndpointCacheEvictsLeastRecentlyUsedEntry()
62+
{
63+
ZipkinActivityConversionExtensions.ClearRemoteEndpointCache();
64+
65+
ZipkinEndpoint? firstEndpoint = null;
66+
67+
for (var i = 0; i < ZipkinActivityConversionExtensions.MaxRemoteEndpointCacheSize; i++)
68+
{
69+
using var activity = ZipkinActivitySource.CreateTestActivity(
70+
additionalAttributes: new Dictionary<string, object>
71+
{
72+
[SemanticConventions.AttributePeerService] = $"service-{i}",
73+
});
74+
75+
var zipkinSpan = activity.ToZipkinSpan(DefaultZipkinEndpoint);
76+
if (i == 0)
77+
{
78+
firstEndpoint = zipkinSpan.RemoteEndpoint;
79+
}
80+
}
81+
82+
using var overflowActivity = ZipkinActivitySource.CreateTestActivity(
83+
additionalAttributes: new Dictionary<string, object>
84+
{
85+
[SemanticConventions.AttributePeerService] = "service-overflow",
86+
});
87+
_ = overflowActivity.ToZipkinSpan(DefaultZipkinEndpoint);
88+
89+
using var evictedEntryActivity = ZipkinActivitySource.CreateTestActivity(
90+
additionalAttributes: new Dictionary<string, object>
91+
{
92+
[SemanticConventions.AttributePeerService] = "service-0",
93+
});
94+
var evictedEntrySpan = evictedEntryActivity.ToZipkinSpan(DefaultZipkinEndpoint);
95+
96+
Assert.NotNull(firstEndpoint);
97+
Assert.NotNull(evictedEntrySpan.RemoteEndpoint);
98+
Assert.NotSame(firstEndpoint, evictedEntrySpan.RemoteEndpoint);
99+
Assert.Equal(
100+
ZipkinActivityConversionExtensions.MaxRemoteEndpointCacheSize,
101+
ZipkinActivityConversionExtensions.GetRemoteEndpointCacheCount());
102+
}
36103
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using System.Collections.Concurrent;
5+
using Xunit;
6+
7+
namespace OpenTelemetry.Exporter.Zipkin.Implementation.Tests;
8+
9+
public class ZipkinEndpointLruCacheTests
10+
{
11+
[Fact]
12+
public void GetOrAdd_ReturnsSameValueForExistingKey()
13+
{
14+
var cache = new ZipkinEndpointLruCache(capacity: 2);
15+
16+
var first = cache.GetOrAdd("service-a");
17+
var second = cache.GetOrAdd("service-a");
18+
19+
Assert.Same(first, second);
20+
Assert.Equal(1, cache.Count);
21+
}
22+
23+
[Fact]
24+
public void GetOrAdd_EvictsLeastRecentlyUsedEntry()
25+
{
26+
var cache = new ZipkinEndpointLruCache(capacity: 2);
27+
28+
var first = cache.GetOrAdd("service-a");
29+
_ = cache.GetOrAdd("service-b");
30+
_ = cache.GetOrAdd("service-c");
31+
32+
var firstAfterEviction = cache.GetOrAdd("service-a");
33+
34+
Assert.NotSame(first, firstAfterEviction);
35+
Assert.Equal(2, cache.Count);
36+
}
37+
38+
[Fact]
39+
public void Clear_RemovesAllEntries()
40+
{
41+
var cache = new ZipkinEndpointLruCache(capacity: 3);
42+
43+
_ = cache.GetOrAdd("service-a");
44+
_ = cache.GetOrAdd("service-b");
45+
46+
cache.Clear();
47+
48+
Assert.Equal(0, cache.Count);
49+
}
50+
51+
[Fact]
52+
public void GetOrAdd_ForSameKeyCreatesSingleInstanceAcrossThreads()
53+
{
54+
var cache = new ZipkinEndpointLruCache(capacity: 8);
55+
56+
var createdEndpoints = new ConcurrentDictionary<ZipkinEndpoint, byte>();
57+
58+
Parallel.For(0, 500, _ =>
59+
{
60+
var endpoint = cache.GetOrAdd("shared-service");
61+
createdEndpoints.TryAdd(endpoint, 0);
62+
});
63+
64+
Assert.Single(createdEndpoints);
65+
Assert.Equal(1, cache.Count);
66+
}
67+
68+
[Fact]
69+
public void GetOrAdd_IsThreadSafeAndStaysBounded()
70+
{
71+
var cache = new ZipkinEndpointLruCache(capacity: 64);
72+
73+
Parallel.For(0, 10_000, i =>
74+
{
75+
_ = cache.GetOrAdd($"service-{i % 512}");
76+
});
77+
78+
Assert.True(cache.Count <= 64);
79+
}
80+
}

test/OpenTelemetry.Tests/Internal/JsonStringArrayTagWriterTests.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,35 @@ public void DoubleArray(double[] data, string expectedValue)
120120
public void ObjectArray(object?[] data, string expectedValue)
121121
=> VerifySerialization(data, expectedValue);
122122

123+
[Fact]
124+
public void ThreadStaticStreamCapacityIsReducedAfterLargeWrite()
125+
{
126+
var streamField = typeof(JsonStringArrayTagWriter<TestTagWriter.Tag>.JsonArrayTagWriter).GetField("threadStream", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static);
127+
var writerField = typeof(JsonStringArrayTagWriter<TestTagWriter.Tag>.JsonArrayTagWriter).GetField("threadWriter", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static);
128+
Assert.NotNull(streamField);
129+
Assert.NotNull(writerField);
130+
131+
var largeData = new[] { new string('x', 128 * 1024) };
132+
VerifySerialization(largeData, $"""["{largeData[0]}"]""");
133+
134+
var largeStream = (MemoryStream?)streamField.GetValue(null);
135+
var largeWriter = writerField.GetValue(null);
136+
Assert.NotNull(largeStream);
137+
Assert.NotNull(largeWriter);
138+
Assert.True(largeStream.Capacity > 64 * 1024);
139+
140+
string[] smallData = ["small"];
141+
VerifySerialization(smallData, """["small"]""");
142+
143+
var reusedStream = (MemoryStream?)streamField.GetValue(null);
144+
var reusedWriter = writerField.GetValue(null);
145+
Assert.NotNull(reusedStream);
146+
Assert.NotNull(reusedWriter);
147+
Assert.Same(largeStream, reusedStream);
148+
Assert.Same(largeWriter, reusedWriter);
149+
Assert.True(reusedStream.Capacity <= 64 * 1024);
150+
}
151+
123152
private static void VerifySerialization(Array data, string expectedValue)
124153
{
125154
TestTagWriter.Tag tag = default;

0 commit comments

Comments
 (0)