Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/Microsoft.DotNet.Helix/JobMonitor/JobMonitorRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ private async Task<int> RunCoreAsync(CancellationToken cancellationToken)

IReadOnlySet<string> alreadyProcessed = await _azdo.GetProcessedHelixJobNamesAsync(cancellationToken);
HashSet<string> processedHelixJobs = new(alreadyProcessed, StringComparer.OrdinalIgnoreCase);
HashSet<HelixJobInfo> associatedJobs = new(HelixJobInfo.ByJobNameComparer);
// Keyed by Helix job name so that each poll overwrites the cached snapshot with the
// latest state from Helix. Using a HashSet here would retain the first-seen snapshot
// (typically Status="running", Finished=null), which would cause ReportTimeout and
// CancelInFlightHelixJobsAsync to treat already-completed jobs as still in flight.
Dictionary<string, HelixJobInfo> associatedJobs = new(StringComparer.OrdinalIgnoreCase);

try
{
Expand All @@ -141,7 +145,7 @@ private async Task<int> RunCoreAsync(CancellationToken cancellationToken)
// lost (and tests that observe UploadedJobNames immediately after a cancelled
// run see a partial set).
await WaitForPendingTestResultUploadsAsync(CancellationToken.None);
ReportTimeout(associatedJobs, processedHelixJobs);
ReportTimeout(associatedJobs.Values, processedHelixJobs);

// On cancellation (typically the overall timeout), proactively cancel any
// Helix jobs we know about that haven't finished yet so they don't keep
Expand All @@ -150,7 +154,7 @@ private async Task<int> RunCoreAsync(CancellationToken cancellationToken)
// already cancelled) so the best-effort cancel calls actually run.
using (var cancelCts = new CancellationTokenSource(TimeSpan.FromSeconds(30)))
{
await CancelInFlightHelixJobsAsync(associatedJobs, processedHelixJobs, cancelCts.Token);
await CancelInFlightHelixJobsAsync(associatedJobs.Values, processedHelixJobs, cancelCts.Token);
}

return 1;
Expand Down Expand Up @@ -202,7 +206,7 @@ private async Task CancelInFlightHelixJobsAsync(

private async Task<int> RunLoopAsync(
HashSet<string> processedHelixJobs,
HashSet<HelixJobInfo> associatedJobs,
Dictionary<string, HelixJobInfo> associatedJobs,
JobResubmissionResult jobResubmission,
CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -233,7 +237,12 @@ private async Task<int> RunLoopAsync(
|| string.Equals(j.StageName, _options.StageName, StringComparison.OrdinalIgnoreCase))
];

associatedJobs.UnionWith(associatedJobsWithBuild);
// Overwrite per poll so the cached entry reflects the latest Helix-side state
// (in particular, the Finished timestamp transitioning from null to a value).
foreach (HelixJobInfo j in associatedJobsWithBuild)
{
associatedJobs[j.JobName] = j;
}

// Cache every job we have seen so GetSubmitterChainKey can follow the
// PreviousHelixJobName chain back to a root, even across polls.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,74 @@ public async Task MonitorTimesOut_CancelsLatestInFlightHelixJobs()
helix.CanceledJobs.OrderBy(jobName => jobName, StringComparer.OrdinalIgnoreCase).ToArray());
}

/// <summary>
/// Regression: a Helix job that was "running" in the first poll and "finished" in a
/// later poll must not be listed in the timeout error message or cancelled, because the
/// monitor's cached snapshot has to reflect the latest Helix-side state when the
/// timeout fires. Only jobs that are still genuinely in flight at timeout should be
/// reported / cancelled.
/// </summary>
[Fact]
public async Task MonitorTimesOut_DoesNotReportOrCancelJobsThatFinishedAfterFirstPoll()
{
var azdo = new FakeAzureDevOpsService();
var helix = new FakeHelixService();
var logger = new RecordingLogger();

azdo.AddTimelineResponse(
MonitorJob(),
PipelineJob("Test Linux", "inProgress"));

// Poll 1: both jobs running.
helix.AddResponse(jobs:
[
HelixJob("helix-good", "running"),
HelixJob("helix-stuck", "running"),
]);
// Poll 2+: helix-good has finished on Helix, helix-stuck still running.
helix.AddResponse(
jobs:
[
HelixJob("helix-good", "finished"),
HelixJob("helix-stuck", "running"),
],
passFailByJob: new(StringComparer.OrdinalIgnoreCase)
{
["helix-good"] = PassFail(passed: ["good-wi"]),
});

int pollCount = 0;
using var cts = new CancellationTokenSource();
var runner = new JobMonitorRunner(DefaultOptions(), logger, azdo, helix,
async (_, _) =>
{
pollCount++;
if (pollCount >= 2)
{
// Wait until helix-good's results have actually been uploaded before
// cancelling, so the monitor has had a chance to record its terminal
// state.
Task completed = await Task.WhenAny(azdo.UploadCompleted.Task, Task.Delay(TimeSpan.FromSeconds(5)));
Assert.Same(azdo.UploadCompleted.Task, completed);
cts.Cancel();
}
});

int exitCode = await runner.RunAsync(cts.Token);

Assert.Equal(1, exitCode);

// helix-good must not appear in the timeout's "had not finished" list because its
// cached snapshot was overwritten with the latest (finished) state.
string timeoutMessage = Assert.Single(logger.Messages, m =>
m.Contains("Helix Job Monitor timed out", StringComparison.Ordinal));
Assert.Contains("helix-stuck", timeoutMessage, StringComparison.Ordinal);
Assert.DoesNotContain("helix-good", timeoutMessage, StringComparison.Ordinal);

// And the best-effort cancel pass must only target the still-in-flight job.
Assert.Equal(["helix-stuck"], helix.CanceledJobs.OrderBy(j => j, StringComparer.OrdinalIgnoreCase).ToArray());
}

/// <summary>
/// The monitor times out while some pipeline jobs haven't submitted Helix work yet,
/// but one job's Helix results were already uploaded. On relaunch, the monitor picks up:
Expand Down
Loading