Skip to content

Commit 07f769e

Browse files
einarmo and ozangoktan authored
Base extractor (#522)
First take on a new base extractor class. Some more work goes into this in the next step, adding startup reporting and integrating things a bit more tightly, but this is a nice first draft that contains some of the core logic.

The base extractor currently has a few roles:

- It contains the config object. In the future it will also handle changes to config revision and restart.
- It acts as an error reporter, for raw errors.
- It manages "monitored tasks", which are essentially just C# Tasks. The idea is that if something fails in a background process, this needs to be properly reported to the extractor so that it can take appropriate action. Background tasks have a name, can be cancelled independently, and can be expected to end or not, so if a task ends without an error, the extractor can take appropriate action.
- It also contains shutdown logic, which is highly customizable, since extractor implementations may want to shut components down in a specific order. In general, different parts of the process are supposed to be independently cancelled, so that graceful shutdown is possible.

As with the last iteration of the base extractor, it is intended to be used as a base class for user extractors.

* Comments from review

---------

Co-authored-by: Ozan Göktan <72358629+ozangoktan@users.noreply.github.com>
1 parent 8b44bc4 commit 07f769e

File tree

7 files changed

+701
-10
lines changed

7 files changed

+701
-10
lines changed
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Cognite.Extractor.Common;
7+
using Cognite.Extractor.Testing;
8+
using Cognite.Extractor.Utils;
9+
using Cognite.ExtractorUtils.Unstable;
10+
using Cognite.ExtractorUtils.Unstable.Tasks;
11+
using ExtractorUtils.Test.unit.Unstable;
12+
using Microsoft.Extensions.DependencyInjection;
13+
using Xunit;
14+
using Xunit.Abstractions;
15+
16+
namespace ExtractorUtils.Test.Unit.Unstable
17+
{
18+
/// <summary>
/// Empty configuration type used as the type argument for the test extractor.
/// </summary>
class DummyConfig
{
}
19+
20+
/// <summary>
/// Minimal extractor used by the tests in this file. It forwards its constructor
/// arguments to the base extractor, offers public wrappers around base-class
/// members, and lets a test hook into task initialization through
/// <see cref="InitAction"/>.
/// </summary>
class DummyExtractor : Cognite.ExtractorUtils.Unstable.BaseExtractor<DummyConfig>
{
    /// <summary>
    /// Optional callback that <see cref="InitTasks"/> invokes with the task scheduler.
    /// When it is null, <see cref="InitTasks"/> does nothing.
    /// </summary>
    public Action<ExtractorTaskScheduler> InitAction { get; set; }

    /// <summary>
    /// Creates the extractor, passing every argument straight to the base constructor.
    /// </summary>
    /// <param name="config">Configuration object.</param>
    /// <param name="provider">Service provider.</param>
    /// <param name="taskScheduler">Task scheduler.</param>
    /// <param name="sink">Integration sink.</param>
    /// <param name="destination">Optional destination, null by default.</param>
    public DummyExtractor(
        DummyConfig config,
        IServiceProvider provider,
        ExtractorTaskScheduler taskScheduler,
        IIntegrationSink sink,
        CogniteDestination destination = null)
        : base(config, provider, taskScheduler, sink, destination)
    {
    }

    /// <summary>
    /// Public wrapper that forwards to <c>AddMonitoredTask</c>
    /// (presumably not public on the base class, hence this wrapper).
    /// </summary>
    /// <param name="task">Factory for the task to monitor, given a cancellation token.</param>
    /// <param name="staticResult">Result value passed on unchanged.</param>
    /// <param name="name">Name of the monitored task.</param>
    public void AddMonitoredTaskPub(Func<CancellationToken, Task> task, ExtractorTaskResult staticResult, string name)
        => AddMonitoredTask(task, staticResult, name);

    /// <summary>
    /// Public wrapper that awaits <c>CancelMonitoredTaskAndWait</c> for the named task.
    /// </summary>
    /// <param name="name">Name of the monitored task.</param>
    public async Task CancelMonitoredTaskAndWaitPub(string name)
        => await CancelMonitoredTaskAndWait(name);

    /// <summary>
    /// Invokes <see cref="InitAction"/>, if one is set, with the task scheduler,
    /// then completes synchronously.
    /// </summary>
    protected override Task InitTasks()
    {
        var callback = InitAction;
        if (callback != null)
        {
            callback(TaskScheduler);
        }

        return Task.CompletedTask;
    }
}
50+
51+
/// <summary>
/// Tests for the base extractor, exercised through <see cref="DummyExtractor"/>
/// with a <see cref="DummySink"/> recording task starts, task ends and errors.
/// </summary>
public class BaseExtractorTest
{
    private readonly ITestOutputHelper _output;

    /// <summary>
    /// Stores the xUnit output helper, which is used to set up test logging.
    /// </summary>
    /// <param name="output">Test output helper supplied by xUnit.</param>
    public BaseExtractorTest(ITestOutputHelper output)
    {
        _output = output;
    }

    /// <summary>
    /// Builds a service provider containing a <see cref="DummyConfig"/>, a
    /// <see cref="DummySink"/> registered as <see cref="IIntegrationSink"/>, test
    /// logging, and transient registrations for the scheduler and the extractor.
    /// </summary>
    /// <returns>The resolved extractor together with the sink it reports to.</returns>
    private (DummyExtractor, DummySink) CreateExtractor()
    {
        var sink = new DummySink();
        var services = new ServiceCollection();
        services.AddSingleton(new DummyConfig());
        services.AddSingleton<IIntegrationSink>(sink);
        services.AddTestLogging(_output);
        services.AddTransient<ExtractorTaskScheduler>();
        services.AddTransient<DummyExtractor>();
        var provider = services.BuildServiceProvider();
        return (provider.GetRequiredService<DummyExtractor>(), sink);
    }

    /// <summary>
    /// Starts the extractor with a single scheduled task and checks that the sink
    /// sees exactly one task start, one task end, and no errors.
    /// </summary>
    [Fact]
    public async Task TestBaseExtractor()
    {
        var (ext, sink) = CreateExtractor();
        // ManualResetEvent is IDisposable; release its wait handle when the test ends.
        using var taskCompletedEvent = new ManualResetEvent(false);
        // Run the extractor and verify that scheduled tasks are run.
        ext.InitAction = (sched) =>
        {
            sched.AddScheduledTask(new RunQuickTask("task1", async (task, token) =>
            {
                await Task.Delay(100, token);
                taskCompletedEvent.Set();
                return new TaskUpdatePayload();
            }), true);
        };
        var runTask = ext.Start(CancellationToken.None);
        Assert.True(await CommonUtils.WaitAsync(taskCompletedEvent, TimeSpan.FromSeconds(2), CancellationToken.None));
        await ext.DisposeAsync();

        Assert.Single(sink.TaskStart);
        Assert.Single(sink.TaskEnd);
        Assert.Empty(sink.Errors);
    }

    /// <summary>
    /// Starts the extractor with a scheduled task that throws, and checks that the
    /// sink sees one task start, one task end, and two errors, the first of which
    /// is described as "Inner error".
    /// </summary>
    [Fact]
    public async Task TestBaseExtractorInnerError()
    {
        var (ext, sink) = CreateExtractor();
        // ManualResetEvent is IDisposable; release its wait handle when the test ends.
        using var taskCompletedEvent = new ManualResetEvent(false);
        // Run the extractor with a scheduled task that fails, and verify that the
        // failure is reported to the sink.
        ext.InitAction = (sched) =>
        {
            sched.AddScheduledTask(new RunQuickTask("task1", async (task, token) =>
            {
                await Task.Delay(100, token);
                // Signal before throwing so the test can proceed to the assertions.
                taskCompletedEvent.Set();
                throw new Exception("Inner error");
            }), true);
        };
        var runTask = ext.Start(CancellationToken.None);
        Assert.True(await CommonUtils.WaitAsync(taskCompletedEvent, TimeSpan.FromSeconds(2), CancellationToken.None));
        await ext.DisposeAsync();

        Assert.Single(sink.TaskStart);
        Assert.Single(sink.TaskEnd);
        Assert.Equal(2, sink.Errors.Count);
        Assert.Equal("Inner error", sink.Errors[0].Description);
    }

    /// <summary>
    /// Adds a monitored task that throws and checks that the extractor's run task
    /// finishes within two seconds, with two errors reported and the first one
    /// describing the failed task.
    /// </summary>
    [Fact]
    public async Task TestBaseExtractorMonitoredError()
    {
        var (ext, sink) = CreateExtractor();
        var runTask = ext.Start(CancellationToken.None);
        ext.AddMonitoredTaskPub(async t =>
        {
            await Task.Delay(100, t);
            throw new Exception("Monitored error");
        }, ExtractorTaskResult.Unexpected, "task1");
        // The delay acts as a timeout: the run task must win the race.
        var delayTask = Task.Delay(2000);
        Assert.NotEqual(delayTask, await Task.WhenAny(runTask, delayTask));
        Assert.Equal(2, sink.Errors.Count);
        Assert.Equal("Internal task task1 failed, restarting extractor: Monitored error", sink.Errors[0].Description);
    }

    /// <summary>
    /// Adds a monitored task, registered with <c>ExtractorTaskResult.Unexpected</c>,
    /// that completes without error, and checks that the extractor's run task
    /// finishes within two seconds with two errors reported, the first describing
    /// the unexpected completion.
    /// </summary>
    [Fact]
    public async Task TestBaseExtractorUnexpectedExit()
    {
        var (ext, sink) = CreateExtractor();
        var runTask = ext.Start(CancellationToken.None);
        ext.AddMonitoredTaskPub(async t =>
        {
            await Task.Delay(100, t);
        }, ExtractorTaskResult.Unexpected, "task1");
        // The delay acts as a timeout: the run task must win the race.
        var delayTask = Task.Delay(2000);
        Assert.NotEqual(delayTask, await Task.WhenAny(runTask, delayTask));
        Assert.Equal(2, sink.Errors.Count);
        Assert.Equal("Internal task task1 completed, but was not expected to stop, restarting extractor.", sink.Errors[0].Description);
    }

    /// <summary>
    /// Adds a monitored task that loops until cancelled, then checks that cancelling
    /// it, shutting the extractor down, and waiting for the run task all complete
    /// before a shared two-second timeout, and that disposing afterwards still works.
    /// </summary>
    [Fact]
    public async Task TestCancelMonitoredTask()
    {
        var (ext, sink) = CreateExtractor();
        var runTask = ext.Start(CancellationToken.None);
        ext.AddMonitoredTaskPub(async t =>
        {
            while (!t.IsCancellationRequested)
            {
                await Task.Delay(100, t);
            }
        }, ExtractorTaskResult.Unexpected, "task1");
        // A single delay is shared by all three steps, so together they must
        // finish within two seconds.
        var delayTask = Task.Delay(2000);
        Assert.NotEqual(delayTask, await Task.WhenAny(ext.CancelMonitoredTaskAndWaitPub("task1"), delayTask));
        Assert.NotEqual(delayTask, await Task.WhenAny(ext.Shutdown(), delayTask));
        Assert.NotEqual(delayTask, await Task.WhenAny(runTask, delayTask));

        // Dispose should work, even if we're already shut-down.
        await ext.DisposeAsync();
    }
}
173+
}

ExtractorUtils.Test/unit/Unstable/CheckInWorkerTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public async Task TestCheckInWorker()
8888
using var p = provider;
8989
using var source = new CancellationTokenSource();
9090
// Check that this doesn't crash, and properly cancels out at the end.
91-
var runTask = checkIn.Run(source.Token, Timeout.InfiniteTimeSpan);
91+
var runTask = checkIn.RunPeriodicCheckin(source.Token, Timeout.InfiniteTimeSpan);
9292
// First, we should very quickly report a checkin on the start of the run task...
9393
await TestUtils.WaitForCondition(() => _checkInCount == 1, 5);
9494

@@ -136,7 +136,7 @@ public async Task TestCheckInWorkerBatch()
136136
using var source = new CancellationTokenSource();
137137

138138
// Check that this doesn't crash, and properly cancels out at the end.
139-
var runTask = checkIn.Run(source.Token, Timeout.InfiniteTimeSpan);
139+
var runTask = checkIn.RunPeriodicCheckin(source.Token, Timeout.InfiniteTimeSpan);
140140
// First, we should very quickly report a checkin on the start of the run task...
141141
await TestUtils.WaitForCondition(() => _checkInCount == 1, 5);
142142

ExtractorUtils.Test/unit/Unstable/TaskSchedulerTest.cs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Cognite.Extractor.Common;
66
using Cognite.ExtractorUtils.Unstable.Tasks;
77
using CogniteSdk.Alpha;
8+
using Microsoft.Extensions.Logging.Abstractions;
89
using Xunit;
910
using Xunit.Abstractions;
1011

@@ -16,6 +17,11 @@ class DummySink : BaseErrorReporter, IIntegrationSink
1617
public List<(string, DateTime)> TaskStart { get; } = new();
1718
public List<(string, DateTime)> TaskEnd { get; } = new();
1819

20+
/// <summary>
/// No-op flush: returns an already completed task and ignores the token.
/// </summary>
/// <param name="token">Cancellation token; unused.</param>
public Task Flush(CancellationToken token) => Task.CompletedTask;
24+
1925
public override ExtractorError NewError(ErrorLevel level, string description, string details = null, DateTime? now = null)
2026
{
2127
return new ExtractorError(level, description, this, details, null, now);
@@ -35,6 +41,11 @@ public void ReportTaskStart(string taskName, TaskUpdatePayload update = null, Da
3541
{
3642
TaskStart.Add((taskName, timestamp ?? DateTime.UtcNow));
3743
}
44+
45+
/// <summary>
/// Stand-in for a periodic check-in loop: repeatedly waits on a long delay
/// until the token is cancelled. No check-in is performed.
/// </summary>
/// <param name="token">Token that ends the loop; also passed to each delay.</param>
/// <param name="interval">Unused by this implementation.</param>
public async Task RunPeriodicCheckin(CancellationToken token, TimeSpan? interval = null)
{
    while (true)
    {
        if (token.IsCancellationRequested)
        {
            return;
        }

        await Task.Delay(100000, token);
    }
}
3849
}
3950

4051
class RunQuickTask : BaseSchedulableTask
@@ -78,7 +89,7 @@ public TaskSchedulerTest(ITestOutputHelper output)
7889
public async Task TestScheduler()
7990
{
8091
var sink = new DummySink();
81-
using var sched = new ExtractorTaskScheduler(sink);
92+
using var sched = new ExtractorTaskScheduler(sink, new NullLogger<ExtractorTaskScheduler>());
8293
using var source = new CancellationTokenSource();
8394

8495
var running = sched.Run(source.Token);
@@ -157,7 +168,7 @@ public async Task TestScheduler()
157168
public async Task TestSchedulerErrors()
158169
{
159170
var sink = new DummySink();
160-
using var sched = new ExtractorTaskScheduler(sink);
171+
using var sched = new ExtractorTaskScheduler(sink, new NullLogger<ExtractorTaskScheduler>());
161172
using var source = new CancellationTokenSource();
162173

163174
var running = sched.Run(source.Token);
@@ -197,7 +208,7 @@ public async Task TestSchedulerErrors()
197208
public async Task TestSchedulerFatalError()
198209
{
199210
var sink = new DummySink();
200-
using var sched = new ExtractorTaskScheduler(sink);
211+
using var sched = new ExtractorTaskScheduler(sink, new NullLogger<ExtractorTaskScheduler>());
201212
using var source = new CancellationTokenSource();
202213

203214
var running = sched.Run(source.Token);

0 commit comments

Comments
 (0)