Skip to content

Commit 27c9b5e

Browse files
committed
fix: better support for scoped services.
added more tests as well
1 parent 6c2ad0d commit 27c9b5e

3 files changed

Lines changed: 294 additions & 11 deletions

File tree

src/ByteBard.GUSTO.JobQueue/JobQueueWorker.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,12 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
6666

6767
await Parallel.ForEachAsync(jobStorageRecords, parallelOptions, async (storedJob, ct) =>
6868
{
69+
using var scope = _serviceProvider.CreateScope();
6970
try
7071
{
7172
var jobType = Type.GetType(storedJob.JobType);
7273
var arguments = JsonConvert.DeserializeObject<object[]>(storedJob.ArgumentsJson, _settings);
73-
var jobInstance = ActivatorUtilities.CreateInstance(_serviceProvider, jobType);
74+
var jobInstance = ActivatorUtilities.CreateInstance(scope.ServiceProvider, jobType);
7475
var method = jobType.GetMethod(storedJob.MethodName);
7576
await (Task)method.Invoke(jobInstance, arguments);
7677

src/ByteBard.GUSTO.Tests/JobQueueTests.cs

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public virtual Task DoSomethingAsync(string input)
3131
}
3232

3333
[Fact]
34-
public async Task EnqueueAsync_ValidInterfaceJobType_StoresSerializedJobWithRealType()
34+
public async Task EnqueueAsync_WhenValidInterfaceJobTypeProvided_StoresSerializedJobWithRealType()
3535
{
3636
// Arrange
3737
var storageProvider = Substitute.For<IJobStorageProvider<TestJobStorageRecord>>();
@@ -54,7 +54,7 @@ await storageProvider.Received(1).StoreJobAsync(
5454
}
5555

5656
[Fact]
57-
public async Task EnqueueAsync_ValidMethodCall_StoresSerializedJob()
57+
public async Task EnqueueAsync_WhenValidMethodCallProvided_StoresSerializedJob()
5858
{
5959
// Arrange
6060
var storageProvider = Substitute.For<IJobStorageProvider<TestJobStorageRecord>>();
@@ -78,7 +78,7 @@ await storageProvider.Received(1).StoreJobAsync(
7878

7979

8080
[Fact]
81-
public async Task EnqueueAsyncGeneric_ValidMethodCall_StoresSerializedJob()
81+
public async Task EnqueueAsyncGeneric_WhenValidMethodCallProvided_StoresSerializedJob()
8282
{
8383
// Arrange
8484
var storageProvider = Substitute.For<IJobStorageProvider<TestJobStorageRecord>>();
@@ -100,7 +100,7 @@ await storageProvider.Received(1).StoreJobAsync(
100100
}
101101

102102
[Fact]
103-
public async Task EnqueueAsync_WithExecuteAfter_UsesProvidedTime()
103+
public async Task EnqueueAsync_WhenExecuteAfterTimeProvided_UsesProvidedTime()
104104
{
105105
// Arrange
106106
var storageProvider = Substitute.For<IJobStorageProvider<TestJobStorageRecord>>();
@@ -119,7 +119,7 @@ await storageProvider.Received(1).StoreJobAsync(
119119
}
120120

121121
[Fact]
122-
public async Task EnqueueAsync_WithoutExecuteAfter_SetsToUtcNow()
122+
public async Task EnqueueAsync_WhenExecuteAfterNotProvided_SetsToUtcNow()
123123
{
124124
// Arrange
125125
var storageProvider = Substitute.For<IJobStorageProvider<TestJobStorageRecord>>();
@@ -137,4 +137,99 @@ await storageProvider.Received(1).StoreJobAsync(
137137
record.ExecuteAfter >= beforeCall && record.ExecuteAfter <= afterCall),
138138
cancellationToken);
139139
}
140+
141+
public class JobWithMultipleParameters
142+
{
143+
public Task ProcessAsync(string message, int count, bool isEnabled)
144+
{
145+
return Task.CompletedTask;
146+
}
147+
}
148+
149+
[Fact]
150+
public async Task EnqueueAsync_WhenMultipleParametersProvided_SerializesAllArguments()
151+
{
152+
// Arrange
153+
var storageProvider = Substitute.For<IJobStorageProvider<TestJobStorageRecord>>();
154+
var jobQueue = new JobQueue<TestJobStorageRecord>(storageProvider);
155+
var cancellationToken = CancellationToken.None;
156+
157+
// Act
158+
var trackingId = await jobQueue.EnqueueAsync<JobWithMultipleParameters>(
159+
job => job.ProcessAsync("hello", 42, true),
160+
null,
161+
cancellationToken);
162+
163+
// Assert
164+
await storageProvider.Received(1).StoreJobAsync(
165+
Arg.Is<TestJobStorageRecord>(record =>
166+
record.TrackingId == trackingId &&
167+
record.MethodName == "ProcessAsync" &&
168+
record.JobType == typeof(JobWithMultipleParameters).AssemblyQualifiedName &&
169+
JsonConvert.DeserializeObject<object[]>(record.ArgumentsJson).Length == 3 &&
170+
JsonConvert.DeserializeObject<object[]>(record.ArgumentsJson)[0].ToString() == "hello" &&
171+
JsonConvert.DeserializeObject<object[]>(record.ArgumentsJson)[1].ToString() == "42" &&
172+
JsonConvert.DeserializeObject<object[]>(record.ArgumentsJson)[2].ToString() == "True"),
173+
cancellationToken);
174+
}
175+
176+
[Fact]
177+
public async Task ConstructRecordFromExpression_WhenCalled_ReturnsValidRecord()
178+
{
179+
// Arrange
180+
var storageProvider = Substitute.For<IJobStorageProvider<TestJobStorageRecord>>();
181+
var jobQueue = new JobQueue<TestJobStorageRecord>(storageProvider);
182+
var executeAfter = DateTime.UtcNow.AddMinutes(5);
183+
184+
// Act
185+
var record = jobQueue.ConstructRecordFromExpression<TestJob>(
186+
job => job.DoSomethingAsync("test"),
187+
executeAfter);
188+
189+
// Assert
190+
Assert.NotEqual(Guid.Empty, record.TrackingId);
191+
Assert.Equal("DoSomethingAsync", record.MethodName);
192+
Assert.Equal(typeof(TestJob).AssemblyQualifiedName, record.JobType);
193+
Assert.False(record.IsComplete);
194+
Assert.Equal(executeAfter, record.ExecuteAfter);
195+
Assert.True(record.CreatedOn <= DateTime.UtcNow);
196+
197+
var args = JsonConvert.DeserializeObject<object[]>(record.ArgumentsJson);
198+
Assert.Single(args);
199+
Assert.Equal("test", args[0]);
200+
}
201+
202+
[Fact]
203+
public async Task EnqueueAsync_WhenStorageProviderThrowsException_PropagatesException()
204+
{
205+
// Arrange
206+
var storageProvider = Substitute.For<IJobStorageProvider<TestJobStorageRecord>>();
207+
storageProvider.StoreJobAsync(Arg.Any<TestJobStorageRecord>(), Arg.Any<CancellationToken>())
208+
.Returns<Task>(_ => throw new InvalidOperationException("Storage error"));
209+
210+
var jobQueue = new JobQueue<TestJobStorageRecord>(storageProvider);
211+
var cancellationToken = CancellationToken.None;
212+
213+
// Act & Assert
214+
await Assert.ThrowsAsync<InvalidOperationException>(async () =>
215+
await jobQueue.EnqueueAsync<TestJob>(job => job.DoSomethingAsync("test"), null, cancellationToken));
216+
}
217+
218+
[Fact]
219+
public async Task EnqueueAsync_WhenCancellationRequested_PropagatesCancellationToStorage()
220+
{
221+
// Arrange
222+
var storageProvider = Substitute.For<IJobStorageProvider<TestJobStorageRecord>>();
223+
storageProvider.StoreJobAsync(Arg.Any<TestJobStorageRecord>(), Arg.Any<CancellationToken>())
224+
.Returns(Task.FromCanceled(new CancellationToken(true)));
225+
226+
var jobQueue = new JobQueue<TestJobStorageRecord>(storageProvider);
227+
228+
using var cts = new CancellationTokenSource();
229+
cts.Cancel();
230+
231+
// Act & Assert
232+
await Assert.ThrowsAsync<TaskCanceledException>(async () =>
233+
await jobQueue.EnqueueAsync<TestJob>(job => job.DoSomethingAsync("test"), null, cts.Token));
234+
}
140235
}

src/ByteBard.GUSTO.Tests/JobQueueWorkerTests.cs

Lines changed: 192 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public virtual Task ExecuteAsyncFail(string input)
6161
};
6262

6363
[Fact]
64-
public async Task ExecuteAsync_NoJobs_DelaysAndContinuesLoop()
64+
public async Task ExecuteAsync_WhenNoJobsAvailable_DelaysAndContinuesLoop()
6565
{
6666
// Arrange
6767
var storage = Substitute.For<IJobStorageProvider<TestJob>>();
@@ -85,7 +85,7 @@ public async Task ExecuteAsync_NoJobs_DelaysAndContinuesLoop()
8585
}
8686

8787
[Fact]
88-
public async Task ExecuteAsync_ValidJob_InvokesJobAndMarksComplete()
88+
public async Task ExecuteAsync_WhenValidJobExists_InvokesJobAndMarksComplete()
8989
{
9090
// Arrange
9191
var job = new TestJob
@@ -120,7 +120,7 @@ public async Task ExecuteAsync_ValidJob_InvokesJobAndMarksComplete()
120120
}
121121

122122
[Fact]
123-
public async Task ExecuteAsync_JobThrowsException_LogsAndHandlesFailure()
123+
public async Task ExecuteAsync_WhenJobThrowsException_LogsAndHandlesFailure()
124124
{
125125
// Arrange
126126
var job = new TestJob
@@ -154,7 +154,7 @@ public async Task ExecuteAsync_JobThrowsException_LogsAndHandlesFailure()
154154
}
155155

156156
[Fact]
157-
public async Task ExecuteAsync_StorageThrowsException_LogsAndDelays()
157+
public async Task ExecuteAsync_WhenStorageThrowsException_LogsAndDelays()
158158
{
159159
// Arrange
160160
var storage = Substitute.For<IJobStorageProvider<TestJob>>();
@@ -178,7 +178,7 @@ public async Task ExecuteAsync_StorageThrowsException_LogsAndDelays()
178178
}
179179

180180
[Fact]
181-
public async Task ExecuteAsync_EnqueuedViaJobQueue_JobIsExecutedAndMarkedComplete()
181+
public async Task ExecuteAsync_WhenEnqueuedViaJobQueue_JobIsExecutedAndMarkedComplete()
182182
{
183183
// Arrange
184184
var storage = Substitute.For<IJobStorageProvider<TestJob>>();
@@ -214,4 +214,191 @@ public async Task ExecuteAsync_EnqueuedViaJobQueue_JobIsExecutedAndMarkedComplet
214214
await storage.Received().MarkJobAsCompleteAsync(capturedJob, Arg.Any<CancellationToken>());
215215
Assert.Equal("from real queue", JsonConvert.DeserializeObject<string[]>(capturedJob.ArgumentsJson)[0]);
216216
}
217+
218+
public interface IScopedTestService
219+
{
220+
string ProcessData(string input);
221+
bool IsDisposed { get; }
222+
}
223+
224+
public class ScopedTestService : IScopedTestService, IDisposable
225+
{
226+
public bool IsDisposed { get; private set; }
227+
228+
public string ProcessData(string input)
229+
{
230+
if (IsDisposed) throw new ObjectDisposedException(nameof(ScopedTestService));
231+
return $"Processed: {input}";
232+
}
233+
234+
public void Dispose()
235+
{
236+
IsDisposed = true;
237+
}
238+
}
239+
240+
public class JobWithScopedDependency
241+
{
242+
private readonly IScopedTestService _scopedService;
243+
private readonly ITestResultCollector _resultCollector;
244+
245+
public JobWithScopedDependency(IScopedTestService scopedService, ITestResultCollector resultCollector)
246+
{
247+
_scopedService = scopedService;
248+
_resultCollector = resultCollector;
249+
}
250+
251+
public Task ExecuteWithScopedService(string input)
252+
{
253+
try
254+
{
255+
var result = _scopedService.ProcessData(input);
256+
_resultCollector.AddResult(result);
257+
return Task.CompletedTask;
258+
}
259+
finally
260+
{
261+
_resultCollector.SignalCompletion();
262+
}
263+
}
264+
}
265+
266+
public interface ITestResultCollector
267+
{
268+
void AddResult(string result);
269+
void SignalCompletion();
270+
Task WaitForCompletionAsync(int expectedCount, TimeSpan timeout);
271+
List<string> GetResults();
272+
}
273+
274+
public class TestResultCollector : ITestResultCollector
275+
{
276+
private readonly List<string> _results = new();
277+
private readonly SemaphoreSlim _semaphore = new(0, 10);
278+
279+
public void AddResult(string result)
280+
{
281+
lock (_results)
282+
{
283+
_results.Add(result);
284+
}
285+
}
286+
287+
public void SignalCompletion()
288+
{
289+
_semaphore.Release();
290+
}
291+
292+
public async Task WaitForCompletionAsync(int expectedCount, TimeSpan timeout)
293+
{
294+
for (int i = 0; i < expectedCount; i++)
295+
{
296+
await _semaphore.WaitAsync(timeout);
297+
}
298+
}
299+
300+
public List<string> GetResults()
301+
{
302+
lock (_results)
303+
{
304+
return new List<string>(_results);
305+
}
306+
}
307+
}
308+
309+
[Fact]
310+
public async Task ExecuteAsync_WhenJobRequiresScopedService_CreatesAndUsesServiceFromScope()
311+
{
312+
// Arrange
313+
var resultCollector = new TestResultCollector();
314+
315+
var job = new TestJob
316+
{
317+
TrackingId = Guid.NewGuid(),
318+
JobType = typeof(JobWithScopedDependency).AssemblyQualifiedName,
319+
MethodName = nameof(JobWithScopedDependency.ExecuteWithScopedService),
320+
ArgumentsJson = JsonConvert.SerializeObject(new object[] { "test-data" }),
321+
ExecuteAfter = DateTime.UtcNow,
322+
IsComplete = false
323+
};
324+
325+
var storage = Substitute.For<IJobStorageProvider<TestJob>>();
326+
storage.GetBatchAsync(Arg.Any<JobSearchParams<TestJob>>(), Arg.Any<CancellationToken>())
327+
.Returns(new List<TestJob> { job }, new List<TestJob>());
328+
329+
var services = new ServiceCollection();
330+
services.AddScoped<IScopedTestService, ScopedTestService>();
331+
services.AddSingleton<ITestResultCollector>(resultCollector);
332+
var serviceProvider = services.BuildServiceProvider();
333+
334+
var logger = Substitute.For<ILogger<JobQueueWorker<TestJob>>>();
335+
var config = Options.Create(GetTestConfig());
336+
337+
var worker = new JobQueueWorker<TestJob>(serviceProvider, storage, config, logger);
338+
339+
// Act
340+
var running = worker.StartAsync(CancellationToken.None);
341+
await resultCollector.WaitForCompletionAsync(1, TimeSpan.FromSeconds(5));
342+
await worker.StopAsync(CancellationToken.None);
343+
344+
// Assert
345+
await storage.Received().MarkJobAsCompleteAsync(job, Arg.Any<CancellationToken>());
346+
var results = resultCollector.GetResults();
347+
Assert.Single(results);
348+
Assert.Equal("Processed: test-data", results[0]);
349+
}
350+
351+
[Fact]
352+
public async Task ExecuteAsync_WhenMultipleJobsWithScopedServices_EachJobGetsOwnServiceInstance()
353+
{
354+
// Arrange
355+
var resultCollector = new TestResultCollector();
356+
357+
var job1 = new TestJob
358+
{
359+
TrackingId = Guid.NewGuid(),
360+
JobType = typeof(JobWithScopedDependency).AssemblyQualifiedName,
361+
MethodName = nameof(JobWithScopedDependency.ExecuteWithScopedService),
362+
ArgumentsJson = JsonConvert.SerializeObject(new object[] { "job1" }),
363+
ExecuteAfter = DateTime.UtcNow,
364+
IsComplete = false
365+
};
366+
367+
var job2 = new TestJob
368+
{
369+
TrackingId = Guid.NewGuid(),
370+
JobType = typeof(JobWithScopedDependency).AssemblyQualifiedName,
371+
MethodName = nameof(JobWithScopedDependency.ExecuteWithScopedService),
372+
ArgumentsJson = JsonConvert.SerializeObject(new object[] { "job2" }),
373+
ExecuteAfter = DateTime.UtcNow,
374+
IsComplete = false
375+
};
376+
377+
var storage = Substitute.For<IJobStorageProvider<TestJob>>();
378+
storage.GetBatchAsync(Arg.Any<JobSearchParams<TestJob>>(), Arg.Any<CancellationToken>())
379+
.Returns(new List<TestJob> { job1, job2 }, new List<TestJob>());
380+
381+
var services = new ServiceCollection();
382+
services.AddScoped<IScopedTestService, ScopedTestService>();
383+
services.AddSingleton<ITestResultCollector>(resultCollector);
384+
var serviceProvider = services.BuildServiceProvider();
385+
386+
var logger = Substitute.For<ILogger<JobQueueWorker<TestJob>>>();
387+
var config = Options.Create(new GustoConfig { Concurrency = 2, PollInterval = TimeSpan.FromMilliseconds(10), BatchSize = 2 });
388+
389+
var worker = new JobQueueWorker<TestJob>(serviceProvider, storage, config, logger);
390+
391+
// Act
392+
var running = worker.StartAsync(CancellationToken.None);
393+
await resultCollector.WaitForCompletionAsync(2, TimeSpan.FromSeconds(5));
394+
await worker.StopAsync(CancellationToken.None);
395+
396+
// Assert
397+
await storage.Received().MarkJobAsCompleteAsync(job1, Arg.Any<CancellationToken>());
398+
await storage.Received().MarkJobAsCompleteAsync(job2, Arg.Any<CancellationToken>());
399+
var results = resultCollector.GetResults();
400+
Assert.Equal(2, results.Count);
401+
Assert.Contains("Processed: job1", results);
402+
Assert.Contains("Processed: job2", results);
403+
}
217404
}

0 commit comments

Comments
 (0)