-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathCheckInWorker.cs
More file actions
409 lines (373 loc) · 17.2 KB
/
CheckInWorker.cs
File metadata and controls
409 lines (373 loc) · 17.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Cognite.Extractor.Common;
using CogniteSdk;
using CogniteSdk.Alpha;
using Microsoft.Extensions.Logging;
namespace Cognite.Extractor.Utils.Unstable.Tasks
{
/// <summary>
/// Worker for submitting periodic check-ins to the integrations API.
/// </summary>
public class CheckInWorker : IIntegrationSink
{
// Guards the pending error and task-update queues below.
private readonly object _lock = new object();
// Errors waiting to be sent with the next check-in, keyed by external ID.
private readonly Dictionary<string, ErrorWithTask> _errors = new Dictionary<string, ErrorWithTask>();
// Task start/end events waiting to be sent. Swapped for a fresh list when a check-in drains it.
private List<TaskUpdate> _taskUpdates = new List<TaskUpdate>();
private readonly Client _client;
private readonly string _integrationId;
private readonly ILogger _logger;
// Upper bounds on how many errors / task updates go into a single check-in request.
private const int MAX_ERRORS_PER_CHECKIN = 1000;
private const int MAX_TASK_UPDATES_PER_CHECKIN = 1000;
private bool _isRunning;
private int? _activeRevision;
private readonly Action<int> _onRevisionChanged;
// Serializes the startup request and all check-in flushes.
private readonly SemaphoreSlim _flushLock = new SemaphoreSlim(1);
private readonly bool _retryStartup;
private bool _hasReportedStartup;
// Source of jitter for the startup retry delay.
private readonly Random _random = new Random();
// Nominal delay between startup retries, in seconds (jitter is applied around this value).
private const int STARTUP_BACKOFF_SECONDS = 30;
/// <summary>
/// Create a check-in worker that reports to a single integration.
/// </summary>
/// <param name="integrationId">ID of the integration the worker should write to.</param>
/// <param name="logger">Logger used for the worker's own diagnostics.</param>
/// <param name="client">Cognite client used to call the integrations API.</param>
/// <param name="onRevisionChanged">Callback invoked with the new revision number when the
/// remote configuration revision changes.</param>
/// <param name="activeRevision">Config revision currently in use. Used to detect whether a new
/// config revision has appeared since the last check-in. Null indicates that the extractor is
/// running local config, and should not restart based on changes to remote config.</param>
/// <param name="retryStartup">Whether to keep retrying the startup request if it fails,
/// beyond normal retries. If this is `true`, the check-in worker will retry startup requests
/// indefinitely, instead of raising an exception.</param>
public CheckInWorker(
    string integrationId,
    ILogger logger,
    Client client,
    Action<int> onRevisionChanged,
    int? activeRevision,
    bool retryStartup = false)
{
    // Target and transport.
    _integrationId = integrationId;
    _client = client;
    _logger = logger;

    // Config revision tracking.
    _activeRevision = activeRevision;
    _onRevisionChanged = onRevisionChanged;

    // Startup behaviour.
    _retryStartup = retryStartup;
}
/// <summary>
/// Start running the check-in worker: report startup, then flush a check-in once per interval
/// until <paramref name="token"/> is cancelled.
///
/// Only one run may be active at a time. The running flag is cleared again whenever this method
/// exits, whether it completes, is cancelled, or fails.
/// </summary>
/// <param name="token">Cancellation token. Cancelling it stops the worker.</param>
/// <param name="startupPayload">Payload to send to the startup endpoint before beginning to
/// report periodic check-ins. Its external ID is overwritten with the worker's integration ID.</param>
/// <param name="interval">Interval between check-ins, defaults to 30 seconds.</param>
/// <exception cref="ArgumentNullException">If <paramref name="startupPayload"/> is null.</exception>
/// <exception cref="InvalidOperationException">If the worker is already running.</exception>
public async Task RunPeriodicCheckIn(CancellationToken token, StartupRequest startupPayload, TimeSpan? interval = null)
{
    if (startupPayload is null) throw new ArgumentNullException(nameof(startupPayload));
    lock (_lock)
    {
        if (_isRunning) throw new InvalidOperationException("Attempted to start a check-in worker that was already running");
        _isRunning = true;
    }

    try
    {
        // Make sure the external ID in the startup payload matches the external ID of the target integration.
        startupPayload.ExternalId = _integrationId;

        // Hold the flush lock while reporting startup, to ensure that we don't start reporting check-ins
        // before the startup request has been sent.
        // With this, calls to flush will wait until the startup request has been sent,
        // or startup fails.
        // This keeps us from reporting events before the startup, in case the extractor is started offline.
        // In this case, we would like to potentially report a startup and anything that has happened
        // while the connection to CDF was down.
        try
        {
            await _flushLock.WaitAsync(token).ConfigureAwait(false);
        }
        catch
        {
            // Should only happen if we are cancelled while waiting.
            if (token.IsCancellationRequested) return;
            throw;
        }

        try
        {
            while (!token.IsCancellationRequested)
            {
                try
                {
                    await ReportStartup(startupPayload, token).ConfigureAwait(false);
                    _hasReportedStartup = true;
                    break;
                }
                // Without retryStartup the filter does not match, so the failure propagates to the caller.
                catch (Exception ex) when (_retryStartup)
                {
                    // A failure caused by shutdown is not something to retry or report as an error.
                    if (token.IsCancellationRequested) break;

                    // Retry about every 30 seconds, but with some jitter so we don't
                    // end up with too bursty retries.
                    var toDelay = _random.Next(STARTUP_BACKOFF_SECONDS / 2, STARTUP_BACKOFF_SECONDS * 3 / 2);
                    _logger.LogError("Failed to report startup, retrying after {Time} seconds: {Err}", toDelay, ex.Message);
                    try
                    {
                        await Task.Delay(TimeSpan.FromSeconds(toDelay), token).ConfigureAwait(false);
                    }
                    catch (OperationCanceledException)
                    {
                        // Cancelled while backing off: stop quietly, like every other cancellation path.
                        break;
                    }
                }
            }
        }
        finally
        {
            _flushLock.Release();
        }

        var rinterval = interval ?? TimeSpan.FromSeconds(30);
        while (!token.IsCancellationRequested)
        {
            // Start the timer before flushing, so the time spent flushing counts towards the interval.
            var waitTask = Task.Delay(rinterval, token);
            try
            {
                await Flush(token).ConfigureAwait(false);
                await waitTask.ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }
    finally
    {
        // Always clear the flag, so a failed or cancelled run does not leave the worker
        // permanently marked as running.
        lock (_lock)
        {
            _isRunning = false;
        }
    }
}
/// <summary>
/// Send a check-in right away with everything currently queued.
///
/// Intended to be called after everything else has been shut down, to report a final check-in.
/// Failures are logged, not thrown.
/// </summary>
/// <param name="token">Cancellation token</param>
/// <returns>A task that completes once the check-in attempt has finished.</returns>
public async Task Flush(CancellationToken token)
{
    // The semaphore allows a single flush at a time. A caller therefore also waits for any
    // flush already in progress, so by the time this method returns, every update that was
    // queued when it was called has been through a check-in attempt.
    bool acquired;
    try
    {
        await _flushLock.WaitAsync(token).ConfigureAwait(false);
        acquired = true;
    }
    catch
    {
        // Only really happens if we're cancelled while waiting.
        acquired = false;
    }

    if (!acquired)
    {
        return;
    }

    try
    {
        // We hold the flush lock here, so reporting directly is safe.
        await ReportCheckIn(token).ConfigureAwait(false);
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error during check-in: {Message}", failure.Message);
    }
    finally
    {
        _flushLock.Release();
    }
}
/// <summary>
/// Put errors and task updates back into the pending queues so a later check-in picks them up.
/// </summary>
/// <param name="errors">Errors to put back.</param>
/// <param name="tasks">Task updates to put back.</param>
private void RequeueCheckIn(IEnumerable<ErrorWithTask> errors, IEnumerable<TaskUpdate> tasks)
{
    lock (_lock)
    {
        foreach (var pending in errors)
        {
            // An entry already queued under the same external ID is kept as it is.
            if (_errors.ContainsKey(pending.ExternalId))
            {
                continue;
            }

            _errors.Add(pending.ExternalId, pending);
        }

        _taskUpdates.AddRange(tasks);
    }
}
/// <summary>
/// Send one check-in request containing the given errors and task updates.
///
/// If the request is rejected with status 400 or 404 the batch is dropped and the method returns
/// normally. On any other failure the batch is re-queued and the exception is rethrown.
/// </summary>
/// <param name="errors">Errors to include in the request.</param>
/// <param name="tasks">Task updates to include in the request.</param>
/// <param name="token">Cancellation token</param>
private async Task TryWriteCheckIn(IEnumerable<ErrorWithTask> errors, IEnumerable<TaskUpdate> tasks, CancellationToken token)
{
    CheckInResponse response;
    try
    {
        response = await _client.Alpha.Integrations.CheckInAsync(new CheckInRequest
        {
            ExternalId = _integrationId,
            TaskEvents = tasks,
            Errors = errors,
        }, token).ConfigureAwait(false);
    }
    catch (ResponseException rex) when (rex.Code == 400 || rex.Code == 404)
    {
        // Retrying a request the API rejected outright would fail the same way, so drop it.
        _logger.LogError(rex, "CheckIn failed with a {Code} status code, this is a bug! Dropping current check-in batch and continuing.", rex.Code);
        return;
    }
    catch
    {
        // If pushing the update failed, keep the updates to try again later.
        RequeueCheckIn(errors, tasks);
        throw;
    }

    // Handle the response outside the try block: the batch has been delivered at this point,
    // so a failure in the revision callback must not re-queue it and cause it to be sent twice.
    HandleCheckInResponse(response);
}
/// <summary>
/// Drain the pending queues and send their contents as one or more check-in requests.
///
/// Before startup has been reported, only errors that are not tied to a task are sent, and
/// nothing is sent at all if there are none. Anything that was drained but not handed to a
/// request (because of cancellation or a failed request) is put back in the queues.
/// </summary>
/// <param name="token">Cancellation token</param>
private async Task ReportCheckIn(CancellationToken token)
{
    List<ErrorWithTask> newErrors;
    List<TaskUpdate> taskUpdates;
    lock (_lock)
    {
        if (!_hasReportedStartup)
        {
            newErrors = _errors.Values.Where(e => e.Task == null).ToList();
            // No point checking in pre-startup if there are no errors.
            if (newErrors.Count == 0)
            {
                _logger.LogInformation("Check-in worker has not reported startup yet, skipping check-in.");
                return;
            }
            _logger.LogWarning("Check-in worker has not reported startup yet, only reporting errors not associated with a task.");
            foreach (var err in newErrors)
            {
                _errors.Remove(err.ExternalId);
                TruncateOversizedFields(err);
            }
            taskUpdates = new List<TaskUpdate>();
        }
        else
        {
            newErrors = new List<ErrorWithTask>(_errors.Values);
            _errors.Clear();
            foreach (var err in newErrors)
            {
                // Log the full error first; the truncation below modifies the object that gets sent.
                _logger.LogInformation("Error: {ExternalId}, Level: {Level}, Description: {Description}, Details: {Details}, Task: {Task}, StartTime: {StartTime}, EndTime: {EndTime}",
                    err.ExternalId, err.Level, err.Description, err.Details, err.Task, err.StartTime, err.EndTime);
                TruncateOversizedFields(err);
            }
            taskUpdates = _taskUpdates;
            _taskUpdates = new List<TaskUpdate>();
        }
    }

    newErrors.Sort((a, b) =>
    {
        long? aTime = a.EndTime ?? a.StartTime;
        long? bTime = b.EndTime ?? b.StartTime;
        // Handle null timestamps by treating them as 0 (earliest possible time)
        return (aTime ?? 0).CompareTo(bTime ?? 0);
    });
    taskUpdates.Sort((a, b) => a.Timestamp.CompareTo(b.Timestamp));

    try
    {
        while (!token.IsCancellationRequested)
        {
            if (newErrors.Count <= MAX_ERRORS_PER_CHECKIN && taskUpdates.Count <= MAX_TASK_UPDATES_PER_CHECKIN)
            {
                // Everything fits in a single request. Empty the locals first so the
                // re-queue in the finally block does not add these items a second time.
                var errorsToWrite = newErrors;
                var tasksToWrite = taskUpdates;
                newErrors = new List<ErrorWithTask>();
                taskUpdates = new List<TaskUpdate>();
                await TryWriteCheckIn(errorsToWrite, tasksToWrite, token).ConfigureAwait(false);
                break;
            }

            var errIdx = 0;
            var taskIdx = 0;
            // In the (unlikely) case that we have more than 1000 updates, we need to send them in order of the time where they occurred,
            // roughly. Walk both sorted lists together, always advancing past the older item,
            // until either list has contributed a full batch.
            while ((errIdx < newErrors.Count || taskIdx < taskUpdates.Count) && errIdx < MAX_ERRORS_PER_CHECKIN && taskIdx < MAX_TASK_UPDATES_PER_CHECKIN)
            {
                // An exhausted list yields long.MaxValue, so the other list keeps advancing.
                var taskTime = taskUpdates.ElementAtOrDefault(taskIdx)?.Timestamp ?? long.MaxValue;
                var err = newErrors.ElementAtOrDefault(errIdx);
                var errTime = err?.EndTime ?? err?.StartTime ?? long.MaxValue;
                if (taskTime <= errTime)
                {
                    taskIdx++;
                }
                if (errTime <= taskTime)
                {
                    errIdx++;
                }
            }

            var errorsBatch = newErrors.Take(errIdx).ToList();
            var taskBatch = taskUpdates.Take(taskIdx).ToList();
            // Remove the batch from the locals before writing, so that they hold exactly
            // the items that have not yet been handed to a request.
            if (errIdx > 0) newErrors = newErrors.Skip(errIdx).ToList();
            if (taskIdx > 0) taskUpdates = taskUpdates.Skip(taskIdx).ToList();
            await TryWriteCheckIn(errorsBatch, taskBatch, token).ConfigureAwait(false);
            if (newErrors.Count == 0 && taskUpdates.Count == 0) break;
        }
    }
    finally
    {
        // Re-queue whatever was never handed to a request, whether we stopped because of
        // cancellation or because a request threw. This way, we don't lose any updates, and can
        // push them in a later flush. After a fully successful run both lists are empty.
        RequeueCheckIn(newErrors, taskUpdates);
    }
}

// Length that error descriptions and details are cut down to before being sent.
private const int MAX_ERROR_FIELD_LENGTH = 5000;

/// <summary>
/// Shorten the description and details of <paramref name="err"/> in place if either is longer
/// than <see cref="MAX_ERROR_FIELD_LENGTH"/>, logging a warning for each field that is shortened.
/// The two fields are checked independently.
/// </summary>
/// <param name="err">Error to modify.</param>
private void TruncateOversizedFields(ErrorWithTask err)
{
    if (err.Description != null && err.Description.Length > MAX_ERROR_FIELD_LENGTH)
    {
        _logger.LogWarning("Truncating description for error {ExternalId} to 5,000 characters. Original length was {OriginalLength}", err.ExternalId, err.Description.Length);
        err.Description = err.Description.Substring(0, MAX_ERROR_FIELD_LENGTH);
    }
    if (err.Details != null && err.Details.Length > MAX_ERROR_FIELD_LENGTH)
    {
        _logger.LogWarning("Error {ExternalId} has details exceeding 5,000 characters, truncating. Original length was {OriginalLength}", err.ExternalId, err.Details.Length);
        err.Details = err.Details.Substring(0, MAX_ERROR_FIELD_LENGTH);
    }
}
/// <summary>
/// Send the startup request and process the response it returns.
/// </summary>
/// <param name="request">Startup payload to send.</param>
/// <param name="token">Cancellation token</param>
private async Task ReportStartup(StartupRequest request, CancellationToken token)
{
    var integrations = _client.Alpha.Integrations;
    HandleCheckInResponse(await integrations.StartupAsync(request, token).ConfigureAwait(false));
}
/// <inheritdoc />
public void ReportError(ExtractorError error)
{
    if (error == null)
    {
        throw new ArgumentNullException(nameof(error));
    }

    lock (_lock)
    {
        // Store under the error's external ID, replacing any entry already queued for it.
        var key = error.ExternalId;
        _errors[key] = error.ToSdk();
    }
}
/// <inheritdoc />
public void ReportTaskEnd(string taskName, TaskUpdatePayload? update = null, DateTime? timestamp = null)
{
    if (string.IsNullOrEmpty(taskName))
    {
        throw new ArgumentNullException(nameof(taskName));
    }

    lock (_lock)
    {
        // Use the current UTC time when the caller did not supply a timestamp.
        var endedAt = timestamp ?? DateTime.UtcNow;
        var ended = new TaskUpdate
        {
            Name = taskName,
            Type = TaskUpdateType.ended,
            Message = update?.Message,
            Timestamp = endedAt.ToUnixTimeMilliseconds(),
        };
        _taskUpdates.Add(ended);
    }
}
/// <inheritdoc />
public void ReportTaskStart(string taskName, TaskUpdatePayload? update = null, DateTime? timestamp = null)
{
    if (string.IsNullOrEmpty(taskName))
    {
        throw new ArgumentNullException(nameof(taskName));
    }

    lock (_lock)
    {
        // Use the current UTC time when the caller did not supply a timestamp.
        var startedAt = timestamp ?? DateTime.UtcNow;
        var started = new TaskUpdate
        {
            Name = taskName,
            Type = TaskUpdateType.started,
            Message = update?.Message,
            Timestamp = startedAt.ToUnixTimeMilliseconds(),
        };
        _taskUpdates.Add(started);
    }
}
/// <summary>
/// Compare the config revision in a check-in or startup response with the revision the worker
/// currently tracks, notify about a change, and remember the new revision.
/// </summary>
/// <param name="response">Response from the integrations API.</param>
private void HandleCheckInResponse(CheckInResponse response)
{
    var remoteRevision = response.LastConfigRevision;

    // Nothing to do when the response carries no revision, or it matches what we already track.
    if (remoteRevision == null || remoteRevision == _activeRevision)
    {
        return;
    }

    if (_activeRevision != null)
    {
        if (_onRevisionChanged != null)
        {
            _logger.LogInformation("Remote config revision changed {From} -> {To}", _activeRevision, remoteRevision);
            _onRevisionChanged(remoteRevision.Value);
        }
        else
        {
            _logger.LogInformation(
                "Remote config revision changed {From} -> {To}. The extractor is currently using local configuration and will need to be manually restarted and configured to use remote config for the new config to take effect.",
                _activeRevision, remoteRevision);
        }
    }

    // NOTE(review): this also runs when _activeRevision was null, so from the second revision
    // change onwards the callback above is invoked even if the worker was constructed with a
    // null revision. Confirm that this is the intended behaviour for local config.
    _activeRevision = remoteRevision.Value;
}
}
}