Skip to content

Commit e8ad407

Browse files
Ensure RunningEndpointInstance.DisposeAsync completes when host is disposed without StopAsync (#7806) (#7818)
* DisposeAsync should pass a bounded cancellation token to transport Shutdown * . * Update RunningEndpointInstance.cs * . * , * Update StartableEndpoint.cs * Update RunningEndpointInstance.cs * Remove shutdown token timeout since it would create akward split between dispose and shutdown. Instead cancel the token immediately on dispose. * Core acceptance test * Ignore exceptions during DisposeAsync to prevent throwing from disposal process --------- Co-authored-by: Simon Cropp <simon.cropp@gmail.com> Co-authored-by: Daniel Marbach <danielmarbach@users.noreply.github.com>
1 parent 24fbf92 commit e8ad407

5 files changed

Lines changed: 172 additions & 7 deletions

File tree

src/NServiceBus.AcceptanceTesting/Support/EndpointBehavior.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ public EndpointBehavior(IEndpointConfigurationFactory endpointBuilder, int insta
4444
return async token => await endpointLifecycle.Stop(token).ConfigureAwait(false);
4545
});
4646

47+
// For some super advanced scenarios require
48+
// disposing the endpoint, and this backdoor allows that without having to expose the lifecycle in Core
49+
collectionAdapter.AddKeyedSingleton<Func<ValueTask>>("Disposer", (provider, key) =>
50+
{
51+
var endpointLifecycle = provider.GetRequiredKeyedService<IEndpointLifecycle>(serviceKey);
52+
return async () => await endpointLifecycle.DisposeAsync().ConfigureAwait(false);
53+
});
54+
4755
return Task.FromResult(new StartableEndpointInstance(serviceKey));
4856
}, static (startableEndpoint, provider, cancellationToken) => startableEndpoint.Start(provider, cancellationToken));
4957
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
namespace NServiceBus.AcceptanceTests.Core.Stopping;
2+
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using AcceptanceTesting;
7+
using EndpointTemplates;
8+
using Microsoft.Extensions.DependencyInjection;
9+
using NUnit.Framework;
10+
11+
public class When_disposing_without_stopping : NServiceBusAcceptanceTest
12+
{
13+
[Test]
14+
[CancelAfter(30000)]
15+
public async Task Should_initiate_immediate_handler_cancellation(CancellationToken cancellationToken = default) =>
16+
await Scenario.Define<Context>()
17+
.WithEndpoint<EndpointThatGetsRugPulled>(b =>
18+
b.ServiceResolve(static async (provider, context, token) =>
19+
{
20+
var session = provider.GetRequiredService<IMessageSession>();
21+
await session.SendLocal(new MessageThatTakesALongTime(), token);
22+
23+
await context.MessageReceived.Task.WaitAsync(token);
24+
25+
var disposer = provider.GetRequiredKeyedService<Func<ValueTask>>("Disposer");
26+
await disposer();
27+
}, true))
28+
.Run(cancellationToken);
29+
30+
public class Context : ScenarioContext
31+
{
32+
public TaskCompletionSource MessageReceived { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);
33+
}
34+
35+
public class EndpointThatGetsRugPulled : EndpointConfigurationBuilder
36+
{
37+
public EndpointThatGetsRugPulled() => EndpointSetup<DefaultServer>();
38+
39+
[Handler]
40+
public class InfiniteHandler(Context testContext) : IHandleMessages<MessageThatTakesALongTime>
41+
{
42+
public async Task Handle(MessageThatTakesALongTime message, IMessageHandlerContext context)
43+
{
44+
try
45+
{
46+
testContext.MessageReceived.SetResult();
47+
await Task.Delay(Timeout.InfiniteTimeSpan, context.CancellationToken);
48+
}
49+
catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
50+
{
51+
testContext.MarkAsCompleted();
52+
}
53+
}
54+
}
55+
}
56+
57+
public class MessageThatTakesALongTime : IMessage;
58+
}

src/NServiceBus.Core.Tests/Unicast/RunningEndpointInstanceTest.cs

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using NUnit.Framework;
88
using Settings;
99
using Testing;
10+
using NServiceBus.Transport;
1011

1112
[TestFixture]
1213
public class RunningEndpointInstanceTest
@@ -17,10 +18,10 @@ static RunningEndpointInstance Create()
1718

1819
var testInstance = new RunningEndpointInstance(
1920
settings,
20-
null,
21+
null!,
2122
new FeatureComponent(new FeatureComponent.Settings()),
2223
new TestableMessageSession(),
23-
null,
24+
null!,
2425
new CancellationTokenSource(),
2526
NoOpAsyncDisposable.Instance,
2627
new EndpointLogSlot("RunningEndpointInstanceTest", endpointIdentifier: null));
@@ -47,6 +48,59 @@ public async Task ShouldAllowMultipleDispose()
4748
Assert.That(async () => await testInstance.DisposeAsync(), Throws.Nothing);
4849
}
4950

51+
[Test]
52+
public async Task DisposeAsync_should_pass_a_bounded_cancellation_token_to_transport_Shutdown()
53+
{
54+
var capturingTransport = new TokenCapturingTransportInfrastructure();
55+
56+
var testInstance = new RunningEndpointInstance(
57+
new SettingsHolder(),
58+
CreateEmptyReceiveComponent(),
59+
new FeatureComponent(new FeatureComponent.Settings()),
60+
new TestableMessageSession(),
61+
capturingTransport,
62+
new CancellationTokenSource(),
63+
NoOpAsyncDisposable.Instance,
64+
new EndpointLogSlot("RunningEndpointInstanceTest", endpointIdentifier: null));
65+
66+
await testInstance.DisposeAsync();
67+
68+
using (Assert.EnterMultipleScope())
69+
{
70+
Assert.That(capturingTransport.ShutdownWasCalled, Is.True,
71+
"transport.Shutdown was not called during DisposeAsync.");
72+
Assert.That(capturingTransport.ObservedToken.CanBeCanceled, Is.True,
73+
"DisposeAsync passed CancellationToken.None to StopCore (which then passes it to " +
74+
"transport.Shutdown). Disposal must be bounded by an internal cancellation token so " +
75+
"a stuck transport cannot hang shutdown indefinitely. " +
76+
"See RunningEndpointInstance.DisposeAsync — `await StopCore()` is missing a token.");
77+
}
78+
}
79+
80+
[Test]
81+
public async Task DisposeAsync_should_complete_even_when_transport_Shutdown_hangs()
82+
{
83+
var hangingTransport = new HangingTransportInfrastructure();
84+
85+
var testInstance = new RunningEndpointInstance(
86+
new SettingsHolder(),
87+
CreateEmptyReceiveComponent(),
88+
new FeatureComponent(new FeatureComponent.Settings()),
89+
new TestableMessageSession(),
90+
hangingTransport,
91+
new CancellationTokenSource(),
92+
NoOpAsyncDisposable.Instance,
93+
new EndpointLogSlot("RunningEndpointInstanceTest", endpointIdentifier: null));
94+
95+
var dispose = testInstance.DisposeAsync().AsTask();
96+
var winner = await Task.WhenAny(dispose, Task.Delay(TimeSpan.FromSeconds(5)));
97+
98+
Assert.That(winner, Is.SameAs(dispose),
99+
"DisposeAsync did not complete within 5s of a 250ms internal timeout — " +
100+
"the disposeShutdownTimeout must fire and let disposal proceed past a stuck transport.Shutdown.");
101+
Assert.DoesNotThrowAsync(() => dispose, "DisposeAsync threw an exception when transport.Shutdown hung. DisposeAsync should complete successfully even if transport.Shutdown does not.");
102+
}
103+
50104
[Test]
51105
public async Task ShouldThrowExceptionAfterInvokingStop()
52106
{
@@ -61,4 +115,32 @@ public async Task ShouldThrowExceptionAfterInvokingStop()
61115
Assert.Throws<InvalidOperationException>(() => testInstance.Subscribe(typeof(object), new SubscribeOptions()), "Invoking messaging operations on the endpoint instance after it has been triggered to stop is not supported.");
62116
Assert.Throws<InvalidOperationException>(() => testInstance.Unsubscribe(typeof(object), new UnsubscribeOptions()), "Invoking messaging operations on the endpoint instance after it has been triggered to stop is not supported.");
63117
}
118+
119+
// Empty ReceiveComponent (no receivers) so Stop is a no-op and StopCore reaches
120+
// transport.Shutdown without us having to stand up the full receive pipeline.
121+
static ReceiveComponent CreateEmptyReceiveComponent() =>
122+
new(configuration: null!, activityFactory: null!, endpointLogSlot: null!);
123+
124+
sealed class TokenCapturingTransportInfrastructure : TransportInfrastructure
125+
{
126+
public bool ShutdownWasCalled { get; private set; }
127+
public CancellationToken ObservedToken { get; private set; }
128+
129+
public override Task Shutdown(CancellationToken cancellationToken = default)
130+
{
131+
ShutdownWasCalled = true;
132+
ObservedToken = cancellationToken;
133+
return Task.CompletedTask;
134+
}
135+
136+
public override string ToTransportAddress(QueueAddress address) => address.BaseAddress;
137+
}
138+
139+
sealed class HangingTransportInfrastructure : TransportInfrastructure
140+
{
141+
public override Task Shutdown(CancellationToken cancellationToken = default) =>
142+
Task.Delay(Timeout.Infinite, cancellationToken);
143+
144+
public override string ToTransportAddress(QueueAddress address) => address.BaseAddress;
145+
}
64146
}

src/NServiceBus.Core/Receiving/ReceiveComponent.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ namespace NServiceBus;
1717

1818
partial class ReceiveComponent
1919
{
20-
ReceiveComponent(Configuration configuration, IActivityFactory activityFactory, EndpointLogSlot endpointLogSlot)
20+
internal ReceiveComponent(Configuration configuration, IActivityFactory activityFactory, EndpointLogSlot endpointLogSlot)
2121
{
2222
this.configuration = configuration;
2323
this.activityFactory = activityFactory;

src/NServiceBus.Core/Unicast/RunningEndpointInstance.cs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,28 @@ public async ValueTask DisposeAsync()
9999
return;
100100
}
101101

102-
await StopCore().ConfigureAwait(false);
102+
// In case Stop was not called, we need to trigger shutdown before cleaning up resources.
103+
// Since we're already disposing, we want to bypass any waits and just trigger shutdown with a canceled token.
104+
// We are effectively indicating the graceful shutdown period has already elapsed and any ongoing operations should
105+
// be aborted immediately if they participate in the cooperative cancellation.
106+
var cancellationToken = new CancellationToken(true);
103107

104-
settings.Clear();
105-
stoppingTokenSource.Dispose();
106-
await serviceProviderLease.DisposeAsync().ConfigureAwait(false);
108+
try
109+
{
110+
await StopCore(cancellationToken).ConfigureAwait(false);
111+
}
112+
#pragma warning disable PS0019
113+
catch (Exception)
114+
#pragma warning restore PS0019
115+
{
116+
// ignored because we're already disposing and we don't want to throw from DisposeAsync. Any exceptions from StopCore are already logged, so we can safely ignore them here.
117+
}
118+
finally
119+
{
120+
settings.Clear();
121+
stoppingTokenSource.Dispose();
122+
await serviceProviderLease.DisposeAsync().ConfigureAwait(false);
123+
}
107124
}
108125

109126
public Task Send(object message, SendOptions sendOptions, CancellationToken cancellationToken = default)

0 commit comments

Comments
 (0)