Skip to content

Commit 15a9bc9

Browse files
committed
feat: Implement background recovery for queued PipelineRuns
Introduce a background recovery mechanism for the lease manager that periodically re-enqueues PipelineRuns stuck in the queued state. This ensures that tasks do not get stuck if a lease is lost or if the controller restarts. The loop identifies the eligible pending runs for each repository and adds them back to the work queue.

Signed-off-by: Chmouel Boudjnah <chmouel@redhat.com>
1 parent 0395e25 commit 15a9bc9

File tree

8 files changed

+278
-3
lines changed

8 files changed

+278
-3
lines changed

pkg/params/settings/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ type Settings struct {
8484

8585
RememberOKToTest bool `json:"remember-ok-to-test"`
8686
RequireOkToTestSHA bool `json:"require-ok-to-test-sha"`
87-
ConcurrencyBackend string `default:"memory" json:"concurrency-backend"`
87+
ConcurrencyBackend string `default:"memory" json:"concurrency-backend"`
8888
}
8989

9090
func (s *Settings) DeepCopy(out *Settings) {

pkg/queue/lease_manager.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package queue
22

33
import (
44
"context"
5-
"crypto/sha1" //nolint:gosec
5+
"crypto/sha256"
66
"encoding/hex"
77
"fmt"
88
"os"
@@ -86,6 +86,10 @@ func (m *LeaseManager) InitQueues(context.Context, tektonclient.Interface, versi
8686
return nil
8787
}
8888

89+
func (m *LeaseManager) RecoveryInterval() time.Duration {
90+
return m.claimTTL
91+
}
92+
8993
func (m *LeaseManager) RemoveRepository(repo *v1alpha1.Repository) {
9094
releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
9195
defer cancel()
@@ -568,6 +572,6 @@ func (m *LeaseManager) releaseLease(leaseName string) {
568572
}
569573

570574
// repoLeaseName derives a deterministic Kubernetes Lease name for the given
// repository key: a fixed "pac-concurrency-" prefix followed by the hex
// encoding of the first eight bytes of the key's SHA-256 digest.
func repoLeaseName(repoKey string) string {
	digest := sha256.Sum256([]byte(repoKey))
	return fmt.Sprintf("pac-concurrency-%s", hex.EncodeToString(digest[:8]))
}

pkg/queue/queue_manager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ func NewManager(logger *zap.SugaredLogger) *Manager {
3737
}
3838
}
3939

40+
func (*Manager) RecoveryInterval() time.Duration {
41+
return 0
42+
}
43+
4044
// getSemaphore returns existing semaphore created for repository or create
4145
// a new one with limit provided in repository
4246
// Semaphore: nothing but a waiting and a running queue for a repository

pkg/queue/queue_manager_interface.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package queue
33
import (
44
"context"
55
"fmt"
6+
"time"
67

78
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/v1alpha1"
89
"github.com/openshift-pipelines/pipelines-as-code/pkg/generated/clientset/versioned"
@@ -12,6 +13,7 @@ import (
1213

1314
type ManagerInterface interface {
1415
InitQueues(ctx context.Context, tekton tektonVersionedClient.Interface, pac versioned.Interface) error
16+
RecoveryInterval() time.Duration
1517
RemoveRepository(repo *v1alpha1.Repository)
1618
QueuedPipelineRuns(repo *v1alpha1.Repository) []string
1719
RunningPipelineRuns(repo *v1alpha1.Repository) []string

pkg/queue/queue_manager_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,17 @@ func TestSomeoneElseSetPendingWithNoConcurrencyLimit(t *testing.T) {
4646
assert.Equal(t, len(started), 1)
4747
}
4848

49+
func TestManagerRecoveryInterval(t *testing.T) {
50+
observer, _ := zapobserver.New(zap.InfoLevel)
51+
logger := zap.New(observer).Sugar()
52+
53+
memoryManager := NewManager(logger)
54+
assert.Equal(t, memoryManager.RecoveryInterval(), time.Duration(0))
55+
56+
leaseManager := NewLeaseManager(logger, nil, nil, "pac")
57+
assert.Equal(t, leaseManager.RecoveryInterval(), defaultLeaseClaimTTL)
58+
}
59+
4960
func TestAddToPendingQueueDirectly(t *testing.T) {
5061
observer, _ := zapobserver.New(zap.InfoLevel)
5162
logger := zap.New(observer).Sugar()

pkg/reconciler/controller.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package reconciler
33
import (
44
"context"
55
"path"
6+
"sort"
7+
"time"
68

79
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode"
810
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/keys"
@@ -16,6 +18,9 @@ import (
1618
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
1719
tektonPipelineRunInformerv1 "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1/pipelinerun"
1820
tektonPipelineRunReconcilerv1 "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1/pipelinerun"
21+
tektonv1lister "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1"
22+
"go.uber.org/zap"
23+
"k8s.io/apimachinery/pkg/labels"
1924
"k8s.io/apimachinery/pkg/types"
2025
"knative.dev/pkg/configmap"
2126
"knative.dev/pkg/controller"
@@ -24,6 +29,8 @@ import (
2429
"knative.dev/pkg/system"
2530
)
2631

32+
// leaseQueueRecoveryBuffer is padding added on top of the queue manager's
// recovery interval when starting the recovery loop — presumably so a
// just-expiring lease claim has settled before the next scan; confirm.
const leaseQueueRecoveryBuffer = time.Second
33+
2734
func NewController() func(context.Context, configmap.Watcher) *controller.Impl {
2835
return func(ctx context.Context, _ configmap.Watcher) *controller.Impl {
2936
ctx = info.StoreNS(ctx, system.Namespace())
@@ -71,13 +78,128 @@ func NewController() func(context.Context, configmap.Watcher) *controller.Impl {
7178
logging.FromContext(ctx).Panicf("Couldn't register PipelineRun informer event handler: %w", err)
7279
}
7380

81+
if recoveryInterval := r.qm.RecoveryInterval(); recoveryInterval > 0 {
82+
startLeaseQueueRecoveryLoop(ctx, log, impl, r.pipelineRunLister, recoveryInterval+leaseQueueRecoveryBuffer)
83+
}
84+
7485
// Start pac config syncer after the initial settings have been loaded.
7586
go params.StartConfigSync(ctx, run)
7687

7788
return impl
7889
}
7990
}
8091

92+
func startLeaseQueueRecoveryLoop(
93+
ctx context.Context,
94+
logger *zap.SugaredLogger,
95+
impl *controller.Impl,
96+
lister tektonv1lister.PipelineRunLister,
97+
interval time.Duration,
98+
) {
99+
if interval <= 0 {
100+
return
101+
}
102+
103+
logger.Infof("starting lease queue recovery loop with interval %s", interval)
104+
runLeaseQueueRecovery(logger, impl, lister)
105+
106+
go func() {
107+
ticker := time.NewTicker(interval)
108+
defer ticker.Stop()
109+
110+
for {
111+
select {
112+
case <-ctx.Done():
113+
return
114+
case <-ticker.C:
115+
runLeaseQueueRecovery(logger, impl, lister)
116+
}
117+
}
118+
}()
119+
}
120+
121+
func runLeaseQueueRecovery(logger *zap.SugaredLogger, impl *controller.Impl, lister tektonv1lister.PipelineRunLister) {
122+
recoveryKeys, err := leaseQueueRecoveryKeys(lister)
123+
if err != nil {
124+
logger.Warnf("failed to list queued PipelineRuns for lease recovery: %v", err)
125+
return
126+
}
127+
128+
for _, key := range recoveryKeys {
129+
impl.EnqueueKey(key)
130+
}
131+
}
132+
133+
func leaseQueueRecoveryKeys(lister tektonv1lister.PipelineRunLister) ([]types.NamespacedName, error) {
134+
pipelineRuns, err := lister.List(labels.Everything())
135+
if err != nil {
136+
return nil, err
137+
}
138+
return selectLeaseQueueRecoveryKeys(pipelineRuns), nil
139+
}
140+
141+
func selectLeaseQueueRecoveryKeys(pipelineRuns []*tektonv1.PipelineRun) []types.NamespacedName {
142+
candidatesByRepo := map[string]*tektonv1.PipelineRun{}
143+
144+
for _, pipelineRun := range pipelineRuns {
145+
if !isEligibleLeaseQueueRecoveryCandidate(pipelineRun) {
146+
continue
147+
}
148+
149+
repoKey := types.NamespacedName{
150+
Namespace: pipelineRun.GetNamespace(),
151+
Name: pipelineRun.GetAnnotations()[keys.Repository],
152+
}.String()
153+
if existing, ok := candidatesByRepo[repoKey]; !ok || shouldPreferLeaseQueueRecoveryCandidate(pipelineRun, existing) {
154+
candidatesByRepo[repoKey] = pipelineRun
155+
}
156+
}
157+
158+
selected := make([]*tektonv1.PipelineRun, 0, len(candidatesByRepo))
159+
for _, pipelineRun := range candidatesByRepo {
160+
selected = append(selected, pipelineRun)
161+
}
162+
163+
sort.Slice(selected, func(i, j int) bool {
164+
return shouldPreferLeaseQueueRecoveryCandidate(selected[i], selected[j])
165+
})
166+
167+
recoveryKeys := make([]types.NamespacedName, 0, len(selected))
168+
for _, pipelineRun := range selected {
169+
recoveryKeys = append(recoveryKeys, types.NamespacedName{
170+
Namespace: pipelineRun.GetNamespace(),
171+
Name: pipelineRun.GetName(),
172+
})
173+
}
174+
return recoveryKeys
175+
}
176+
177+
func isEligibleLeaseQueueRecoveryCandidate(pipelineRun *tektonv1.PipelineRun) bool {
178+
if pipelineRun == nil {
179+
return false
180+
}
181+
if pipelineRun.GetAnnotations()[keys.Repository] == "" {
182+
return false
183+
}
184+
if pipelineRun.GetAnnotations()[keys.State] != kubeinteraction.StateQueued {
185+
return false
186+
}
187+
if pipelineRun.Spec.Status != tektonv1.PipelineRunSpecStatusPending {
188+
return false
189+
}
190+
if pipelineRun.IsDone() || pipelineRun.IsCancelled() {
191+
return false
192+
}
193+
return pipelineRun.GetAnnotations()[keys.QueuePromotionBlocked] != "true"
194+
}
195+
196+
func shouldPreferLeaseQueueRecoveryCandidate(left, right *tektonv1.PipelineRun) bool {
197+
if left.CreationTimestamp.Equal(&right.CreationTimestamp) {
198+
return left.GetName() < right.GetName()
199+
}
200+
return left.CreationTimestamp.Before(&right.CreationTimestamp)
201+
}
202+
81203
// enqueue only the pipelineruns which are in `started` state
82204
// pipelinerun will have a label `pipelinesascode.tekton.dev/state` to describe the state.
83205
func checkStateAndEnqueue(impl *controller.Impl) func(obj any) {

pkg/reconciler/controller_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,20 @@ import (
44
"context"
55
"path"
66
"testing"
7+
"time"
78

89
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode"
910
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/keys"
11+
"github.com/openshift-pipelines/pipelines-as-code/pkg/kubeinteraction"
1012
tektontest "github.com/openshift-pipelines/pipelines-as-code/pkg/test/tekton"
1113
pipelinev1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
14+
tektonv1lister "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1"
1215
"go.uber.org/zap"
1316
zapobserver "go.uber.org/zap/zaptest/observer"
1417
"gotest.tools/v3/assert"
1518
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19+
"k8s.io/client-go/tools/cache"
20+
duckv1 "knative.dev/pkg/apis/duck/v1"
1621
"knative.dev/pkg/controller"
1722
)
1823

@@ -81,3 +86,125 @@ func TestCtrlOpts(t *testing.T) {
8186
// Assert that the promote filter function returns true.
8287
assert.Assert(t, promote)
8388
}
89+
90+
func TestSelectLeaseQueueRecoveryKeys(t *testing.T) {
91+
now := time.Unix(1_700_001_000, 0)
92+
93+
tests := []struct {
94+
name string
95+
pipelineRuns []*pipelinev1.PipelineRun
96+
want []string
97+
}{
98+
{
99+
name: "selects one oldest queued pending run per repository",
100+
pipelineRuns: []*pipelinev1.PipelineRun{
101+
newLeaseRecoveryPR("later", "test-ns", "repo-a", now.Add(2*time.Second), map[string]string{}, false),
102+
newLeaseRecoveryPR("earlier", "test-ns", "repo-a", now.Add(time.Second), map[string]string{}, false),
103+
newLeaseRecoveryPR("blocked", "test-ns", "repo-b", now, map[string]string{
104+
keys.QueuePromotionBlocked: "true",
105+
}, false),
106+
newLeaseRecoveryPR("valid", "test-ns", "repo-b", now.Add(3*time.Second), map[string]string{}, false),
107+
newLeaseRecoveryPR("other-namespace", "other-ns", "repo-a", now.Add(4*time.Second), map[string]string{}, false),
108+
newLeaseRecoveryPR("missing-repo", "test-ns", "", now.Add(5*time.Second), map[string]string{}, false),
109+
newLeaseRecoveryStartedPR("started", "test-ns", "repo-c", now.Add(6*time.Second)),
110+
newLeaseRecoveryPR("done", "test-ns", "repo-d", now.Add(7*time.Second), map[string]string{}, true),
111+
},
112+
want: []string{
113+
"test-ns/earlier",
114+
"test-ns/valid",
115+
"other-ns/other-namespace",
116+
},
117+
},
118+
}
119+
120+
for _, tt := range tests {
121+
t.Run(tt.name, func(t *testing.T) {
122+
keys := selectLeaseQueueRecoveryKeys(tt.pipelineRuns)
123+
got := make([]string, 0, len(keys))
124+
for _, key := range keys {
125+
got = append(got, key.String())
126+
}
127+
assert.DeepEqual(t, got, tt.want)
128+
})
129+
}
130+
}
131+
132+
func TestRunLeaseQueueRecovery(t *testing.T) {
133+
observer, catcher := zapobserver.New(zap.DebugLevel)
134+
logger := zap.New(observer).Sugar()
135+
wh := &fakeReconciler{}
136+
impl := controller.NewContext(context.TODO(), wh, controller.ControllerOptions{
137+
WorkQueueName: "LeaseRecovery",
138+
Logger: logger.Named("LeaseRecovery"),
139+
})
140+
141+
now := time.Unix(1_700_001_100, 0)
142+
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
143+
for _, pipelineRun := range []*pipelinev1.PipelineRun{
144+
newLeaseRecoveryPR("first", "test-ns", "repo-a", now, map[string]string{}, false),
145+
newLeaseRecoveryPR("second", "test-ns", "repo-a", now.Add(time.Second), map[string]string{}, false),
146+
newLeaseRecoveryPR("third", "test-ns", "repo-b", now.Add(2*time.Second), map[string]string{}, false),
147+
newLeaseRecoveryStartedPR("started", "test-ns", "repo-c", now.Add(3*time.Second)),
148+
} {
149+
assert.NilError(t, indexer.Add(pipelineRun))
150+
}
151+
152+
runLeaseQueueRecovery(logger, impl, tektonv1lister.NewPipelineRunLister(indexer))
153+
154+
assert.Equal(t, catcher.FilterMessageSnippet("Adding to queue test-ns/first").Len(), 1)
155+
assert.Equal(t, catcher.FilterMessageSnippet("Adding to queue test-ns/third").Len(), 1)
156+
assert.Equal(t, catcher.FilterMessageSnippet("Adding to queue").Len(), 2)
157+
}
158+
159+
func newLeaseRecoveryPR(
160+
name, namespace, repo string,
161+
createdAt time.Time,
162+
extraAnnotations map[string]string,
163+
done bool,
164+
) *pipelinev1.PipelineRun {
165+
annotations := map[string]string{
166+
keys.State: kubeinteraction.StateQueued,
167+
}
168+
if repo != "" {
169+
annotations[keys.Repository] = repo
170+
}
171+
for key, value := range extraAnnotations {
172+
annotations[key] = value
173+
}
174+
175+
pipelineRun := &pipelinev1.PipelineRun{
176+
ObjectMeta: metav1.ObjectMeta{
177+
Name: name,
178+
Namespace: namespace,
179+
CreationTimestamp: metav1.Time{Time: createdAt},
180+
Annotations: annotations,
181+
},
182+
Spec: pipelinev1.PipelineRunSpec{
183+
Status: pipelinev1.PipelineRunSpecStatusPending,
184+
},
185+
}
186+
if done {
187+
pipelineRun.Status.Conditions = duckv1.Conditions{{
188+
Type: "Succeeded",
189+
Status: "True",
190+
}}
191+
}
192+
return pipelineRun
193+
}
194+
195+
func newLeaseRecoveryStartedPR(name, namespace, repo string, createdAt time.Time) *pipelinev1.PipelineRun {
196+
return &pipelinev1.PipelineRun{
197+
ObjectMeta: metav1.ObjectMeta{
198+
Name: name,
199+
Namespace: namespace,
200+
CreationTimestamp: metav1.Time{Time: createdAt},
201+
Annotations: map[string]string{
202+
keys.Repository: repo,
203+
keys.State: kubeinteraction.StateStarted,
204+
},
205+
},
206+
Spec: pipelinev1.PipelineRunSpec{
207+
Status: pipelinev1.PipelineRunSpecStatusPending,
208+
},
209+
}
210+
}

0 commit comments

Comments (0)