@@ -61,7 +61,7 @@ public virtual Task ExecuteAsyncFail(string input)
6161 } ;
6262
6363 [ Fact ]
64- public async Task ExecuteAsync_NoJobs_DelaysAndContinuesLoop ( )
64+ public async Task ExecuteAsync_WhenNoJobsAvailable_DelaysAndContinuesLoop ( )
6565 {
6666 // Arrange
6767 var storage = Substitute . For < IJobStorageProvider < TestJob > > ( ) ;
@@ -85,7 +85,7 @@ public async Task ExecuteAsync_NoJobs_DelaysAndContinuesLoop()
8585 }
8686
8787 [ Fact ]
88- public async Task ExecuteAsync_ValidJob_InvokesJobAndMarksComplete ( )
88+ public async Task ExecuteAsync_WhenValidJobExists_InvokesJobAndMarksComplete ( )
8989 {
9090 // Arrange
9191 var job = new TestJob
@@ -120,7 +120,7 @@ public async Task ExecuteAsync_ValidJob_InvokesJobAndMarksComplete()
120120 }
121121
122122 [ Fact ]
123- public async Task ExecuteAsync_JobThrowsException_LogsAndHandlesFailure ( )
123+ public async Task ExecuteAsync_WhenJobThrowsException_LogsAndHandlesFailure ( )
124124 {
125125 // Arrange
126126 var job = new TestJob
@@ -154,7 +154,7 @@ public async Task ExecuteAsync_JobThrowsException_LogsAndHandlesFailure()
154154 }
155155
156156 [ Fact ]
157- public async Task ExecuteAsync_StorageThrowsException_LogsAndDelays ( )
157+ public async Task ExecuteAsync_WhenStorageThrowsException_LogsAndDelays ( )
158158 {
159159 // Arrange
160160 var storage = Substitute . For < IJobStorageProvider < TestJob > > ( ) ;
@@ -178,7 +178,7 @@ public async Task ExecuteAsync_StorageThrowsException_LogsAndDelays()
178178 }
179179
180180 [ Fact ]
181- public async Task ExecuteAsync_EnqueuedViaJobQueue_JobIsExecutedAndMarkedComplete ( )
181+ public async Task ExecuteAsync_WhenEnqueuedViaJobQueue_JobIsExecutedAndMarkedComplete ( )
182182 {
183183 // Arrange
184184 var storage = Substitute . For < IJobStorageProvider < TestJob > > ( ) ;
@@ -214,4 +214,145 @@ public async Task ExecuteAsync_EnqueuedViaJobQueue_JobIsExecutedAndMarkedComplet
214214 await storage . Received ( ) . MarkJobAsCompleteAsync ( capturedJob , Arg . Any < CancellationToken > ( ) ) ;
215215 Assert . Equal ( "from real queue" , JsonConvert . DeserializeObject < string [ ] > ( capturedJob . ArgumentsJson ) [ 0 ] ) ;
216216 }
217+
218+ public interface IScopedTestService
219+ {
220+ string ProcessData ( string input ) ;
221+ bool IsDisposed { get ; }
222+ }
223+
224+ public class ScopedTestService : IScopedTestService , IDisposable
225+ {
226+ public bool IsDisposed { get ; private set ; }
227+
228+ public string ProcessData ( string input )
229+ {
230+ if ( IsDisposed ) throw new ObjectDisposedException ( nameof ( ScopedTestService ) ) ;
231+ return $ "Processed: { input } ";
232+ }
233+
234+ public void Dispose ( )
235+ {
236+ IsDisposed = true ;
237+ }
238+ }
239+
240+ public class JobWithScopedDependency
241+ {
242+ private readonly IScopedTestService _scopedService ;
243+ public static List < string > ExecutionResults = new ( ) ;
244+ public static SemaphoreSlim ExecutionSemaphore = new ( 0 , 10 ) ;
245+
246+ public JobWithScopedDependency ( IScopedTestService scopedService )
247+ {
248+ _scopedService = scopedService ;
249+ }
250+
251+ public Task ExecuteWithScopedService ( string input )
252+ {
253+ try
254+ {
255+ var result = _scopedService . ProcessData ( input ) ;
256+ ExecutionResults . Add ( result ) ;
257+ return Task . CompletedTask ;
258+ }
259+ finally
260+ {
261+ ExecutionSemaphore . Release ( ) ;
262+ }
263+ }
264+ }
265+
266+ [ Fact ]
267+ public async Task ExecuteAsync_WhenJobRequiresScopedService_CreatesAndUsesServiceFromScope ( )
268+ {
269+ // Arrange
270+ JobWithScopedDependency . ExecutionResults . Clear ( ) ;
271+
272+ var job = new TestJob
273+ {
274+ TrackingId = Guid . NewGuid ( ) ,
275+ JobType = typeof ( JobWithScopedDependency ) . AssemblyQualifiedName ,
276+ MethodName = nameof ( JobWithScopedDependency . ExecuteWithScopedService ) ,
277+ ArgumentsJson = JsonConvert . SerializeObject ( new object [ ] { "test-data" } ) ,
278+ ExecuteAfter = DateTime . UtcNow ,
279+ IsComplete = false
280+ } ;
281+
282+ var storage = Substitute . For < IJobStorageProvider < TestJob > > ( ) ;
283+ storage . GetBatchAsync ( Arg . Any < JobSearchParams < TestJob > > ( ) , Arg . Any < CancellationToken > ( ) )
284+ . Returns ( new List < TestJob > { job } ) ;
285+
286+ var services = new ServiceCollection ( ) ;
287+ services . AddScoped < IScopedTestService , ScopedTestService > ( ) ;
288+ var serviceProvider = services . BuildServiceProvider ( ) ;
289+
290+ var logger = Substitute . For < ILogger < JobQueueWorker < TestJob > > > ( ) ;
291+ var config = Options . Create ( GetTestConfig ( ) ) ;
292+
293+ var worker = new JobQueueWorker < TestJob > ( serviceProvider , storage , config , logger ) ;
294+
295+ // Act
296+ var running = worker . StartAsync ( CancellationToken . None ) ;
297+ await JobWithScopedDependency . ExecutionSemaphore . WaitAsync ( TimeSpan . FromSeconds ( 5 ) ) ;
298+ await worker . StopAsync ( CancellationToken . None ) ;
299+
300+ // Assert
301+ await storage . Received ( ) . MarkJobAsCompleteAsync ( job , Arg . Any < CancellationToken > ( ) ) ;
302+ Assert . Single ( JobWithScopedDependency . ExecutionResults ) ;
303+ Assert . Equal ( "Processed: test-data" , JobWithScopedDependency . ExecutionResults [ 0 ] ) ;
304+ }
305+
306+ [ Fact ]
307+ public async Task ExecuteAsync_WhenMultipleJobsWithScopedServices_EachJobGetsOwnServiceInstance ( )
308+ {
309+ // Arrange
310+ JobWithScopedDependency . ExecutionResults . Clear ( ) ;
311+
312+ var job1 = new TestJob
313+ {
314+ TrackingId = Guid . NewGuid ( ) ,
315+ JobType = typeof ( JobWithScopedDependency ) . AssemblyQualifiedName ,
316+ MethodName = nameof ( JobWithScopedDependency . ExecuteWithScopedService ) ,
317+ ArgumentsJson = JsonConvert . SerializeObject ( new object [ ] { "job1" } ) ,
318+ ExecuteAfter = DateTime . UtcNow ,
319+ IsComplete = false
320+ } ;
321+
322+ var job2 = new TestJob
323+ {
324+ TrackingId = Guid . NewGuid ( ) ,
325+ JobType = typeof ( JobWithScopedDependency ) . AssemblyQualifiedName ,
326+ MethodName = nameof ( JobWithScopedDependency . ExecuteWithScopedService ) ,
327+ ArgumentsJson = JsonConvert . SerializeObject ( new object [ ] { "job2" } ) ,
328+ ExecuteAfter = DateTime . UtcNow ,
329+ IsComplete = false
330+ } ;
331+
332+ var storage = Substitute . For < IJobStorageProvider < TestJob > > ( ) ;
333+ storage . GetBatchAsync ( Arg . Any < JobSearchParams < TestJob > > ( ) , Arg . Any < CancellationToken > ( ) )
334+ . Returns ( new List < TestJob > { job1 , job2 } ) ;
335+
336+ var services = new ServiceCollection ( ) ;
337+ services . AddScoped < IScopedTestService , ScopedTestService > ( ) ;
338+ var serviceProvider = services . BuildServiceProvider ( ) ;
339+
340+ var logger = Substitute . For < ILogger < JobQueueWorker < TestJob > > > ( ) ;
341+ var config = Options . Create ( new GustoConfig { Concurrency = 2 , PollInterval = TimeSpan . FromMilliseconds ( 10 ) , BatchSize = 2 } ) ;
342+
343+ var worker = new JobQueueWorker < TestJob > ( serviceProvider , storage , config , logger ) ;
344+
345+ // Act
346+ var running = worker . StartAsync ( CancellationToken . None ) ;
347+ await JobWithScopedDependency . ExecutionSemaphore . WaitAsync ( TimeSpan . FromSeconds ( 5 ) ) ;
348+ await JobWithScopedDependency . ExecutionSemaphore . WaitAsync ( TimeSpan . FromSeconds ( 5 ) ) ;
349+ await worker . StopAsync ( CancellationToken . None ) ;
350+
351+ // Assert
352+ await storage . Received ( ) . MarkJobAsCompleteAsync ( job1 , Arg . Any < CancellationToken > ( ) ) ;
353+ await storage . Received ( ) . MarkJobAsCompleteAsync ( job2 , Arg . Any < CancellationToken > ( ) ) ;
354+ Assert . Equal ( 2 , JobWithScopedDependency . ExecutionResults . Count ) ;
355+ Assert . Contains ( "Processed: job1" , JobWithScopedDependency . ExecutionResults ) ;
356+ Assert . Contains ( "Processed: job2" , JobWithScopedDependency . ExecutionResults ) ;
357+ }
217358}
0 commit comments