Skip to content

Commit 7838739

Browse files
committed
fix: Update configuration and implement job execution timeout handling
1 parent b9449ee commit 7838739

4 files changed

Lines changed: 200 additions & 4 deletions

File tree

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ services.AddGusto<JobRecord, InMemoryJobStorageProvider>(
111111
"Gusto": {
112112
"BatchSize": 50,
113113
"Concurrency": 8,
114-
"PollInterval": "00:00:10"
114+
"PollInterval": "00:00:10",
115+
"JobExecutionTimeout": "00:05:00"
115116
}
116117
}
117118
```
@@ -167,6 +168,7 @@ public class UserController : ControllerBase
167168
- **BatchSize**: Jobs to process per batch (default: 10)
168169
- **Concurrency**: Max parallel jobs (default: Environment.ProcessorCount)
169170
- **PollInterval**: Polling frequency (default: 10 seconds)
171+
- **JobExecutionTimeout**: Max runtime for a single job execution before it's treated as a failure and passed to `OnHandlerExecutionFailureAsync` (default: 5 minutes)
170172

171173
## Advanced Patterns
172174

src/ByteBard.GUSTO.JobQueue/GustoConfig.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,6 @@ public class GustoConfig
99
public TimeSpan PollInterval { get; set; } = TimeSpan.FromSeconds(10);
1010

1111
public int Concurrency { get; set; } = Environment.ProcessorCount;
12+
13+
public TimeSpan JobExecutionTimeout { get; set; } = TimeSpan.FromMinutes(5);
1214
}

src/ByteBard.GUSTO.JobQueue/JobQueueWorker.cs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
101101
await ProcessBatchCycleAsync(parallelOptions, stoppingToken);
102102
SignalBatchCompletedIfSet();
103103
}
104+
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
105+
{
106+
break;
107+
}
104108
catch (Exception ex)
105109
{
106110
_logger.LogError(ex, "Unexpected failure in JobQueueWorker.");
@@ -195,18 +199,33 @@ private async Task ExecuteJobAsync(
195199

196200
using var scope = _serviceProvider.CreateScope();
197201
var storage = scope.ServiceProvider.GetRequiredService<IJobStorageProvider<TStorageRecord>>();
202+
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
203+
timeoutCts.CancelAfter(_config.JobExecutionTimeout);
198204

199205
try
200206
{
201207
var jobType = Type.GetType(storedJob.JobType);
202208
var arguments = JsonConvert.DeserializeObject<object[]>(storedJob.ArgumentsJson, _settings);
203209
var jobInstance = ActivatorUtilities.CreateInstance(scope.ServiceProvider, jobType);
204210
var method = jobType.GetMethod(storedJob.MethodName);
205-
await (Task)method.Invoke(jobInstance, arguments);
211+
var handlerTask = (Task)method.Invoke(jobInstance, arguments);
212+
213+
await handlerTask.WaitAsync(timeoutCts.Token);
206214

207215
await storage.MarkJobAsCompleteAsync(storedJob, ct);
208216
RecordJobSuccess(storedJob, jobStopwatch, jobActivity);
209217
}
218+
catch (OperationCanceledException ex) when (timeoutCts.IsCancellationRequested && !ct.IsCancellationRequested)
219+
{
220+
var timeoutException = new TimeoutException(
221+
$"Job execution timed out after {_config.JobExecutionTimeout}.",
222+
ex);
223+
await RecordJobFailureAsync(storedJob, storage, timeoutException, jobStopwatch, jobActivity, ct);
224+
}
225+
catch (OperationCanceledException) when (ct.IsCancellationRequested)
226+
{
227+
throw;
228+
}
210229
catch (Exception ex)
211230
{
212231
await RecordJobFailureAsync(storedJob, storage, ex, jobStopwatch, jobActivity, ct);

src/ByteBard.GUSTO.Tests/JobQueueWorkerTests.cs

Lines changed: 175 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class TestableJob
2727

2828
public static void CreateSemaphore()
2929
{
30-
semaphoreSlim = new SemaphoreSlim(0, 1);
30+
semaphoreSlim = new SemaphoreSlim(0, int.MaxValue);
3131
}
3232

3333
public virtual Task ExecuteAsyncSuccess(string input)
@@ -53,15 +53,49 @@ public virtual Task ExecuteAsyncFail(string input)
5353
semaphoreSlim.Release();
5454
}
5555
}
56+
57+
public virtual async Task ExecuteAsyncHang(string input)
58+
{
59+
semaphoreSlim.Release();
60+
await Task.Delay(Timeout.InfiniteTimeSpan);
61+
}
5662
}
5763

5864
private GustoConfig GetTestConfig() => new GustoConfig
5965
{
6066
Concurrency = 1,
6167
PollInterval = TimeSpan.FromMilliseconds(10),
62-
BatchSize = 1
68+
BatchSize = 1,
69+
JobExecutionTimeout = TimeSpan.FromSeconds(30)
6370
};
6471

72+
private static async Task AssertEventuallyAsync(Action assertion, TimeSpan timeout)
73+
{
74+
var started = DateTime.UtcNow;
75+
Exception? lastException = null;
76+
77+
while (DateTime.UtcNow - started < timeout)
78+
{
79+
try
80+
{
81+
assertion();
82+
return;
83+
}
84+
catch (Exception ex)
85+
{
86+
lastException = ex;
87+
await Task.Delay(25);
88+
}
89+
}
90+
91+
if (lastException != null)
92+
{
93+
throw lastException;
94+
}
95+
96+
assertion();
97+
}
98+
6599
[Fact]
66100
public async Task ExecuteAsync_WhenNoJobsAvailable_DelaysAndContinuesLoop()
67101
{
@@ -162,6 +196,145 @@ public async Task ExecuteAsync_WhenJobThrowsException_LogsAndHandlesFailure()
162196
await storage.Received().OnHandlerExecutionFailureAsync(job, Arg.Any<Exception>(), Arg.Any<CancellationToken>());
163197
}
164198

199+
[Fact]
200+
public async Task ExecuteAsync_WhenOneJobHangs_OtherJobsCompleteAndHangingJobTimesOut()
201+
{
202+
// Arrange
203+
var hangingJob = new TestJob
204+
{
205+
TrackingId = Guid.NewGuid(),
206+
JobType = typeof(TestableJob).AssemblyQualifiedName,
207+
MethodName = nameof(TestableJob.ExecuteAsyncHang),
208+
ArgumentsJson = JsonConvert.SerializeObject(new object[] { "hang" }),
209+
ExecuteAfter = DateTime.UtcNow,
210+
IsComplete = false
211+
};
212+
213+
var healthyJob = new TestJob
214+
{
215+
TrackingId = Guid.NewGuid(),
216+
JobType = typeof(TestableJob).AssemblyQualifiedName,
217+
MethodName = nameof(TestableJob.ExecuteAsyncSuccess),
218+
ArgumentsJson = JsonConvert.SerializeObject(new object[] { "ok" }),
219+
ExecuteAfter = DateTime.UtcNow,
220+
IsComplete = false
221+
};
222+
223+
var storage = Substitute.For<IJobStorageProvider<TestJob>>();
224+
storage.GetBatchAsync(Arg.Any<JobSearchParams<TestJob>>(), Arg.Any<CancellationToken>())
225+
.Returns(new List<TestJob> { hangingJob, healthyJob }, new List<TestJob>());
226+
227+
var services = new ServiceCollection();
228+
services.AddScoped<IJobStorageProvider<TestJob>>(_ => storage);
229+
var serviceProvider = services.BuildServiceProvider();
230+
231+
var logger = Substitute.For<ILogger<JobQueueWorker<TestJob>>>();
232+
var config = Options.Create(new GustoConfig
233+
{
234+
Concurrency = 2,
235+
PollInterval = TimeSpan.FromMilliseconds(10),
236+
BatchSize = 2,
237+
JobExecutionTimeout = TimeSpan.FromMilliseconds(100)
238+
});
239+
240+
var worker = new JobQueueWorker<TestJob>(serviceProvider, config, logger);
241+
242+
// Act
243+
TestableJob.CreateSemaphore();
244+
await worker.StartAsync(CancellationToken.None);
245+
246+
try
247+
{
248+
await AssertEventuallyAsync(
249+
() =>
250+
{
251+
storage.Received().MarkJobAsCompleteAsync(healthyJob, Arg.Any<CancellationToken>());
252+
},
253+
TimeSpan.FromSeconds(2));
254+
255+
await AssertEventuallyAsync(
256+
() =>
257+
{
258+
storage.Received().OnHandlerExecutionFailureAsync(
259+
hangingJob,
260+
Arg.Is<Exception>(ex => ex is TimeoutException),
261+
Arg.Any<CancellationToken>());
262+
},
263+
TimeSpan.FromSeconds(2));
264+
}
265+
finally
266+
{
267+
await worker.StopAsync(CancellationToken.None);
268+
}
269+
}
270+
271+
[Fact]
272+
public async Task ExecuteAsync_WhenJobTimesOut_WorkerContinuesPollingNextCycles()
273+
{
274+
// Arrange
275+
var hangingJob = new TestJob
276+
{
277+
TrackingId = Guid.NewGuid(),
278+
JobType = typeof(TestableJob).AssemblyQualifiedName,
279+
MethodName = nameof(TestableJob.ExecuteAsyncHang),
280+
ArgumentsJson = JsonConvert.SerializeObject(new object[] { "hang" }),
281+
ExecuteAfter = DateTime.UtcNow,
282+
IsComplete = false
283+
};
284+
285+
var getBatchCallCount = 0;
286+
var storage = Substitute.For<IJobStorageProvider<TestJob>>();
287+
storage.GetBatchAsync(Arg.Any<JobSearchParams<TestJob>>(), Arg.Any<CancellationToken>())
288+
.Returns(_ =>
289+
{
290+
var callCount = Interlocked.Increment(ref getBatchCallCount);
291+
IEnumerable<TestJob> jobs = callCount == 1
292+
? new List<TestJob> { hangingJob }
293+
: new List<TestJob>();
294+
return Task.FromResult(jobs);
295+
});
296+
297+
var services = new ServiceCollection();
298+
services.AddScoped<IJobStorageProvider<TestJob>>(_ => storage);
299+
var serviceProvider = services.BuildServiceProvider();
300+
301+
var logger = Substitute.For<ILogger<JobQueueWorker<TestJob>>>();
302+
var config = Options.Create(new GustoConfig
303+
{
304+
Concurrency = 1,
305+
PollInterval = TimeSpan.FromMilliseconds(10),
306+
BatchSize = 1,
307+
JobExecutionTimeout = TimeSpan.FromMilliseconds(100)
308+
});
309+
310+
var worker = new JobQueueWorker<TestJob>(serviceProvider, config, logger);
311+
312+
// Act
313+
TestableJob.CreateSemaphore();
314+
await worker.StartAsync(CancellationToken.None);
315+
316+
try
317+
{
318+
await AssertEventuallyAsync(
319+
() => Assert.True(Volatile.Read(ref getBatchCallCount) >= 2),
320+
TimeSpan.FromSeconds(2));
321+
322+
await AssertEventuallyAsync(
323+
() =>
324+
{
325+
storage.Received().OnHandlerExecutionFailureAsync(
326+
hangingJob,
327+
Arg.Is<Exception>(ex => ex is TimeoutException),
328+
Arg.Any<CancellationToken>());
329+
},
330+
TimeSpan.FromSeconds(2));
331+
}
332+
finally
333+
{
334+
await worker.StopAsync(CancellationToken.None);
335+
}
336+
}
337+
165338
[Fact]
166339
public async Task ExecuteAsync_WhenStorageThrowsException_LogsAndDelays()
167340
{

0 commit comments

Comments
 (0)