From 3f1ef438dab7b2e5b88f93c7c3cdae4302cc4645 Mon Sep 17 00:00:00 2001 From: Toshad Salwekar Date: Thu, 9 Apr 2026 11:43:11 +0530 Subject: [PATCH 1/5] Restart extractor on config change --- .../unit/Unstable/RuntimeTest.cs | 4 ++++ ExtractorUtils/Unstable/Runtime/Runtime.cs | 20 ++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs b/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs index 50d9a03e..f1b7546f 100644 --- a/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs +++ b/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs @@ -187,6 +187,9 @@ private ExtractorRuntimeBuilder CreateMockRuntimeBu public async Task TestRuntimeRestartNewConfig() { var builder = CreateMockRuntimeBuilder(); + // Restart policy won't be the default "Always" in customer envs, but we should + // still restart on config change. + builder.RestartPolicy = ExtractorRestartPolicy.OnError; using var evt = new ManualResetEventSlim(false); @@ -199,6 +202,7 @@ public async Task TestRuntimeRestartNewConfig() extractor = ext; }; }; + using var source = new CancellationTokenSource(); diff --git a/ExtractorUtils/Unstable/Runtime/Runtime.cs b/ExtractorUtils/Unstable/Runtime/Runtime.cs index 8928b9da..f72527be 100644 --- a/ExtractorUtils/Unstable/Runtime/Runtime.cs +++ b/ExtractorUtils/Unstable/Runtime/Runtime.cs @@ -36,6 +36,11 @@ enum ExtractorRunResult /// The extractor crashed. /// Error, + /// + /// The extractor was stopped with a clean shutdown. + /// But we need to restart it (possibly due to a revision change). + /// + RestartRequired } /// @@ -138,6 +143,7 @@ public async Task Run() } else if (result == ExtractorRunResult.CleanShutdown) { + _activeLogger.LogInformation("Extractor stopped cleanly with policy {Policy}", _params.RestartPolicy); // Shut down, if the extractor is configured to only restart on error. if (_params.RestartPolicy != ExtractorRestartPolicy.Always) { @@ -147,6 +153,11 @@ public async Task Run() // Otherwise, immediately restart. backoff = 0; } + else if (result == ExtractorRunResult.RestartRequired) + { + _activeLogger.LogInformation("Extractor stopped cleanly with restart required, restarting with backoff"); + backoff = 1; + } if (backoff == 0) { @@ -328,6 +339,7 @@ private async Task BuildAndRunExtractor(ServiceProvider prov { using var internalTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_source.Token); TExtractor extractor; + var shouldRestart = false; try { if (_params.AddMetrics) @@ -386,6 +398,7 @@ private async Task BuildAndRunExtractor(ServiceProvider prov { _activeLogger.LogInformation("Revision changed, reloading config"); internalTokenSource.Cancel(); + shouldRestart = true; await extractorTask.ConfigureAwait(false); } @@ -394,6 +407,7 @@ private async Task BuildAndRunExtractor(ServiceProvider prov { ExceptionDispatchInfo.Capture(extractorTask.Exception).Throw(); } + } catch (OperationCanceledException) when (internalTokenSource.IsCancellationRequested) { @@ -415,7 +429,11 @@ private async Task BuildAndRunExtractor(ServiceProvider prov return ExtractorRunResult.Error; } - + if (shouldRestart) + { + _activeLogger.LogInformation("Extractor stopped cleanly with policy {Policy}, restart is required", _params.RestartPolicy); + return ExtractorRunResult.RestartRequired; + } return ExtractorRunResult.CleanShutdown; } From b1b4b371a281ac6f71b64f814a301b29d764c35a Mon Sep 17 00:00:00 2001 From: Toshad Salwekar Date: Thu, 9 Apr 2026 12:23:48 +0530 Subject: [PATCH 2/5] lint --- ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs b/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs index f1b7546f..d48c2569 100644 --- a/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs +++ b/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs @@ -202,7 +202,6 @@ public async Task TestRuntimeRestartNewConfig() extractor = ext; }; }; - using var source = new CancellationTokenSource(); From fdaf2942c1540353ffdddd7e95fed9aaf684c96a Mon Sep 17 00:00:00 2001 From: Toshad Salwekar Date: Thu, 9 Apr 2026 12:41:59 +0530 Subject: [PATCH 3/5] fix test timeout --- ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs b/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs index d48c2569..48efaa54 100644 --- a/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs +++ b/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs @@ -180,6 +180,9 @@ private ExtractorRuntimeBuilder CreateMockRuntimeBu builder.ExternalServices = services; builder.AddLogger = false; + // Reduce backoff times for faster tests + builder.BackoffBase = 50; + return builder; } From d331f2b586ccc7fdd07a4b6ee5b87cd0aaccb50f Mon Sep 17 00:00:00 2001 From: Toshad Salwekar Date: Fri, 10 Apr 2026 17:35:45 +0530 Subject: [PATCH 4/5] Add never restart test --- .../unit/Unstable/RuntimeTest.cs | 68 +++++++++++++++---- .../Unstable/Configuration/ConfigSource.cs | 8 +-- 2 files changed, 57 insertions(+), 19 deletions(-) diff --git a/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs b/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs index 48efaa54..da821f12 100644 --- a/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs +++ b/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs @@ -186,16 +186,8 @@ private ExtractorRuntimeBuilder CreateMockRuntimeBu return builder; } - [Fact(Timeout = 5000)] - public async Task TestRuntimeRestartNewConfig() + private async Task<(DummyExtractor, ExtractorRuntime)> createExtractorForRestartTest(ManualResetEventSlim evt, ExtractorRuntimeBuilder builder, CancellationToken token) { - var builder = CreateMockRuntimeBuilder(); - // Restart policy won't be the default "Always" in customer envs, but we should - // still restart on config change. - builder.RestartPolicy = ExtractorRestartPolicy.OnError; - - using var evt = new ManualResetEventSlim(false); - DummyExtractor extractor = null; builder.OnCreateExtractor = (_, ext) => { @@ -206,15 +198,27 @@ public async Task TestRuntimeRestartNewConfig() }; }; - using var source = new CancellationTokenSource(); - - var runtime = await builder.MakeRuntime(source.Token); + var runtime = await builder.MakeRuntime(token); var runTask = runtime.Run(); - Assert.True(await CommonUtils.WaitAsync(evt.WaitHandle, TimeSpan.FromSeconds(5), source.Token)); + Assert.True(await CommonUtils.WaitAsync(evt.WaitHandle, TimeSpan.FromSeconds(5), token)); Assert.NotNull(extractor); + return (extractor, runtime); + } + + [Fact(Timeout = 5000)] + public async Task TestRuntimeRestartNewConfig() + { + var builder = CreateMockRuntimeBuilder(); + // Restart policy won't be the default "Always" in customer envs, but we should + // still restart on config change. + builder.RestartPolicy = ExtractorRestartPolicy.OnError; + + using var evt = new ManualResetEventSlim(false); + using var source = new CancellationTokenSource(); + + var (extractor, runtime) = await createExtractorForRestartTest(evt, builder, source.Token); - // Wait for a startup to be reported. await TestUtils.WaitForCondition(() => _startupCount == 1, 5); // Update the config revision and the extractor should be restarted. @@ -239,9 +243,43 @@ public async Task TestRuntimeRestartNewConfig() // Finally, shut down the extractor. source.Cancel(); - await runTask; + await runtime.Run(); } + [Fact(Timeout = 5000)] + public async Task TestNeverRestartPolicy() + { + var builder = CreateMockRuntimeBuilder(); + // Restart policy won't be the default "Always" in customer envs, but we should + // still restart on config change. + builder.RestartPolicy = ExtractorRestartPolicy.Never; + + using var evt = new ManualResetEventSlim(false); + using var source = new CancellationTokenSource(); + + var (extractor, runtime) = await createExtractorForRestartTest(evt, builder, source.Token); + // Wait for a startup to be reported. + await TestUtils.WaitForCondition(() => _startupCount == 1, 5); + + // Update the config revision and the extractor should be restarted. + var oldExtractor = extractor; + extractor = null; + evt.Reset(); + _responseRevision = new ConfigRevision + { + Revision = 2, + Config = "foo: baz" + }; + + // Flush the sink to speed things along + await oldExtractor.Sink.Flush(source.Token); + + // Extractor should not be restarted, so we should time out waiting for the event to be set. + Assert.False(await CommonUtils.WaitAsync(evt.WaitHandle, TimeSpan.FromSeconds(1), source.Token)); + Assert.Null(extractor); + } + + [Fact(Timeout = 5000)] public async Task TestRuntimeRestartExtractorCrash() { diff --git a/ExtractorUtils/Unstable/Configuration/ConfigSource.cs b/ExtractorUtils/Unstable/Configuration/ConfigSource.cs index afe8c052..2035774f 100644 --- a/ExtractorUtils/Unstable/Configuration/ConfigSource.cs +++ b/ExtractorUtils/Unstable/Configuration/ConfigSource.cs @@ -155,7 +155,7 @@ public override async Task ResolveConfig(int? targetRevision, BaseErrorRep var msg = ex.Message; if (_lastErrorMsg != msg) { - reporter.Fatal($"Fatally failed to load configuration file from {_configFilePath}: {ex.Message}"); + reporter.Fatal($"Fatally failed to load configuration file {_configFilePath}: {ex.Message}", ex.Message + "\n" + ex.StackTrace); } _lastErrorMsg = msg; throw; @@ -170,7 +170,7 @@ public override async Task ResolveConfig(int? targetRevision, BaseErrorRep { if (isNewConfig) { - reporter.Fatal($"Failed to parse configuration file from {_configFilePath}: {ex.Message}"); + reporter.Fatal($"Failed to parse configuration file {_configFilePath}: {ex.Message}", ex.Message + "\n" + ex.StackTrace); } throw; } @@ -256,7 +256,7 @@ public override async Task ResolveConfig(int? targetRevision, BaseErrorRep var msg = ex.Message; if (_lastErrorMsg != msg) { - reporter.Fatal($"Fatally failed to load configuration file from CDF: {msg}"); + reporter.Fatal($"Fatally failed to load0 configuration file from CDF: {msg}", msg + "\n" + ex.StackTrace); } _lastErrorMsg = msg; throw; @@ -275,7 +275,7 @@ public override async Task ResolveConfig(int? targetRevision, BaseErrorRep { if (isNewConfig) { - reporter.Fatal($"Failed to parse configuration file from CDF: {ex.Message}"); + reporter.Fatal($"Failed to parse configuration file from CDF: {ex.Message}", ex.Message + "\n" + ex.StackTrace); } throw; } From 19b2c5d0e3e7e2cddce52b13498bfcf2be6c4aa3 Mon Sep 17 00:00:00 2001 From: Toshad Salwekar Date: Fri, 10 Apr 2026 18:09:44 +0530 Subject: [PATCH 5/5] fix test --- .../unit/Unstable/RuntimeTest.cs | 68 ++++--------------- .../Unstable/Configuration/ConfigSource.cs | 8 +-- 2 files changed, 19 insertions(+), 57 deletions(-) diff --git a/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs b/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs index da821f12..48efaa54 100644 --- a/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs +++ b/ExtractorUtils.Test/unit/Unstable/RuntimeTest.cs @@ -186,8 +186,16 @@ private ExtractorRuntimeBuilder CreateMockRuntimeBu return builder; } - private async Task<(DummyExtractor, ExtractorRuntime)> createExtractorForRestartTest(ManualResetEventSlim evt, ExtractorRuntimeBuilder builder, CancellationToken token) + [Fact(Timeout = 5000)] + public async Task TestRuntimeRestartNewConfig() { + var builder = CreateMockRuntimeBuilder(); + // Restart policy won't be the default "Always" in customer envs, but we should + // still restart on config change. + builder.RestartPolicy = ExtractorRestartPolicy.OnError; + + using var evt = new ManualResetEventSlim(false); + DummyExtractor extractor = null; builder.OnCreateExtractor = (_, ext) => { @@ -198,27 +206,15 @@ private ExtractorRuntimeBuilder CreateMockRuntimeBu }; }; - var runtime = await builder.MakeRuntime(token); + using var source = new CancellationTokenSource(); + + var runtime = await builder.MakeRuntime(source.Token); var runTask = runtime.Run(); - Assert.True(await CommonUtils.WaitAsync(evt.WaitHandle, TimeSpan.FromSeconds(5), token)); + Assert.True(await CommonUtils.WaitAsync(evt.WaitHandle, TimeSpan.FromSeconds(5), source.Token)); Assert.NotNull(extractor); - return (extractor, runtime); - } - - [Fact(Timeout = 5000)] - public async Task TestRuntimeRestartNewConfig() - { - var builder = CreateMockRuntimeBuilder(); - // Restart policy won't be the default "Always" in customer envs, but we should - // still restart on config change. - builder.RestartPolicy = ExtractorRestartPolicy.OnError; - - using var evt = new ManualResetEventSlim(false); - using var source = new CancellationTokenSource(); - - var (extractor, runtime) = await createExtractorForRestartTest(evt, builder, source.Token); + // Wait for a startup to be reported. await TestUtils.WaitForCondition(() => _startupCount == 1, 5); // Update the config revision and the extractor should be restarted. @@ -243,43 +239,9 @@ public async Task TestRuntimeRestartNewConfig() // Finally, shut down the extractor. source.Cancel(); - await runtime.Run(); - } - - [Fact(Timeout = 5000)] - public async Task TestNeverRestartPolicy() - { - var builder = CreateMockRuntimeBuilder(); - // Restart policy won't be the default "Always" in customer envs, but we should - // still restart on config change. - builder.RestartPolicy = ExtractorRestartPolicy.Never; - - using var evt = new ManualResetEventSlim(false); - using var source = new CancellationTokenSource(); - - var (extractor, runtime) = await createExtractorForRestartTest(evt, builder, source.Token); - // Wait for a startup to be reported. - await TestUtils.WaitForCondition(() => _startupCount == 1, 5); - - // Update the config revision and the extractor should be restarted. - var oldExtractor = extractor; - extractor = null; - evt.Reset(); - _responseRevision = new ConfigRevision - { - Revision = 2, - Config = "foo: baz" - }; - - // Flush the sink to speed things along - await oldExtractor.Sink.Flush(source.Token); - - // Extractor should not be restarted, so we should time out waiting for the event to be set. - Assert.False(await CommonUtils.WaitAsync(evt.WaitHandle, TimeSpan.FromSeconds(1), source.Token)); - Assert.Null(extractor); + await runTask; } - [Fact(Timeout = 5000)] public async Task TestRuntimeRestartExtractorCrash() { diff --git a/ExtractorUtils/Unstable/Configuration/ConfigSource.cs b/ExtractorUtils/Unstable/Configuration/ConfigSource.cs index 2035774f..afe8c052 100644 --- a/ExtractorUtils/Unstable/Configuration/ConfigSource.cs +++ b/ExtractorUtils/Unstable/Configuration/ConfigSource.cs @@ -155,7 +155,7 @@ public override async Task ResolveConfig(int? targetRevision, BaseErrorRep var msg = ex.Message; if (_lastErrorMsg != msg) { - reporter.Fatal($"Fatally failed to load configuration file {_configFilePath}: {ex.Message}", ex.Message + "\n" + ex.StackTrace); + reporter.Fatal($"Fatally failed to load configuration file from {_configFilePath}: {ex.Message}"); } _lastErrorMsg = msg; throw; @@ -170,7 +170,7 @@ public override async Task ResolveConfig(int? targetRevision, BaseErrorRep { if (isNewConfig) { - reporter.Fatal($"Failed to parse configuration file {_configFilePath}: {ex.Message}", ex.Message + "\n" + ex.StackTrace); + reporter.Fatal($"Failed to parse configuration file from {_configFilePath}: {ex.Message}"); } throw; } @@ -256,7 +256,7 @@ public override async Task ResolveConfig(int? targetRevision, BaseErrorRep var msg = ex.Message; if (_lastErrorMsg != msg) { - reporter.Fatal($"Fatally failed to load0 configuration file from CDF: {msg}", msg + "\n" + ex.StackTrace); + reporter.Fatal($"Fatally failed to load configuration file from CDF: {msg}"); } _lastErrorMsg = msg; throw; @@ -275,7 +275,7 @@ public override async Task ResolveConfig(int? targetRevision, BaseErrorRep { if (isNewConfig) { - reporter.Fatal($"Failed to parse configuration file from CDF: {ex.Message}", ex.Message + "\n" + ex.StackTrace); + reporter.Fatal($"Failed to parse configuration file from CDF: {ex.Message}"); } throw; }