Skip to content

Commit 137a1cd

Browse files
committed
feat: update job execution to respect CancellationToken and implement timeout handling
1 parent 7838739 commit 137a1cd

5 files changed

Lines changed: 281 additions & 1 deletion

File tree

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ A lightweight background job processing library for .NET. GUSTO provides a simpl
1111
- **Concurrent Processing**: Configurable parallel job execution
1212
- **Job Scheduling**: Schedule jobs for future execution
1313
- **Failure Handling**: Built-in retry logic with customizable strategies
14+
- **Cooperative Cancellation**: Handler `CancellationToken` parameters receive worker cancellation for shutdown and execution timeouts
1415
- **OpenTelemetry Support**: Built-in metrics and distributed tracing
1516
- **Easy Testing**: Test hooks for integration testing without polling
1617

@@ -170,6 +171,8 @@ public class UserController : ControllerBase
170171
- **PollInterval**: Polling frequency (default: 10 seconds)
171172
- **JobExecutionTimeout**: Max runtime for a single job execution before it's treated as a failure and passed to `OnHandlerExecutionFailureAsync` (default: 5 minutes)
172173

174+
If a job handler method includes a `CancellationToken` parameter, GUSTO supplies an execution token that is canceled on host shutdown or when `JobExecutionTimeout` is reached.
175+
173176
## Advanced Patterns
174177

175178
### MongoDB Storage Provider

src/ByteBard.GUSTO.JobQueue/JobQueue.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public TStorageRecord ConstructRecordFromExpression(Expression expression, DateT
4141
var methodCallExpression = (MethodCallExpression)expression;
4242
var method = methodCallExpression.Method;
4343
var arguments = methodCallExpression.Arguments.Select(arg => Expression.Lambda(arg).Compile().DynamicInvoke()).ToArray();
44+
arguments = NormalizeCancellationTokenArguments(method, arguments);
4445

4546
Type targetType = methodCallExpression.Object switch
4647
{
@@ -62,4 +63,30 @@ public TStorageRecord ConstructRecordFromExpression(Expression expression, DateT
6263

6364
return record;
6465
}
66+
67+
private static object?[] NormalizeCancellationTokenArguments(System.Reflection.MethodInfo method, object?[] arguments)
68+
{
69+
var parameters = method.GetParameters();
70+
var length = Math.Min(arguments.Length, parameters.Length);
71+
72+
if (length == 0)
73+
{
74+
return arguments;
75+
}
76+
77+
object?[]? normalizedArguments = null;
78+
79+
for (var i = 0; i < length; i++)
80+
{
81+
if (parameters[i].ParameterType != typeof(CancellationToken))
82+
{
83+
continue;
84+
}
85+
86+
normalizedArguments ??= (object?[])arguments.Clone();
87+
normalizedArguments[i] = default(CancellationToken);
88+
}
89+
90+
return normalizedArguments ?? arguments;
91+
}
6592
}

src/ByteBard.GUSTO.JobQueue/JobQueueWorker.cs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,8 @@ private async Task ExecuteJobAsync(
208208
var arguments = JsonConvert.DeserializeObject<object[]>(storedJob.ArgumentsJson, _settings);
209209
var jobInstance = ActivatorUtilities.CreateInstance(scope.ServiceProvider, jobType);
210210
var method = jobType.GetMethod(storedJob.MethodName);
211-
var handlerTask = (Task)method.Invoke(jobInstance, arguments);
211+
var invocationArguments = ApplyExecutionCancellationTokenArguments(method, arguments, timeoutCts.Token);
212+
var handlerTask = (Task)method.Invoke(jobInstance, invocationArguments);
212213

213214
await handlerTask.WaitAsync(timeoutCts.Token);
214215

@@ -231,6 +232,63 @@ private async Task ExecuteJobAsync(
231232
await RecordJobFailureAsync(storedJob, storage, ex, jobStopwatch, jobActivity, ct);
232233
}
233234
}
235+
236+
private static object[] ApplyExecutionCancellationTokenArguments(
237+
System.Reflection.MethodInfo method,
238+
object[]? arguments,
239+
CancellationToken executionCancellationToken)
240+
{
241+
var providedArguments = arguments ?? Array.Empty<object>();
242+
var parameters = method.GetParameters();
243+
if (parameters.Length == 0)
244+
{
245+
return providedArguments;
246+
}
247+
248+
object[]? invocationArguments = null;
249+
250+
if (providedArguments.Length < parameters.Length)
251+
{
252+
var canExpandWithOptionalDefaults = true;
253+
for (var i = providedArguments.Length; i < parameters.Length; i++)
254+
{
255+
if (!parameters[i].IsOptional)
256+
{
257+
canExpandWithOptionalDefaults = false;
258+
break;
259+
}
260+
}
261+
262+
if (canExpandWithOptionalDefaults)
263+
{
264+
invocationArguments = new object[parameters.Length];
265+
Array.Copy(providedArguments, invocationArguments, providedArguments.Length);
266+
267+
for (var i = providedArguments.Length; i < parameters.Length; i++)
268+
{
269+
invocationArguments[i] = Type.Missing;
270+
}
271+
}
272+
}
273+
274+
var effectiveArguments = invocationArguments ?? providedArguments;
275+
var length = Math.Min(effectiveArguments.Length, parameters.Length);
276+
277+
object[]? overriddenArguments = null;
278+
279+
for (var i = 0; i < length; i++)
280+
{
281+
if (parameters[i].ParameterType != typeof(CancellationToken))
282+
{
283+
continue;
284+
}
285+
286+
overriddenArguments ??= (object[])effectiveArguments.Clone();
287+
overriddenArguments[i] = executionCancellationToken;
288+
}
289+
290+
return overriddenArguments ?? effectiveArguments;
291+
}
234292

235293

236294
private static void RecordJobSuccess(TStorageRecord storedJob, Stopwatch jobStopwatch, Activity? jobActivity)

src/ByteBard.GUSTO.Tests/JobQueueTests.cs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ public class TestJobStorageRecord : IJobStorageRecord
2020
private interface ITestJob
2121
{
2222
Task DoSomethingAsync(string input);
23+
24+
Task DoSomethingWithCancellationAsync(string input, CancellationToken cancellationToken);
2325
}
2426

2527
private class TestJob : ITestJob
@@ -28,6 +30,11 @@ public virtual Task DoSomethingAsync(string input)
2830
{
2931
return Task.CompletedTask;
3032
}
33+
34+
public virtual Task DoSomethingWithCancellationAsync(string input, CancellationToken cancellationToken)
35+
{
36+
return Task.CompletedTask;
37+
}
3138
}
3239

3340
[Fact]
@@ -215,6 +222,38 @@ await Assert.ThrowsAsync<InvalidOperationException>(async () =>
215222
await jobQueue.EnqueueAsync<TestJob>(job => job.DoSomethingAsync("test"), null, cancellationToken));
216223
}
217224

225+
[Fact]
226+
public async Task EnqueueAsync_WhenMethodHasCancellationToken_StoresDefaultCancellationToken()
227+
{
228+
// Arrange
229+
var storageProvider = Substitute.For<IJobStorageProvider<TestJobStorageRecord>>();
230+
var jobQueue = new JobQueue<TestJobStorageRecord>(storageProvider);
231+
TestJobStorageRecord? capturedRecord = null;
232+
storageProvider
233+
.When(x => x.StoreJobAsync(Arg.Any<TestJobStorageRecord>(), Arg.Any<CancellationToken>()))
234+
.Do(ci => capturedRecord = ci.Arg<TestJobStorageRecord>());
235+
236+
using var queuedTokenSource = new CancellationTokenSource();
237+
queuedTokenSource.Cancel();
238+
239+
// Act
240+
await jobQueue.EnqueueAsync<TestJob>(
241+
job => job.DoSomethingWithCancellationAsync("test", queuedTokenSource.Token),
242+
null,
243+
CancellationToken.None);
244+
245+
// Assert
246+
Assert.NotNull(capturedRecord);
247+
var args = JsonConvert.DeserializeObject<object[]>(
248+
capturedRecord.ArgumentsJson,
249+
new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
250+
251+
Assert.Equal(2, args.Length);
252+
Assert.Equal("test", args[0]?.ToString());
253+
var cancellationToken = Assert.IsType<CancellationToken>(args[1]);
254+
Assert.False(cancellationToken.CanBeCanceled);
255+
}
256+
218257
[Fact]
219258
public async Task EnqueueAsync_WhenCancellationRequested_PropagatesCancellationToStorage()
220259
{

src/ByteBard.GUSTO.Tests/JobQueueWorkerTests.cs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,32 @@ public virtual async Task ExecuteAsyncHang(string input)
5959
semaphoreSlim.Release();
6060
await Task.Delay(Timeout.InfiniteTimeSpan);
6161
}
62+
63+
public virtual async Task ExecuteAsyncObserveCancellation(string input, CancellationToken cancellationToken)
64+
{
65+
try
66+
{
67+
await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
68+
}
69+
catch (OperationCanceledException)
70+
{
71+
semaphoreSlim.Release();
72+
throw;
73+
}
74+
}
75+
76+
public virtual async Task ExecuteAsyncObserveOptionalCancellation(string input, CancellationToken cancellationToken = default)
77+
{
78+
try
79+
{
80+
await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
81+
}
82+
catch (OperationCanceledException)
83+
{
84+
semaphoreSlim.Release();
85+
throw;
86+
}
87+
}
6288
}
6389

6490
private GustoConfig GetTestConfig() => new GustoConfig
@@ -335,6 +361,133 @@ await AssertEventuallyAsync(
335361
}
336362
}
337363

364+
[Fact]
365+
public async Task ExecuteAsync_WhenJobHasCancellationToken_OverridesSerializedTokenWithExecutionToken()
366+
{
367+
// Arrange
368+
using var queuedTokenSource = new CancellationTokenSource();
369+
queuedTokenSource.Cancel();
370+
371+
var hangingJob = new TestJob
372+
{
373+
TrackingId = Guid.NewGuid(),
374+
JobType = typeof(TestableJob).AssemblyQualifiedName,
375+
MethodName = nameof(TestableJob.ExecuteAsyncObserveCancellation),
376+
ArgumentsJson = JsonConvert.SerializeObject(
377+
new object[] { "observe", queuedTokenSource.Token },
378+
new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }),
379+
ExecuteAfter = DateTime.UtcNow,
380+
IsComplete = false
381+
};
382+
383+
var storage = Substitute.For<IJobStorageProvider<TestJob>>();
384+
storage.GetBatchAsync(Arg.Any<JobSearchParams<TestJob>>(), Arg.Any<CancellationToken>())
385+
.Returns(new List<TestJob> { hangingJob }, new List<TestJob>());
386+
387+
var services = new ServiceCollection();
388+
services.AddScoped<IJobStorageProvider<TestJob>>(_ => storage);
389+
var serviceProvider = services.BuildServiceProvider();
390+
391+
var logger = Substitute.For<ILogger<JobQueueWorker<TestJob>>>();
392+
var config = Options.Create(new GustoConfig
393+
{
394+
Concurrency = 1,
395+
PollInterval = TimeSpan.FromMilliseconds(10),
396+
BatchSize = 1,
397+
JobExecutionTimeout = TimeSpan.FromMilliseconds(150)
398+
});
399+
400+
var worker = new JobQueueWorker<TestJob>(serviceProvider, config, logger);
401+
402+
// Act
403+
TestableJob.CreateSemaphore();
404+
var startedAt = DateTime.UtcNow;
405+
await worker.StartAsync(CancellationToken.None);
406+
407+
try
408+
{
409+
var canceledInHandler = await TestableJob.semaphoreSlim.WaitAsync(TimeSpan.FromSeconds(2));
410+
Assert.True(canceledInHandler);
411+
412+
var elapsed = DateTime.UtcNow - startedAt;
413+
Assert.True(elapsed >= TimeSpan.FromMilliseconds(100));
414+
415+
await AssertEventuallyAsync(
416+
() =>
417+
{
418+
storage.Received().OnHandlerExecutionFailureAsync(
419+
hangingJob,
420+
Arg.Is<Exception>(ex => ex is TimeoutException),
421+
Arg.Any<CancellationToken>());
422+
},
423+
TimeSpan.FromSeconds(2));
424+
}
425+
finally
426+
{
427+
await worker.StopAsync(CancellationToken.None);
428+
}
429+
}
430+
431+
[Fact]
432+
public async Task ExecuteAsync_WhenOptionalCancellationTokenArgumentMissing_InjectsExecutionToken()
433+
{
434+
// Arrange
435+
var hangingJob = new TestJob
436+
{
437+
TrackingId = Guid.NewGuid(),
438+
JobType = typeof(TestableJob).AssemblyQualifiedName,
439+
MethodName = nameof(TestableJob.ExecuteAsyncObserveOptionalCancellation),
440+
ArgumentsJson = JsonConvert.SerializeObject(
441+
new object[] { "observe" },
442+
new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }),
443+
ExecuteAfter = DateTime.UtcNow,
444+
IsComplete = false
445+
};
446+
447+
var storage = Substitute.For<IJobStorageProvider<TestJob>>();
448+
storage.GetBatchAsync(Arg.Any<JobSearchParams<TestJob>>(), Arg.Any<CancellationToken>())
449+
.Returns(new List<TestJob> { hangingJob }, new List<TestJob>());
450+
451+
var services = new ServiceCollection();
452+
services.AddScoped<IJobStorageProvider<TestJob>>(_ => storage);
453+
var serviceProvider = services.BuildServiceProvider();
454+
455+
var logger = Substitute.For<ILogger<JobQueueWorker<TestJob>>>();
456+
var config = Options.Create(new GustoConfig
457+
{
458+
Concurrency = 1,
459+
PollInterval = TimeSpan.FromMilliseconds(10),
460+
BatchSize = 1,
461+
JobExecutionTimeout = TimeSpan.FromMilliseconds(150)
462+
});
463+
464+
var worker = new JobQueueWorker<TestJob>(serviceProvider, config, logger);
465+
466+
// Act
467+
TestableJob.CreateSemaphore();
468+
await worker.StartAsync(CancellationToken.None);
469+
470+
try
471+
{
472+
var canceledInHandler = await TestableJob.semaphoreSlim.WaitAsync(TimeSpan.FromSeconds(2));
473+
Assert.True(canceledInHandler);
474+
475+
await AssertEventuallyAsync(
476+
() =>
477+
{
478+
storage.Received().OnHandlerExecutionFailureAsync(
479+
hangingJob,
480+
Arg.Is<Exception>(ex => ex is TimeoutException),
481+
Arg.Any<CancellationToken>());
482+
},
483+
TimeSpan.FromSeconds(2));
484+
}
485+
finally
486+
{
487+
await worker.StopAsync(CancellationToken.None);
488+
}
489+
}
490+
338491
[Fact]
339492
public async Task ExecuteAsync_WhenStorageThrowsException_LogsAndDelays()
340493
{

0 commit comments

Comments
 (0)