Skip to content

Commit 2d41a8f

Browse files
authored
Re-connecting should be part of DCP request retry (#18121)
1 parent e2a4187 commit 2d41a8f

2 files changed

Lines changed: 329 additions & 35 deletions

File tree

src/Aspire.Hosting/Dcp/KubernetesService.cs

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -471,17 +471,23 @@ private async Task<TResult> ExecuteWithRetry<TResult>(
471471
Func<Exception, bool> isRetryable,
472472
CancellationToken cancellationToken)
473473
{
474-
var clientReady = await EnsureKubernetesAsync(cancellationToken).ConfigureAwait(false);
475474
using var activity = ProfilingTelemetry.StartDcpKubernetesApi(configuration, operationType, resourceType);
476-
activity.AddKubernetesClientReady(clientReady.WaitMilliseconds, clientReady.Initialized);
477475
var retryCount = 0;
478476

479477
var resiliencePipeline = CreateKubernetesCallResiliencePipeline(isRetryable, activity, () => retryCount++);
480478

481479
try
482480
{
483-
return await resiliencePipeline.ExecuteAsync<TResult>(async (cancellationToken) =>
481+
return await resiliencePipeline.ExecuteAsync(async (cancellationToken) =>
484482
{
483+
// Establish (or re-establish) the connection to DCP inside the retry loop,
484+
// to guard against a failure from missing or partially-written kubeconfig.
485+
var clientReady = await EnsureKubernetesAsync(cancellationToken).ConfigureAwait(false);
486+
if (clientReady.Initialized)
487+
{
488+
activity.AddKubernetesClientReady(clientReady.WaitMilliseconds, clientReady.Initialized);
489+
}
490+
485491
return await operation(_kubernetes!).ConfigureAwait(false);
486492
}, cancellationToken).ConfigureAwait(false);
487493
}
@@ -593,43 +599,44 @@ private async Task<KubernetesClientReady> EnsureKubernetesAsync(CancellationToke
593599
readStopwatch.Start();
594600

595601
var readPipeline = CreateReadKubeconfigResiliencePipeline();
596-
var timeoutPipeline = new ResiliencePipelineBuilder<DcpKubernetesClient>()
597-
.AddTimeout(new TimeoutStrategyOptions { Timeout = MaxRetryDuration })
598-
.Build();
599602

600603
try
601604
{
602-
_kubernetes = await timeoutPipeline.ExecuteAsync(async (cancellationToken) =>
605+
_kubernetes = await readPipeline.ExecuteAsync(async (cancellationToken) =>
603606
{
604-
return await readPipeline.ExecuteAsync<DcpKubernetesClient>(async (cancellationToken) =>
607+
var fileWaitStopwatch = Stopwatch.StartNew();
608+
var fileInfo = new FileInfo(locations.DcpKubeconfigPath);
609+
while (!fileInfo.Exists)
605610
{
606-
var fileWaitStopwatch = Stopwatch.StartNew();
607-
var fileInfo = new FileInfo(locations.DcpKubeconfigPath);
608-
while (!fileInfo.Exists)
609-
{
610-
await Task.Delay(TimeSpan.FromMilliseconds(dcpOptions.Value.KubernetesConfigReadRetryIntervalMilliseconds), cancellationToken).ConfigureAwait(false);
611-
fileInfo = new FileInfo(locations.DcpKubeconfigPath);
612-
}
613-
fileWaitStopwatch.Stop();
614-
activity.SetDcpKubeconfigFileWait(fileWaitStopwatch.ElapsedMilliseconds);
615-
activity.AddKubeconfigFileDetected();
616-
617-
var buildConfigStopwatch = Stopwatch.StartNew();
618-
var config = await KubernetesClientConfiguration.BuildConfigFromConfigFileAsync(kubeconfig: fileInfo, useRelativePaths: false).ConfigureAwait(false);
619-
buildConfigStopwatch.Stop();
620-
readStopwatch.Stop();
621-
622-
logger.LogDebug(
623-
"Successfully read Kubernetes configuration from '{DcpKubeconfigPath}' after {DurationMs} milliseconds.",
624-
locations.DcpKubeconfigPath,
625-
readStopwatch.ElapsedMilliseconds
626-
);
627-
activity.SetDcpKubeconfigBuildDuration(buildConfigStopwatch.ElapsedMilliseconds);
628-
activity.SetDcpKubeconfigReadDuration(readStopwatch.ElapsedMilliseconds);
629-
activity.AddKubeconfigReadComplete();
630-
631-
return new DcpKubernetesClient(config);
632-
}, cancellationToken).ConfigureAwait(false);
611+
await Task.Delay(TimeSpan.FromMilliseconds(dcpOptions.Value.KubernetesConfigReadRetryIntervalMilliseconds), cancellationToken).ConfigureAwait(false);
612+
fileInfo = new FileInfo(locations.DcpKubeconfigPath);
613+
}
614+
fileWaitStopwatch.Stop();
615+
activity.SetDcpKubeconfigFileWait(fileWaitStopwatch.ElapsedMilliseconds);
616+
activity.AddKubeconfigFileDetected();
617+
618+
var buildConfigStopwatch = Stopwatch.StartNew();
619+
// Open the file with FileShare.ReadWrite | FileShare.Delete so we do not interfere with DCP
620+
// if we happen to open the file while DCP is still writing to it.
621+
var kubeconfigStream = new FileStream(fileInfo.FullName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite | FileShare.Delete);
622+
KubernetesClientConfiguration config;
623+
await using (kubeconfigStream.ConfigureAwait(false))
624+
{
625+
config = await KubernetesClientConfiguration.BuildConfigFromConfigFileAsync(kubeconfigStream).ConfigureAwait(false);
626+
}
627+
buildConfigStopwatch.Stop();
628+
readStopwatch.Stop();
629+
630+
logger.LogDebug(
631+
"Successfully read Kubernetes configuration from '{DcpKubeconfigPath}' after {DurationMs} milliseconds.",
632+
locations.DcpKubeconfigPath,
633+
readStopwatch.ElapsedMilliseconds
634+
);
635+
activity.SetDcpKubeconfigBuildDuration(buildConfigStopwatch.ElapsedMilliseconds);
636+
activity.SetDcpKubeconfigReadDuration(readStopwatch.ElapsedMilliseconds);
637+
activity.AddKubeconfigReadComplete();
638+
639+
return new DcpKubernetesClient(config);
633640
}, cancellationToken).ConfigureAwait(false);
634641
}
635642
catch (Exception ex)
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
#pragma warning disable ASPIREFILESYSTEM001 // IFileSystemService is for evaluation purposes only.
5+
6+
using System.Globalization;
7+
using Aspire.Hosting.Dcp;
8+
using Aspire.Hosting.Dcp.Model;
9+
using Microsoft.AspNetCore.Builder;
10+
using Microsoft.AspNetCore.Hosting;
11+
using Microsoft.AspNetCore.Http;
12+
using Microsoft.Extensions.Configuration;
13+
using Microsoft.Extensions.Logging;
14+
using Microsoft.Extensions.Logging.Abstractions;
15+
using Microsoft.Extensions.Options;
16+
17+
namespace Aspire.Hosting.Tests.Dcp;
18+
19+
public class KubernetesServiceTests
20+
{
21+
// Verifies that establishing the connection happens inside the retry loop: when the kubeconfig does not
22+
// exist yet (DCP has not finished writing it), the operation waits and succeeds once it appears.
23+
[Fact]
24+
public async Task ExecuteWithRetry_EstablishesConnection_WhenKubeconfigInitiallyMissing()
25+
{
26+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
27+
28+
var (service, kubeconfigPath, fileSystem) = CreateService();
29+
using var disposableFileSystem = fileSystem;
30+
using var disposableService = service;
31+
32+
// No kubeconfig on disk initially.
33+
Assert.False(File.Exists(kubeconfigPath));
34+
35+
var listTask = service.ListAsync<Container>(cancellationToken: cts.Token);
36+
37+
await Task.Delay(300, cts.Token);
38+
39+
await using var server = await TestDcpApiServer.StartAsync(cts.Token);
40+
WriteKubeconfig(kubeconfigPath, server.Port);
41+
42+
var result = await listTask;
43+
Assert.Empty(result);
44+
}
45+
46+
// Verifies that establishing the connection survives a partially-written kubeconfig: when the file exists
47+
// but DCP has only flushed part of it (so it does not yet parse as a valid kubeconfig), the read is retried
48+
// and the operation succeeds once the complete, valid kubeconfig is written.
49+
[Fact]
50+
public async Task ExecuteWithRetry_EstablishesConnection_WhenKubeconfigInitiallyPartiallyWritten()
51+
{
52+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
53+
54+
var (service, kubeconfigPath, fileSystem) = CreateService();
55+
using var disposableFileSystem = fileSystem;
56+
using var disposableService = service;
57+
58+
// Simulate DCP having flushed only the first part of the kubeconfig
59+
WritePartialKubeconfig(kubeconfigPath);
60+
61+
var listTask = service.ListAsync<Container>(cancellationToken: cts.Token);
62+
63+
// Give the read pipeline time to observe and retry the partial file before we finish writing it.
64+
await Task.Delay(300, cts.Token);
65+
66+
await using var server = await TestDcpApiServer.StartAsync(cts.Token);
67+
68+
// Finish the write by appending the remainder onto the same file. In-flight DCP calls will now succeed.
69+
CompleteKubeconfig(kubeconfigPath, server.Port);
70+
71+
var result = await listTask;
72+
Assert.Empty(result);
73+
}
74+
75+
private static (KubernetesService Service, string KubeconfigPath, IDisposable FileSystem) CreateService()
76+
{
77+
var configuration = new ConfigurationBuilder().Build();
78+
79+
// Decouple the kubeconfig location from the production FileSystemService
80+
var fileSystem = new TestFileSystemService();
81+
try
82+
{
83+
var locations = new Locations(fileSystem);
84+
85+
var dcpOptions = Options.Create(new DcpOptions
86+
{
87+
// Poll quickly so the kubeconfig file-wait/read retries react promptly in tests.
88+
KubernetesConfigReadRetryIntervalMilliseconds = 50,
89+
KubernetesConfigReadRetryCount = 300,
90+
});
91+
92+
var service = new KubernetesService(NullLogger<KubernetesService>.Instance, dcpOptions, locations, configuration)
93+
{
94+
// Generous enough that the test can flip the kubeconfig before the retry budget is exhausted.
95+
MaxRetryDuration = TimeSpan.FromSeconds(30),
96+
};
97+
98+
return (service, locations.DcpKubeconfigPath, fileSystem);
99+
}
100+
catch
101+
{
102+
// Don't orphan the temp directory if wiring up the service fails before the test takes ownership.
103+
fileSystem.Dispose();
104+
throw;
105+
}
106+
}
107+
108+
private static void WriteKubeconfig(string path, int port)
109+
{
110+
// Minimal kubeconfig pointing at a plain-HTTP loopback endpoint with no auth, which is all the
111+
// DcpKubernetesClient needs to issue custom-object requests against the fake server.
112+
var content = string.Format(CultureInfo.InvariantCulture, """
113+
apiVersion: v1
114+
kind: Config
115+
clusters:
116+
- name: dcp
117+
cluster:
118+
server: http://127.0.0.1:{0}
119+
contexts:
120+
- name: dcp
121+
context:
122+
cluster: dcp
123+
user: dcp
124+
current-context: dcp
125+
users:
126+
- name: dcp
127+
user:
128+
token: dcp-test-token
129+
""", port);
130+
131+
// Write atomically (temp file + move on the same volume) so a concurrent read by the service never
132+
// observes a half-written file.
133+
var tempPath = path + ".tmp";
134+
File.WriteAllText(tempPath, content);
135+
File.Move(tempPath, path, overwrite: true);
136+
}
137+
138+
private static void WritePartialKubeconfig(string path)
139+
{
140+
// A genuine prefix of the final kubeconfig that stops in the middle of the double-quoted server value.
141+
// The unterminated quote makes this deterministically fail YAML parsing, which models DCP having only
142+
// flushed part of the file. There is intentionally no trailing newline, so appending the remainder in
143+
// CompleteKubeconfig closes the quote and yields exactly-valid YAML.
144+
File.WriteAllText(path, """
145+
apiVersion: v1
146+
kind: Config
147+
clusters:
148+
- name: dcp
149+
cluster:
150+
server: "http://127.0.0.1:
151+
""");
152+
}
153+
154+
private static void CompleteKubeconfig(string path, int port)
155+
{
156+
// Append (do not rewrite) the remainder of the kubeconfig that WritePartialKubeconfig left unfinished.
157+
// The remainder begins by closing the open server scalar ({port}") so the combined file parses as a
158+
// valid kubeconfig pointing at the loopback fake server with no auth.
159+
File.AppendAllText(path, string.Format(CultureInfo.InvariantCulture, """
160+
{0}"
161+
contexts:
162+
- name: dcp
163+
context:
164+
cluster: dcp
165+
user: dcp
166+
current-context: dcp
167+
users:
168+
- name: dcp
169+
user:
170+
token: dcp-test-token
171+
""", port));
172+
}
173+
174+
// A self-contained IFileSystemService for these tests. It hands Locations a single, uniquely-suffixed temp
175+
// directory that the test owns, decoupling the kubeconfig location from the production FileSystemService so
176+
// concurrent runs on the same machine never share a path. Every file a test writes lives under this root, so
177+
// disposing the fake (always, via `using`) removes the kubeconfig and any partial/temp files regardless of the
178+
// test outcome.
179+
private sealed class TestFileSystemService : IFileSystemService, IDisposable
180+
{
181+
private readonly TestTempFileSystemService _tempDirectory = new();
182+
183+
public ITempFileSystemService TempDirectory => _tempDirectory;
184+
185+
public void Dispose() => _tempDirectory.Dispose();
186+
187+
private sealed class TestTempFileSystemService : ITempFileSystemService, IDisposable
188+
{
189+
private string? _root;
190+
191+
public TempDirectory CreateTempSubdirectory(string? prefix = null)
192+
{
193+
// Created lazily (Locations calls this exactly once) so the directory can't be orphaned if the
194+
// test fails to take ownership, and with a random suffix so each test instance is isolated.
195+
_root ??= Directory.CreateTempSubdirectory("test-kubeconfig-").FullName;
196+
return new TestTempDirectory(_root);
197+
}
198+
199+
public TempFile CreateTempFile(string? fileName = null)
200+
=> throw new NotSupportedException("The kubeconfig tests only allocate a temp subdirectory.");
201+
202+
public void Dispose()
203+
{
204+
if (_root is null)
205+
{
206+
return;
207+
}
208+
209+
try
210+
{
211+
if (Directory.Exists(_root))
212+
{
213+
Directory.Delete(_root, recursive: true);
214+
}
215+
}
216+
catch
217+
{
218+
// Best-effort cleanup; a teardown failure must never mask the test result.
219+
}
220+
}
221+
}
222+
223+
// The owning TestTempFileSystemService deletes the root recursively on Dispose,
224+
// so this handle has nothing of its own to release.
225+
private sealed class TestTempDirectory(string path) : TempDirectory
226+
{
227+
public override string Path => path;
228+
229+
public override void Dispose()
230+
{
231+
}
232+
}
233+
}
234+
235+
// A minimal stand-in for the DCP API server. It answers every request with an empty Kubernetes list, which
236+
// is enough for ListAsync<Container>() to deserialize successfully.
237+
//
238+
// It runs a real Kestrel server bound to port 0 so the OS assigns a free port that Kestrel actually binds and
239+
// holds for the lifetime of the server. The bound port is read back after startup. This avoids the classic
240+
// "probe a free port then release it and hope nobody grabs it before we rebind" race.
241+
private sealed class TestDcpApiServer : IAsyncDisposable
242+
{
243+
private readonly WebApplication _app;
244+
245+
private TestDcpApiServer(WebApplication app, int port)
246+
{
247+
_app = app;
248+
Port = port;
249+
}
250+
251+
public int Port { get; }
252+
253+
public static async Task<TestDcpApiServer> StartAsync(CancellationToken cancellationToken = default)
254+
{
255+
var builder = WebApplication.CreateSlimBuilder();
256+
// Keep the test output clean; the fake server's logs are noise.
257+
builder.Logging.ClearProviders();
258+
// Port 0 lets the OS pick a free port that Kestrel binds and holds. After StartAsync the addresses
259+
// feature (exposed via app.Urls) is rewritten with the resolved address, so we can read the real port.
260+
builder.WebHost.UseUrls("http://127.0.0.1:0");
261+
262+
var app = builder.Build();
263+
264+
// Answer every request with an empty container list.
265+
app.Run(async context =>
266+
{
267+
context.Response.StatusCode = StatusCodes.Status200OK;
268+
context.Response.ContentType = "application/json";
269+
await context.Response.WriteAsync("""{"apiVersion":"usvc-dev.developer.microsoft.com/v1","kind":"ContainerList","items":[]}""");
270+
});
271+
272+
await app.StartAsync(cancellationToken).ConfigureAwait(false);
273+
274+
// e.g. "http://127.0.0.1:54321" -> 54321
275+
var address = app.Urls.First();
276+
var port = new Uri(address).Port;
277+
278+
return new TestDcpApiServer(app, port);
279+
}
280+
281+
public async ValueTask DisposeAsync()
282+
{
283+
await _app.StopAsync().ConfigureAwait(false);
284+
await _app.DisposeAsync().ConfigureAwait(false);
285+
}
286+
}
287+
}

0 commit comments

Comments
 (0)