Skip to content

Commit 9de5381

Browse files
committed
fix: more scoped serviceproviders
The original fix, didnt actually fix the bug. this time ive included a test for it.
1 parent 69f0458 commit 9de5381

2 files changed

Lines changed: 224 additions & 10 deletions

File tree

src/ByteBard.GUSTO.JobQueue/JobQueueWorker.cs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -128,19 +128,22 @@ private static void SignalBatchCompletedIfSet()
128128

129129
private async Task ProcessBatchCycleAsync(ParallelOptions parallelOptions, CancellationToken stoppingToken)
130130
{
131-
using var scope = _serviceProvider.CreateScope();
132-
var storage = scope.ServiceProvider.GetRequiredService<IJobStorageProvider<TStorageRecord>>();
131+
List<TStorageRecord> jobStorageRecords;
133132

134-
var jobs = await FetchPendingJobsAsync(storage, stoppingToken);
135-
var jobStorageRecords = jobs.ToList();
133+
using (var scope = _serviceProvider.CreateScope())
134+
{
135+
var storage = scope.ServiceProvider.GetRequiredService<IJobStorageProvider<TStorageRecord>>();
136+
var jobs = await FetchPendingJobsAsync(storage, stoppingToken);
137+
jobStorageRecords = jobs.ToList();
138+
}
136139

137140
if (!jobStorageRecords.Any())
138141
{
139142
await Task.Delay(_config.PollInterval, stoppingToken);
140143
return;
141144
}
142145

143-
await ProcessBatchAsync(jobStorageRecords, storage, parallelOptions);
146+
await ProcessBatchAsync(jobStorageRecords, parallelOptions);
144147
}
145148

146149
private async Task<IEnumerable<TStorageRecord>> FetchPendingJobsAsync(
@@ -161,7 +164,6 @@ private async Task<IEnumerable<TStorageRecord>> FetchPendingJobsAsync(
161164

162165
private async Task ProcessBatchAsync(
163166
List<TStorageRecord> jobStorageRecords,
164-
IJobStorageProvider<TStorageRecord> storage,
165167
ParallelOptions parallelOptions)
166168
{
167169
using var batchActivity = ActivitySource.StartActivity("ProcessBatch");
@@ -172,7 +174,7 @@ private async Task ProcessBatchAsync(
172174

173175
await Parallel.ForEachAsync(jobStorageRecords, parallelOptions, async (storedJob, ct) =>
174176
{
175-
await ExecuteJobAsync(storedJob, storage, ct);
177+
await ExecuteJobAsync(storedJob, ct);
176178
});
177179

178180
batchStopwatch.Stop();
@@ -182,7 +184,6 @@ await Parallel.ForEachAsync(jobStorageRecords, parallelOptions, async (storedJob
182184

183185
private async Task ExecuteJobAsync(
184186
TStorageRecord storedJob,
185-
IJobStorageProvider<TStorageRecord> storage,
186187
CancellationToken ct)
187188
{
188189
using var jobActivity = ActivitySource.StartActivity("ExecuteJob");
@@ -191,15 +192,18 @@ private async Task ExecuteJobAsync(
191192
jobActivity?.SetTag("job.method", storedJob.MethodName);
192193

193194
var jobStopwatch = Stopwatch.StartNew();
195+
196+
using var scope = _serviceProvider.CreateScope();
197+
var storage = scope.ServiceProvider.GetRequiredService<IJobStorageProvider<TStorageRecord>>();
198+
194199
try
195200
{
196201
var jobType = Type.GetType(storedJob.JobType);
197202
var arguments = JsonConvert.DeserializeObject<object[]>(storedJob.ArgumentsJson, _settings);
198-
using var scope = _serviceProvider.CreateScope();
199203
var jobInstance = ActivatorUtilities.CreateInstance(scope.ServiceProvider, jobType);
200204
var method = jobType.GetMethod(storedJob.MethodName);
201205
await (Task)method.Invoke(jobInstance, arguments);
202-
206+
203207
await storage.MarkJobAsCompleteAsync(storedJob, ct);
204208
RecordJobSuccess(storedJob, jobStopwatch, jobActivity);
205209
}
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
using System.Collections.Concurrent;
2+
using ByteBard.GUSTO;
3+
using Microsoft.Extensions.Configuration;
4+
using Microsoft.Extensions.DependencyInjection;
5+
using Microsoft.Extensions.Hosting;
6+
7+
public class ServiceProviderScopeTests
8+
{
9+
[Fact]
10+
public async Task StorageProvider_UsedConcurrently_ThrowsDbContextConcurrencyError()
11+
{
12+
// This test demonstrates a bug with scoped services (particular dbcontext usage): when multiple jobs run in parallel,
13+
// they share the same storage provider instance (and its scoped DbContext),
14+
// causing EF Core to throw a concurrency exception
15+
16+
FakeDbContext.SeenHashes.Clear();
17+
FakeDbContext.DetectedConcurrentAccess = false;
18+
FakeJobService.Completed.Clear();
19+
20+
var services = new ServiceCollection();
21+
var inMemorySettings = new Dictionary<string, string?>
22+
{
23+
["Gusto:BatchSize"] = "5",
24+
["Gusto:Concurrency"] = "5",
25+
["Gusto:PollInterval"] = "00:00:01"
26+
};
27+
28+
IConfiguration configuration = new ConfigurationBuilder()
29+
.AddInMemoryCollection(inMemorySettings)
30+
.Build();
31+
services.AddLogging();
32+
services.AddGusto<FakeJobRecord, InMemoryJobStorageProvider>(configuration);
33+
34+
services.AddScoped<IFakeDbContext, FakeDbContext>();
35+
services.AddScoped<FakeJobService>();
36+
37+
var sp = services.BuildServiceProvider();
38+
var jobQueue = sp.GetRequiredService<JobQueue<FakeJobRecord>>();
39+
var worker = sp.GetRequiredService<IHostedService>();
40+
41+
// Arrange barriers
42+
JobQueueWorker<FakeJobRecord>.BatchStartBarrier = new TaskCompletionSource();
43+
JobQueueWorker<FakeJobRecord>.BatchCompletedBarrier = new TaskCompletionSource();
44+
45+
for (int i = 0; i < 5; i++)
46+
{
47+
await jobQueue.EnqueueAsync<FakeJobService>(
48+
svc => svc.RunAsync($"job-{i}"));
49+
}
50+
51+
// Act
52+
await worker.StartAsync(CancellationToken.None);
53+
JobQueueWorker<FakeJobRecord>.BatchStartBarrier.SetResult();
54+
await JobQueueWorker<FakeJobRecord>.BatchCompletedBarrier.Task;
55+
await worker.StopAsync(CancellationToken.None);
56+
57+
Assert.False(FakeDbContext.DetectedConcurrentAccess,
58+
"Concurrent DbContext access was detected! " +
59+
"Multiple parallel jobs are sharing the same storage provider instance " +
60+
"(with the same scoped DbContext). Each job should get its own scope.");
61+
}
62+
63+
}
64+
public interface IFakeDbContext
65+
{
66+
Task DoWorkAsync(string jobId);
67+
}
68+
69+
public class FakeDbContext : IFakeDbContext
70+
{
71+
public static ConcurrentBag<int> SeenHashes = new();
72+
public static volatile bool DetectedConcurrentAccess = false;
73+
74+
private int _inUse;
75+
76+
public async Task DoWorkAsync(string jobId)
77+
{
78+
SeenHashes.Add(GetHashCode());
79+
if (Interlocked.Exchange(ref _inUse, 1) == 1)
80+
{
81+
DetectedConcurrentAccess = true;
82+
throw new InvalidOperationException(
83+
$"A second operation was started on this context instance before a previous operation completed. " +
84+
$"DbContext {GetHashCode()} already in use! This simulates EF Core's concurrency detection.");
85+
}
86+
87+
try { await Task.Delay(50); }
88+
finally { Interlocked.Exchange(ref _inUse, 0); }
89+
}
90+
}
91+
92+
93+
public class FakeJobService
94+
{
95+
private readonly IFakeDbContext _db;
96+
public static ConcurrentBag<string> Completed = new();
97+
98+
public FakeJobService(IFakeDbContext db) => _db = db;
99+
100+
public async Task RunAsync(string id)
101+
{
102+
await _db.DoWorkAsync(id);
103+
Completed.Add(id);
104+
}
105+
}
106+
107+
public class InMemoryJobStorageProvider : IJobStorageProvider<FakeJobRecord>
108+
{
109+
private static readonly List<FakeJobRecord> _jobs = new();
110+
private readonly object _lock = new();
111+
private readonly IFakeDbContext _dbContext;
112+
113+
public InMemoryJobStorageProvider(IFakeDbContext dbContext)
114+
{
115+
_dbContext = dbContext;
116+
}
117+
118+
public Task StoreJobAsync(FakeJobRecord jobStorageRecord, CancellationToken cancellationToken)
119+
{
120+
lock (_lock)
121+
{
122+
_jobs.Add(jobStorageRecord);
123+
}
124+
return Task.CompletedTask;
125+
}
126+
127+
public Task<IEnumerable<FakeJobRecord>> GetBatchAsync(JobSearchParams<FakeJobRecord> parameters, CancellationToken cancellationToken)
128+
{
129+
lock (_lock)
130+
{
131+
var results = _jobs.Where(j => j.Status == JobStatus.Ready).Where(parameters.Match.Compile()).Take(parameters.Limit);
132+
return Task.FromResult(results);
133+
}
134+
}
135+
136+
public async Task MarkJobAsCompleteAsync(FakeJobRecord jobStorageRecord, CancellationToken cancellationToken)
137+
{
138+
// Simulate async database operation like EF Core would do
139+
await _dbContext.DoWorkAsync($"MarkComplete-{jobStorageRecord.TrackingId}");
140+
141+
lock (_lock)
142+
{
143+
var job = _jobs.FirstOrDefault(j => j.TrackingId == jobStorageRecord.TrackingId);
144+
if (job != null)
145+
{
146+
job.IsComplete = true;
147+
148+
ProcessContinuations(job.TrackingId, successful: true);
149+
}
150+
}
151+
}
152+
153+
public Task CancelJobAsync(Guid trackingId, CancellationToken cancellationToken)
154+
{
155+
lock (_lock)
156+
{
157+
_jobs.RemoveAll(j => j.TrackingId == trackingId);
158+
}
159+
return Task.CompletedTask;
160+
}
161+
162+
public async Task OnHandlerExecutionFailureAsync(FakeJobRecord jobStorageRecord, Exception exception, CancellationToken cancellationToken)
163+
{
164+
// Simulate async database operation like EF Core would do
165+
await _dbContext.DoWorkAsync($"HandleFailure-{jobStorageRecord.TrackingId}");
166+
167+
lock (_lock)
168+
{
169+
jobStorageRecord.ExecuteAfter = DateTime.UtcNow.AddMinutes(5);
170+
}
171+
}
172+
173+
private void ProcessContinuations(Guid completedJobId, bool successful)
174+
{
175+
var continuations = _jobs.Where(j =>
176+
j.Status == JobStatus.WaitingForParent &&
177+
j.ParentJobId == completedJobId).ToList();
178+
179+
foreach (var continuation in continuations)
180+
{
181+
if (successful)
182+
{
183+
continuation.Status = JobStatus.Ready;
184+
continuation.ExecuteAfter = DateTime.UtcNow;
185+
186+
// don't forget to store the change if moving to persisted storage.
187+
}
188+
}
189+
}
190+
}
191+
192+
public class FakeJobRecord : IJobStorageRecord
193+
{
194+
public Guid TrackingId { get; set; }
195+
public DateTime CreatedOn { get; set; }
196+
public DateTime? ExecuteAfter { get; set; }
197+
public DateTime? ExpireOn { get; set; }
198+
public bool IsComplete { get; set; }
199+
public string JobType { get; set; }
200+
public string MethodName { get; set; }
201+
public string ArgumentsJson { get; set; }
202+
public JobStatus Status { get; set; }
203+
public Guid ParentJobId { get; set; }
204+
}
205+
206+
public enum JobStatus
207+
{
208+
Ready,
209+
WaitingForParent,
210+
}

0 commit comments

Comments
 (0)