Skip to content

Commit 76e7755

Browse files
committed
Use Lazy<string[]> snapshots with span-based value factories for read-mostly subscriber store
1 parent cde785f commit 76e7755

1 file changed

Lines changed: 59 additions & 50 deletions

File tree

src/NServiceBus.InMemory/Transport/Internal/InMemoryBroker.cs

Lines changed: 59 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -24,42 +24,67 @@ public InMemoryBroker(InMemoryBrokerOptions? options = null)
2424
public void Subscribe(string publisherAddress, string topic) =>
2525
subscriptions.AddOrUpdate(
2626
topic,
27-
_ => [publisherAddress],
28-
(_, list) =>
27+
static (_, address) => new Lazy<string[]>([address]),
28+
static (_, currentLazy, address) =>
2929
{
30-
lock (list)
30+
if (currentLazy.Value.AsSpan().IndexOf(address) >= 0)
3131
{
32-
if (!list.Contains(publisherAddress))
32+
return currentLazy;
33+
}
34+
35+
return new Lazy<string[]>(() =>
36+
{
37+
var current = currentLazy.Value;
38+
var currentSpan = current.AsSpan();
39+
if (currentSpan.IndexOf(address) >= 0)
3340
{
34-
list.Add(publisherAddress);
41+
return current;
3542
}
36-
}
37-
return list;
38-
});
3943

40-
public void Unsubscribe(string publisherAddress, string topic)
41-
{
42-
if (subscriptions.TryGetValue(topic, out var list))
43-
{
44-
lock (list)
44+
var next = new string[current.Length + 1];
45+
currentSpan.CopyTo(next);
46+
next[^1] = address;
47+
return next;
48+
});
49+
},
50+
publisherAddress);
51+
52+
public void Unsubscribe(string publisherAddress, string topic) =>
53+
subscriptions.AddOrUpdate(
54+
topic,
55+
static (_, _) => new Lazy<string[]>([]),
56+
static (_, currentLazy, address) =>
4557
{
46-
list.Remove(publisherAddress);
47-
}
48-
}
49-
}
58+
if (currentLazy.Value.AsSpan().IndexOf(address) < 0)
59+
{
60+
return currentLazy;
61+
}
5062

51-
public IReadOnlyList<string> GetSubscribers(string topic)
52-
{
53-
if (!subscriptions.TryGetValue(topic, out var list))
54-
{
55-
return [];
56-
}
63+
return new Lazy<string[]>(() =>
64+
{
65+
var current = currentLazy.Value;
66+
var currentSpan = current.AsSpan();
67+
var index = currentSpan.IndexOf(address);
68+
if (index < 0)
69+
{
70+
return current;
71+
}
5772

58-
lock (list)
59-
{
60-
return [.. list];
61-
}
62-
}
73+
if (current.Length == 1)
74+
{
75+
return [];
76+
}
77+
78+
var next = new string[current.Length - 1];
79+
var nextSpan = next.AsSpan();
80+
currentSpan[..index].CopyTo(nextSpan[..index]);
81+
currentSpan[(index + 1)..].CopyTo(nextSpan[index..]);
82+
return next;
83+
});
84+
},
85+
publisherAddress);
86+
87+
public IReadOnlyList<string> GetSubscribers(string topic) => subscriptions.TryGetValue(topic, out var lazy) ? lazy.Value : [];
6388

6489
public long GetNextSequenceNumber() => Interlocked.Increment(ref sequenceNumber);
6590

@@ -80,25 +105,9 @@ public bool TryDequeueDelayed(DateTimeOffset now, [NotNullWhen(true)] out Broker
80105
}
81106
}
82107

83-
internal Task SimulateSendAsync(string destination, CancellationToken cancellationToken = default)
84-
{
85-
if (!HasSimulationFor(InMemorySimulationOperation.Send, destination))
86-
{
87-
return Task.CompletedTask;
88-
}
89-
90-
return ApplySimulationAsync(InMemorySimulationOperation.Send, destination, cancellationToken);
91-
}
108+
internal Task SimulateSendAsync(string destination, CancellationToken cancellationToken = default) => !HasSimulationFor(InMemorySimulationOperation.Send, destination) ? Task.CompletedTask : ApplySimulationAsync(InMemorySimulationOperation.Send, destination, cancellationToken);
92109

93-
internal Task SimulateReceiveAsync(string destination, CancellationToken cancellationToken = default)
94-
{
95-
if (!HasSimulationFor(InMemorySimulationOperation.Receive, destination))
96-
{
97-
return Task.CompletedTask;
98-
}
99-
100-
return ApplySimulationAsync(InMemorySimulationOperation.Receive, destination, cancellationToken);
101-
}
110+
internal Task SimulateReceiveAsync(string destination, CancellationToken cancellationToken = default) => !HasSimulationFor(InMemorySimulationOperation.Receive, destination) ? Task.CompletedTask : ApplySimulationAsync(InMemorySimulationOperation.Receive, destination, cancellationToken);
102111

103112
bool HasSimulationFor(InMemorySimulationOperation operation, string queue)
104113
{
@@ -216,12 +225,12 @@ async Task ApplyCustomRateLimiterAsync(InMemorySimulationOperation operation, st
216225
var factory = resolved.RateLimiterFactory;
217226
var limiter = resolved.RateLimiter ?? customLimiters.GetOrAdd(
218227
(operation, queue),
219-
static (key, state) => state.RateLimiterFactory(state.TimeProvider),
228+
static (_, state) => state.RateLimiterFactory(state.TimeProvider),
220229
(RateLimiterFactory: factory!, resolved.TimeProvider));
221230

222231
if (resolved.Mode == InMemorySimulationMode.Reject)
223232
{
224-
using var lease = limiter.AttemptAcquire(1);
233+
using var lease = limiter.AttemptAcquire();
225234
if (lease.IsAcquired)
226235
{
227236
return;
@@ -388,7 +397,7 @@ bool TryAcquirePermit(InMemorySimulationOperation operation, string queue, InMem
388397
return deliverAt - now;
389398
}
390399

391-
bool TryDequeueDelayedCore(DateTimeOffset now, [System.Diagnostics.CodeAnalysis.NotNullWhen(true)] out BrokerEnvelope? envelope)
400+
bool TryDequeueDelayedCore(DateTimeOffset now, [NotNullWhen(true)] out BrokerEnvelope? envelope)
392401
{
393402
if (delayedMessages.Count == 0)
394403
{
@@ -456,7 +465,7 @@ public Task StartPump(CancellationToken cancellationToken = default)
456465
}
457466

458467
readonly ConcurrentDictionary<string, InMemoryChannel> queues = new();
459-
readonly ConcurrentDictionary<string, List<string>> subscriptions = new();
468+
readonly ConcurrentDictionary<string, Lazy<string[]>> subscriptions = new();
460469
readonly PriorityQueue<BrokerEnvelope, (DateTimeOffset DeliverAt, long SequenceNumber)> delayedMessages = new();
461470
readonly Lock delayedMessagesLock = new();
462471
long sequenceNumber;

0 commit comments

Comments
 (0)