Skip to content

Commit 8c74257

Browse files
committed
fix: remove pipeline run promotion blocking logic
Removed the mechanism that permanently blocked pipeline runs from being promoted after a set number of failed attempts. This ensures that transient failures do not cause pipeline runs to get stuck indefinitely, allowing the system to recover automatically once the underlying issue is resolved. Signed-off-by: Chmouel Boudjnah <chmouel@redhat.com>
1 parent 15a9bc9 commit 8c74257

File tree

7 files changed

+155
-78
lines changed

7 files changed

+155
-78
lines changed

pkg/queue/common.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
package queue
22

33
import (
4+
"slices"
5+
"strings"
46
"time"
7+
8+
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/keys"
9+
"github.com/openshift-pipelines/pipelines-as-code/pkg/kubeinteraction"
10+
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
511
)
612

713
type Semaphore interface {
@@ -18,3 +24,42 @@ type Semaphore interface {
1824
getCurrentRunning() []string
1925
getCurrentPending() []string
2026
}
27+
28+
func IsRecoverableQueuedPipelineRun(pr *tektonv1.PipelineRun) bool {
29+
if pr == nil {
30+
return false
31+
}
32+
if pr.GetAnnotations()[keys.State] != kubeinteraction.StateQueued {
33+
return false
34+
}
35+
if pr.Spec.Status != tektonv1.PipelineRunSpecStatusPending {
36+
return false
37+
}
38+
if pr.IsDone() || pr.IsCancelled() {
39+
return false
40+
}
41+
_, ok := executionOrderIndex(pr)
42+
return ok
43+
}
44+
45+
func executionOrderList(pr *tektonv1.PipelineRun) []string {
46+
order := pr.GetAnnotations()[keys.ExecutionOrder]
47+
if order == "" {
48+
return nil
49+
}
50+
return strings.Split(order, ",")
51+
}
52+
53+
func executionOrderIndex(pr *tektonv1.PipelineRun) (int, bool) {
54+
order := executionOrderList(pr)
55+
if len(order) == 0 {
56+
return 0, false
57+
}
58+
59+
key := PrKey(pr)
60+
index := slices.Index(order, key)
61+
if index < 0 {
62+
return 0, false
63+
}
64+
return index, true
65+
}

pkg/queue/lease_manager.go

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"encoding/hex"
77
"fmt"
88
"os"
9-
"slices"
109
"sort"
1110
"strings"
1211
"time"
@@ -264,10 +263,7 @@ func (m *LeaseManager) getRepoQueueState(ctx context.Context, repo *v1alpha1.Rep
264263
state.running = append(state.running, pr)
265264
}
266265
case kubeinteraction.StateQueued:
267-
if pr.Spec.Status != tektonv1.PipelineRunSpecStatusPending || pr.IsDone() || pr.IsCancelled() {
268-
continue
269-
}
270-
if pr.GetAnnotations()[keys.QueuePromotionBlocked] == "true" {
266+
if !IsRecoverableQueuedPipelineRun(&pr) {
271267
continue
272268
}
273269
position, ok := executionOrderIndex(&pr)
@@ -341,28 +337,6 @@ func comparePipelineRuns(left, right *tektonv1.PipelineRun) bool {
341337
return left.CreationTimestamp.Before(&right.CreationTimestamp)
342338
}
343339

344-
func executionOrderList(pr *tektonv1.PipelineRun) []string {
345-
order := pr.GetAnnotations()[keys.ExecutionOrder]
346-
if order == "" {
347-
return nil
348-
}
349-
return strings.Split(order, ",")
350-
}
351-
352-
func executionOrderIndex(pr *tektonv1.PipelineRun) (int, bool) {
353-
order := executionOrderList(pr)
354-
if len(order) == 0 {
355-
return 0, false
356-
}
357-
358-
key := PrKey(pr)
359-
index := slices.Index(order, key)
360-
if index < 0 {
361-
return 0, false
362-
}
363-
return index, true
364-
}
365-
366340
func (m *LeaseManager) hasActiveClaim(pr *tektonv1.PipelineRun, now time.Time) bool {
367341
annotations := pr.GetAnnotations()
368342
claimedBy := annotations[keys.QueueClaimedBy]

pkg/queue/lease_manager_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func TestLeaseManagerIgnoresMalformedExecutionOrder(t *testing.T) {
140140
assert.DeepEqual(t, claimed, []string{"test-ns/bbb"})
141141
}
142142

143-
func TestLeaseManagerIgnoresBlockedQueuedRuns(t *testing.T) {
143+
func TestLeaseManagerStillConsidersPreviouslyBlockedQueuedRuns(t *testing.T) {
144144
ctx, _ := rtesting.SetupFakeContext(t)
145145
observer, _ := zapobserver.New(zap.InfoLevel)
146146
logger := zap.New(observer).Sugar()
@@ -160,7 +160,7 @@ func TestLeaseManagerIgnoresBlockedQueuedRuns(t *testing.T) {
160160

161161
claimed, err := manager.AddListToRunningQueue(ctx, repo, []string{"test-ns/aaa", "test-ns/bbb"})
162162
assert.NilError(t, err)
163-
assert.DeepEqual(t, claimed, []string{"test-ns/bbb"})
163+
assert.DeepEqual(t, claimed, []string{"test-ns/aaa"})
164164
}
165165

166166
func TestLeaseManagerReleaseKeepsLeaseAndClearsHolder(t *testing.T) {

pkg/reconciler/controller.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -181,16 +181,7 @@ func isEligibleLeaseQueueRecoveryCandidate(pipelineRun *tektonv1.PipelineRun) bo
181181
if pipelineRun.GetAnnotations()[keys.Repository] == "" {
182182
return false
183183
}
184-
if pipelineRun.GetAnnotations()[keys.State] != kubeinteraction.StateQueued {
185-
return false
186-
}
187-
if pipelineRun.Spec.Status != tektonv1.PipelineRunSpecStatusPending {
188-
return false
189-
}
190-
if pipelineRun.IsDone() || pipelineRun.IsCancelled() {
191-
return false
192-
}
193-
return pipelineRun.GetAnnotations()[keys.QueuePromotionBlocked] != "true"
184+
return queuepkg.IsRecoverableQueuedPipelineRun(pipelineRun)
194185
}
195186

196187
func shouldPreferLeaseQueueRecoveryCandidate(left, right *tektonv1.PipelineRun) bool {

pkg/reconciler/controller_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,19 +100,24 @@ func TestSelectLeaseQueueRecoveryKeys(t *testing.T) {
100100
pipelineRuns: []*pipelinev1.PipelineRun{
101101
newLeaseRecoveryPR("later", "test-ns", "repo-a", now.Add(2*time.Second), map[string]string{}, false),
102102
newLeaseRecoveryPR("earlier", "test-ns", "repo-a", now.Add(time.Second), map[string]string{}, false),
103-
newLeaseRecoveryPR("blocked", "test-ns", "repo-b", now, map[string]string{
104-
keys.QueuePromotionBlocked: "true",
103+
newLeaseRecoveryPR("missing-order", "test-ns", "repo-b", now, map[string]string{
104+
keys.ExecutionOrder: "",
105+
}, false),
106+
newLeaseRecoveryPR("malformed-order", "test-ns", "repo-c", now.Add(250*time.Millisecond), map[string]string{
107+
keys.ExecutionOrder: "test-ns/some-other-pr",
105108
}, false),
106109
newLeaseRecoveryPR("valid", "test-ns", "repo-b", now.Add(3*time.Second), map[string]string{}, false),
110+
newLeaseRecoveryPR("valid-after-malformed", "test-ns", "repo-c", now.Add(4*time.Second), map[string]string{}, false),
107111
newLeaseRecoveryPR("other-namespace", "other-ns", "repo-a", now.Add(4*time.Second), map[string]string{}, false),
108112
newLeaseRecoveryPR("missing-repo", "test-ns", "", now.Add(5*time.Second), map[string]string{}, false),
109-
newLeaseRecoveryStartedPR("started", "test-ns", "repo-c", now.Add(6*time.Second)),
113+
newLeaseRecoveryStartedPR("started", "test-ns", "repo-d", now.Add(6*time.Second)),
110114
newLeaseRecoveryPR("done", "test-ns", "repo-d", now.Add(7*time.Second), map[string]string{}, true),
111115
},
112116
want: []string{
113117
"test-ns/earlier",
114118
"test-ns/valid",
115119
"other-ns/other-namespace",
120+
"test-ns/valid-after-malformed",
116121
},
117122
},
118123
}
@@ -163,7 +168,8 @@ func newLeaseRecoveryPR(
163168
done bool,
164169
) *pipelinev1.PipelineRun {
165170
annotations := map[string]string{
166-
keys.State: kubeinteraction.StateQueued,
171+
keys.State: kubeinteraction.StateQueued,
172+
keys.ExecutionOrder: namespace + "/" + name,
167173
}
168174
if repo != "" {
169175
annotations[keys.Repository] = repo

pkg/reconciler/queue_pipelineruns.go

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ import (
1717
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1818
)
1919

20-
const maxQueuePromotionRetries = 5
21-
2220
func (r *Reconciler) queuePipelineRun(ctx context.Context, logger *zap.SugaredLogger, pr *tektonv1.PipelineRun) error {
2321
order, exist := pr.GetAnnotations()[keys.ExecutionOrder]
2422
if !exist {
@@ -99,17 +97,14 @@ func (r *Reconciler) queuePipelineRun(ctx context.Context, logger *zap.SugaredLo
9997
continue
10098
}
10199
logger.Errorf("failed to update pipelineRun to in_progress: %w", err)
102-
blocked, retryErr := r.recordQueuePromotionFailure(ctx, logger, pr, err)
100+
retryErr := r.recordQueuePromotionFailure(ctx, logger, pr, err)
103101
if retryErr != nil {
104-
logger.Warnf("failed to record queue promotion failure for %s: %v", pr.GetName(), retryErr)
105-
}
106-
if blocked {
107-
logger.Errorf("blocking further queue promotion attempts for pipelineRun %s after %d failures", pr.GetName(), maxQueuePromotionRetries)
102+
return fmt.Errorf("failed to record queue promotion failure for %s after promotion error: %w", pr.GetName(), retryErr)
108103
}
109104
_ = r.qm.RemoveFromQueue(ctx, repo, prKeys)
110-
} else {
111-
processed = true
105+
return fmt.Errorf("failed to update pipelineRun to in_progress: %w", err)
112106
}
107+
processed = true
113108
}
114109
}
115110
if processed {
@@ -131,7 +126,7 @@ func (r *Reconciler) pipelineRunReachedStartedState(ctx context.Context, pr *tek
131126
return latest.GetAnnotations()[keys.State] == kubeinteraction.StateStarted, nil
132127
}
133128

134-
func (r *Reconciler) recordQueuePromotionFailure(ctx context.Context, logger *zap.SugaredLogger, pr *tektonv1.PipelineRun, cause error) (bool, error) {
129+
func (r *Reconciler) recordQueuePromotionFailure(ctx context.Context, logger *zap.SugaredLogger, pr *tektonv1.PipelineRun, cause error) error {
135130
retries := 0
136131
if current := pr.GetAnnotations()[keys.QueuePromotionRetries]; current != "" {
137132
parsed, err := strconv.Atoi(current)
@@ -145,18 +140,14 @@ func (r *Reconciler) recordQueuePromotionFailure(ctx context.Context, logger *za
145140
keys.QueuePromotionRetries: strconv.Itoa(retries),
146141
keys.QueuePromotionLastErr: cause.Error(),
147142
}
148-
blocked := retries >= maxQueuePromotionRetries
149-
if blocked {
150-
annotations[keys.QueuePromotionBlocked] = "true"
151-
}
152143

153144
_, err := action.PatchPipelineRun(ctx, logger, "queue promotion failure", r.run.Clients.Tekton, pr, map[string]any{
154145
"metadata": map[string]any{
155146
"annotations": annotations,
156147
},
157148
})
158149
if err != nil {
159-
return false, err
150+
return err
160151
}
161-
return blocked, nil
152+
return nil
162153
}

pkg/reconciler/queue_pipelineruns_test.go

Lines changed: 89 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package reconciler
33
import (
44
"errors"
55
"fmt"
6+
"strings"
67
"testing"
78

89
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/keys"
910
pacv1alpha1 "github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/v1alpha1"
11+
"github.com/openshift-pipelines/pipelines-as-code/pkg/kubeinteraction"
1012
"github.com/openshift-pipelines/pipelines-as-code/pkg/params"
1113
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/clients"
1214
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/info"
@@ -17,6 +19,8 @@ import (
1719
zapobserver "go.uber.org/zap/zaptest/observer"
1820
"gotest.tools/v3/assert"
1921
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22+
"k8s.io/apimachinery/pkg/runtime"
23+
k8stesting "k8s.io/client-go/testing"
2024
rtesting "knative.dev/pkg/reconciler/testing"
2125
)
2226

@@ -215,26 +219,21 @@ func TestRecordQueuePromotionFailure(t *testing.T) {
215219
fakelogger := zap.New(observer).Sugar()
216220

217221
tests := []struct {
218-
name string
219-
annotations map[string]string
220-
wantRetries string
221-
wantBlocked bool
222-
wantBlockedMarker string
222+
name string
223+
annotations map[string]string
224+
wantRetries string
223225
}{
224226
{
225227
name: "first failure records retry metadata",
226228
annotations: map[string]string{},
227229
wantRetries: "1",
228-
wantBlocked: false,
229230
},
230231
{
231-
name: "reaching retry limit blocks future promotion",
232+
name: "later failures keep incrementing retries without blocking promotion",
232233
annotations: map[string]string{
233234
keys.QueuePromotionRetries: "4",
234235
},
235-
wantRetries: "5",
236-
wantBlocked: true,
237-
wantBlockedMarker: "true",
236+
wantRetries: "5",
238237
},
239238
}
240239

@@ -261,21 +260,92 @@ func TestRecordQueuePromotionFailure(t *testing.T) {
261260
},
262261
}
263262

264-
blocked, err := r.recordQueuePromotionFailure(ctx, fakelogger, pipelineRun, errors.New("cannot patch"))
263+
err := r.recordQueuePromotionFailure(ctx, fakelogger, pipelineRun, errors.New("cannot patch"))
265264
assert.NilError(t, err)
266-
assert.Equal(t, blocked, tt.wantBlocked)
267265

268266
updatedPR, err := stdata.Pipeline.TektonV1().PipelineRuns("test").Get(ctx, "test-pr", metav1.GetOptions{})
269267
assert.NilError(t, err)
270268
assert.Equal(t, updatedPR.GetAnnotations()[keys.QueuePromotionRetries], tt.wantRetries)
271269
assert.Equal(t, updatedPR.GetAnnotations()[keys.QueuePromotionLastErr], "cannot patch")
272-
273-
if tt.wantBlockedMarker == "" {
274-
_, exists := updatedPR.GetAnnotations()[keys.QueuePromotionBlocked]
275-
assert.Assert(t, !exists, "QueuePromotionBlocked should not be set before the retry limit is reached")
276-
} else {
277-
assert.Equal(t, updatedPR.GetAnnotations()[keys.QueuePromotionBlocked], tt.wantBlockedMarker)
278-
}
270+
_, exists := updatedPR.GetAnnotations()[keys.QueuePromotionBlocked]
271+
assert.Assert(t, !exists, "QueuePromotionBlocked should not be set when promotion fails")
279272
})
280273
}
281274
}
275+
276+
func TestQueuePipelineRunStopsAfterSinglePromotionFailure(t *testing.T) {
277+
observer, _ := zapobserver.New(zap.InfoLevel)
278+
fakelogger := zap.New(observer).Sugar()
279+
ctx, _ := rtesting.SetupFakeContext(t)
280+
281+
pipelineRun := &tektonv1.PipelineRun{
282+
ObjectMeta: metav1.ObjectMeta{
283+
Name: "queued",
284+
Namespace: "test",
285+
Annotations: map[string]string{
286+
keys.ExecutionOrder: "test/queued",
287+
keys.Repository: "test",
288+
keys.State: kubeinteraction.StateQueued,
289+
},
290+
},
291+
Spec: tektonv1.PipelineRunSpec{
292+
Status: tektonv1.PipelineRunSpecStatusPending,
293+
},
294+
}
295+
testRepo := &pacv1alpha1.Repository{
296+
ObjectMeta: metav1.ObjectMeta{
297+
Name: "test",
298+
Namespace: "test",
299+
},
300+
Spec: pacv1alpha1.RepositorySpec{
301+
URL: randomURL,
302+
},
303+
}
304+
305+
testData := testclient.Data{
306+
Repositories: []*pacv1alpha1.Repository{testRepo},
307+
PipelineRuns: []*tektonv1.PipelineRun{pipelineRun},
308+
}
309+
stdata, informers := testclient.SeedTestData(t, ctx, testData)
310+
patchCalls := 0
311+
stdata.Pipeline.PrependReactor("patch", "pipelineruns", func(_ k8stesting.Action) (bool, runtime.Object, error) {
312+
patchCalls++
313+
if patchCalls == 1 {
314+
return true, nil, errors.New("boom")
315+
}
316+
return false, nil, nil
317+
})
318+
319+
r := &Reconciler{
320+
qm: testconcurrency.TestQMI{
321+
RunningQueue: []string{"test/queued"},
322+
},
323+
repoLister: informers.Repository.Lister(),
324+
run: &params.Run{
325+
Info: info.Info{
326+
Kube: &info.KubeOpts{
327+
Namespace: "global",
328+
},
329+
Controller: &info.ControllerInfo{},
330+
},
331+
Clients: clients.Clients{
332+
PipelineAsCode: stdata.PipelineAsCode,
333+
Tekton: stdata.Pipeline,
334+
Kube: stdata.Kube,
335+
Log: fakelogger,
336+
},
337+
},
338+
}
339+
340+
err := r.queuePipelineRun(ctx, fakelogger, pipelineRun)
341+
assert.ErrorContains(t, err, "failed to update pipelineRun to in_progress")
342+
343+
updatedPR, getErr := stdata.Pipeline.TektonV1().PipelineRuns("test").Get(ctx, "queued", metav1.GetOptions{})
344+
assert.NilError(t, getErr)
345+
assert.Equal(t, updatedPR.GetAnnotations()[keys.QueuePromotionRetries], "1")
346+
assert.Assert(t, updatedPR.GetAnnotations()[keys.QueuePromotionLastErr] != "")
347+
assert.Assert(t, strings.Contains(updatedPR.GetAnnotations()[keys.QueuePromotionLastErr], "cannot update state"))
348+
assert.Assert(t, strings.Contains(updatedPR.GetAnnotations()[keys.QueuePromotionLastErr], "boom"))
349+
_, exists := updatedPR.GetAnnotations()[keys.QueuePromotionBlocked]
350+
assert.Assert(t, !exists, "QueuePromotionBlocked should not be set when queuePipelineRun returns after a failed promotion")
351+
}

0 commit comments

Comments
 (0)