Skip to content

Commit 17330be

Browse files
committed
feat: Enable lease backend concurrency in CI
Updated the CI workflow configuration to utilize the lease backend for concurrency management, ensuring the test environment matches the intended feature deployment.
1 parent 50cbb41 commit 17330be

File tree

5 files changed

+197
-34
lines changed

5 files changed

+197
-34
lines changed

hack/gh-workflow-ci.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ create_pac_github_app_secret() {
2121
kubectl patch configmap -n pipelines-as-code -p "{\"data\":{\"bitbucket-cloud-check-source-ip\": \"false\"}}" \
2222
--type merge pipelines-as-code
2323

24+
# Enable lease backend concurrency
25+
kubectl patch configmap -n pipelines-as-code pipelines-as-code -p '{"data":{"concurrency-backend":"lease"}}'
26+
2427
# restart controller
2528
kubectl -n pipelines-as-code delete pod -l app.kubernetes.io/name=controller
2629

pkg/queue/common.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,15 @@ func IsRecoverableQueuedPipelineRun(pr *tektonv1.PipelineRun) bool {
4242
return ok
4343
}
4444

45+
// HasActiveLeaseQueueClaim reports whether pr holds a lease queue claim that
// is still within ttl when evaluated at now.
//
// It returns false when pr is nil, when LeaseQueueClaimInfo reports an empty
// claimant, when the reported claim age equals unknownDuration(), or when the
// claim age is greater than ttl. A claim whose age is exactly ttl still counts
// as active.
func HasActiveLeaseQueueClaim(pr *tektonv1.PipelineRun, now time.Time, ttl time.Duration) bool {
46+
if pr == nil {
47+
return false
48+
}
49+
50+
claimedBy, claimAge := LeaseQueueClaimInfo(pr, now)
51+
return claimedBy != "" && claimAge != unknownDuration() && claimAge <= ttl
52+
}
53+
4554
func ExecutionOrderList(pr *tektonv1.PipelineRun) []string {
4655
order := pr.GetAnnotations()[keys.ExecutionOrder]
4756
if order == "" {

pkg/queue/lease_manager.go

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ func (m *LeaseManager) RecoveryInterval() time.Duration {
9191
return m.claimTTL
9292
}
9393

94+
// DefaultLeaseClaimTTL returns the package-level defaultLeaseClaimTTL value,
// exposing it to callers outside this package.
func DefaultLeaseClaimTTL() time.Duration {
95+
return defaultLeaseClaimTTL
96+
}
97+
9498
func (m *LeaseManager) RemoveRepository(repo *v1alpha1.Repository) {
9599
releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
96100
defer cancel()
@@ -201,6 +205,8 @@ type pipelineOrderMeta struct {
201205

202206
func (m *LeaseManager) claimNextQueued(ctx context.Context, repo *v1alpha1.Repository, preferredOrder []string, excludeKey string) ([]string, error) {
203207
claimed := []string{}
208+
var debugState *repoQueueState
209+
var debugNewlyClaimed map[string]struct{}
204210

205211
err := m.withRepoLease(ctx, repo, func(lockCtx context.Context) error {
206212
state, err := m.getRepoQueueState(lockCtx, repo, preferredOrder, excludeKey)
@@ -251,9 +257,8 @@ func (m *LeaseManager) claimNextQueued(ctx context.Context, repo *v1alpha1.Repos
251257
m.logger.Debugf("pipelinerun %s could not be claimed for repository %s", PrKey(&pr), RepoKey(repo))
252258
}
253259
state.queued = remainingQueued
254-
if err := m.syncQueueDebugState(lockCtx, repo, state, newlyClaimed); err != nil {
255-
m.logger.Warnf("failed to sync queue debug state for repository %s: %v", RepoKey(repo), err)
256-
}
260+
debugState = cloneRepoQueueState(state)
261+
debugNewlyClaimed = cloneStringSet(newlyClaimed)
257262

258263
if available <= 0 && len(claimed) == 0 {
259264
m.logger.Debugf("repository %s has no available concurrency slots", RepoKey(repo))
@@ -262,6 +267,11 @@ func (m *LeaseManager) claimNextQueued(ctx context.Context, repo *v1alpha1.Repos
262267
})
263268

264269
if err == nil {
270+
if debugState != nil {
271+
if err := m.syncQueueDebugState(ctx, repo, debugState, debugNewlyClaimed); err != nil {
272+
m.logger.Warnf("failed to sync queue debug state for repository %s: %v", RepoKey(repo), err)
273+
}
274+
}
265275
m.logger.Debugf("finished lease queue claim for repository %s; claimed=%v", RepoKey(repo), claimed)
266276
}
267277
return claimed, err
@@ -413,21 +423,8 @@ func comparePipelineRuns(left, right *tektonv1.PipelineRun) bool {
413423
}
414424

415425
func (m *LeaseManager) hasActiveClaim(pr *tektonv1.PipelineRun, now time.Time) bool {
416-
annotations := pr.GetAnnotations()
417-
claimedBy := annotations[keys.QueueClaimedBy]
418-
claimedAt := annotations[keys.QueueClaimedAt]
419-
if claimedBy == "" || claimedAt == "" {
420-
return false
421-
}
422-
423-
claimedTime, err := time.Parse(time.RFC3339Nano, claimedAt)
424-
if err != nil {
425-
m.logger.Debugf("ignoring invalid queue claim timestamp %q on pipelinerun %s: %v", claimedAt, PrKey(pr), err)
426-
return false
427-
}
428-
429-
age := now.Sub(claimedTime)
430-
active := age <= m.claimTTL
426+
active := HasActiveLeaseQueueClaim(pr, now, m.claimTTL)
427+
claimedBy, age := LeaseQueueClaimInfo(pr, now)
431428
m.logger.Debugf(
432429
"evaluated queue claim for pipelinerun %s: claimedBy=%s age=%s ttl=%s active=%t",
433430
PrKey(pr), claimedBy, age, m.claimTTL, active,
@@ -667,6 +664,36 @@ func pipelineRunKeys(prs []tektonv1.PipelineRun) []string {
667664
return keys
668665
}
669666

667+
// cloneRepoQueueState returns a new repoQueueState whose running, claimed and
// queued slices are copied into fresh backing arrays, or nil when state is nil.
//
// Only those three fields are populated; any other field of repoQueueState is
// left at its zero value. Empty source slices stay nil in the clone. Elements
// are copied by value with append (no DeepCopy call), so reference-typed fields
// inside each PipelineRun are still shared with the source.
func cloneRepoQueueState(state *repoQueueState) *repoQueueState {
668+
if state == nil {
669+
return nil
670+
}
671+
672+
cloned := &repoQueueState{}
673+
if len(state.running) > 0 {
674+
cloned.running = append([]tektonv1.PipelineRun(nil), state.running...)
675+
}
676+
if len(state.claimed) > 0 {
677+
cloned.claimed = append([]tektonv1.PipelineRun(nil), state.claimed...)
678+
}
679+
if len(state.queued) > 0 {
680+
cloned.queued = append([]tektonv1.PipelineRun(nil), state.queued...)
681+
}
682+
return cloned
683+
}
684+
685+
// cloneStringSet returns an independent copy of the given string set.
//
// A nil or empty input yields nil rather than an empty map; otherwise the
// result is a newly allocated map holding the same keys.
func cloneStringSet(values map[string]struct{}) map[string]struct{} {
686+
if len(values) == 0 {
687+
return nil
688+
}
689+
690+
cloned := make(map[string]struct{}, len(values))
691+
for key := range values {
692+
cloned[key] = struct{}{}
693+
}
694+
return cloned
695+
}
696+
670697
func (m *LeaseManager) syncQueueDebugState(
671698
ctx context.Context,
672699
repo *v1alpha1.Repository,

pkg/reconciler/controller.go

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,24 +196,39 @@ func leaseQueueRecoveryCandidates(lister tektonv1lister.PipelineRunLister) ([]*t
196196
}
197197

198198
// selectLeaseQueueRecoveryCandidates is the wall-clock entry point for
// recovery candidate selection: it delegates to
// selectLeaseQueueRecoveryCandidatesAt with time.Now() and
// queuepkg.DefaultLeaseClaimTTL().
//
// NOTE(review): the TTL used here is the package default, not a value read
// from a LeaseManager instance — confirm the two cannot diverge.
func selectLeaseQueueRecoveryCandidates(pipelineRuns []*tektonv1.PipelineRun) []*tektonv1.PipelineRun {
199+
return selectLeaseQueueRecoveryCandidatesAt(pipelineRuns, time.Now(), queuepkg.DefaultLeaseClaimTTL())
200+
}
201+
202+
func selectLeaseQueueRecoveryCandidatesAt(
203+
pipelineRuns []*tektonv1.PipelineRun,
204+
now time.Time,
205+
claimTTL time.Duration,
206+
) []*tektonv1.PipelineRun {
199207
candidatesByRepo := map[string]*tektonv1.PipelineRun{}
208+
healthyRepos := map[string]struct{}{}
200209

201210
for _, pipelineRun := range pipelineRuns {
202-
if !isEligibleLeaseQueueRecoveryCandidate(pipelineRun) {
211+
repoKey, ok := leaseQueueRecoveryRepoKey(pipelineRun)
212+
if !ok {
203213
continue
204214
}
205215

206-
repoKey := types.NamespacedName{
207-
Namespace: pipelineRun.GetNamespace(),
208-
Name: pipelineRun.GetAnnotations()[keys.Repository],
209-
}.String()
216+
if hasHealthyLeaseQueueProgress(pipelineRun, now, claimTTL) {
217+
healthyRepos[repoKey] = struct{}{}
218+
}
219+
if !isEligibleLeaseQueueRecoveryCandidate(pipelineRun) {
220+
continue
221+
}
210222
if existing, ok := candidatesByRepo[repoKey]; !ok || shouldPreferLeaseQueueRecoveryCandidate(pipelineRun, existing) {
211223
candidatesByRepo[repoKey] = pipelineRun
212224
}
213225
}
214226

215227
selected := make([]*tektonv1.PipelineRun, 0, len(candidatesByRepo))
216-
for _, pipelineRun := range candidatesByRepo {
228+
for repoKey, pipelineRun := range candidatesByRepo {
229+
if _, ok := healthyRepos[repoKey]; ok {
230+
continue
231+
}
217232
selected = append(selected, pipelineRun)
218233
}
219234

@@ -247,6 +262,41 @@ func isEligibleLeaseQueueRecoveryCandidate(pipelineRun *tektonv1.PipelineRun) bo
247262
return queuepkg.IsRecoverableQueuedPipelineRun(pipelineRun)
248263
}
249264

265+
// leaseQueueRecoveryRepoKey builds the key used to group PipelineRuns by
// repository during lease queue recovery.
//
// The key is the String() form of a types.NamespacedName made from the
// PipelineRun's namespace and its keys.Repository annotation. It returns
// ("", false) when pipelineRun is nil or the annotation is missing or empty.
func leaseQueueRecoveryRepoKey(pipelineRun *tektonv1.PipelineRun) (string, bool) {
266+
if pipelineRun == nil {
267+
return "", false
268+
}
269+
270+
repoName := pipelineRun.GetAnnotations()[keys.Repository]
271+
if repoName == "" {
272+
return "", false
273+
}
274+
275+
return types.NamespacedName{
276+
Namespace: pipelineRun.GetNamespace(),
277+
Name: repoName,
278+
}.String(), true
279+
}
280+
281+
// hasHealthyLeaseQueueProgress reports whether pipelineRun shows that its
// repository's queue is making progress. The recovery selection uses a true
// result to mark the whole repository as healthy and skip it.
//
// The decision is driven by the keys.State annotation:
//   - StateStarted: healthy while the run is neither done nor cancelled.
//   - StateQueued: healthy when queuepkg.HasActiveLeaseQueueClaim holds for
//     now and claimTTL.
//   - anything else, or a nil pipelineRun: not healthy.
func hasHealthyLeaseQueueProgress(
282+
pipelineRun *tektonv1.PipelineRun,
283+
now time.Time,
284+
claimTTL time.Duration,
285+
) bool {
286+
if pipelineRun == nil {
287+
return false
288+
}
289+
290+
switch pipelineRun.GetAnnotations()[keys.State] {
291+
case kubeinteraction.StateStarted:
292+
return !pipelineRun.IsDone() && !pipelineRun.IsCancelled()
293+
case kubeinteraction.StateQueued:
294+
return queuepkg.HasActiveLeaseQueueClaim(pipelineRun, now, claimTTL)
295+
default:
296+
return false
297+
}
298+
}
299+
250300
func shouldPreferLeaseQueueRecoveryCandidate(left, right *tektonv1.PipelineRun) bool {
251301
if left.CreationTimestamp.Equal(&right.CreationTimestamp) {
252302
return left.GetName() < right.GetName()

pkg/reconciler/controller_test.go

Lines changed: 84 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package reconciler
22

33
import (
44
"context"
5+
"maps"
56
"path"
67
"testing"
78
"time"
@@ -10,6 +11,7 @@ import (
1011
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/keys"
1112
"github.com/openshift-pipelines/pipelines-as-code/pkg/events"
1213
"github.com/openshift-pipelines/pipelines-as-code/pkg/kubeinteraction"
14+
queuepkg "github.com/openshift-pipelines/pipelines-as-code/pkg/queue"
1315
testclient "github.com/openshift-pipelines/pipelines-as-code/pkg/test/clients"
1416
tektontest "github.com/openshift-pipelines/pipelines-as-code/pkg/test/tekton"
1517
pipelinev1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
@@ -19,6 +21,7 @@ import (
1921
"gotest.tools/v3/assert"
2022
corev1 "k8s.io/api/core/v1"
2123
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
"k8s.io/apimachinery/pkg/types"
2225
"k8s.io/client-go/tools/cache"
2326
duckv1 "knative.dev/pkg/apis/duck/v1"
2427
"knative.dev/pkg/controller"
@@ -113,8 +116,14 @@ func TestSelectLeaseQueueRecoveryKeys(t *testing.T) {
113116
newLeaseRecoveryPR("valid", "test-ns", "repo-b", now.Add(3*time.Second), map[string]string{}, false),
114117
newLeaseRecoveryPR("valid-after-malformed", "test-ns", "repo-c", now.Add(4*time.Second), map[string]string{}, false),
115118
newLeaseRecoveryPR("other-namespace", "other-ns", "repo-a", now.Add(4*time.Second), map[string]string{}, false),
119+
newLeaseRecoveryPR("waiting-behind-started", "test-ns", "repo-e", now.Add(4*time.Second), map[string]string{}, false),
120+
newLeaseRecoveryStartedPR("running", "repo-e", now.Add(5*time.Second)),
121+
newLeaseRecoveryPR("actively-claimed", "test-ns", "repo-f", now.Add(6*time.Second), map[string]string{
122+
keys.QueueClaimedBy: "watcher-1",
123+
keys.QueueClaimedAt: now.Format(time.RFC3339Nano),
124+
}, false),
116125
newLeaseRecoveryPR("missing-repo", "test-ns", "", now.Add(5*time.Second), map[string]string{}, false),
117-
newLeaseRecoveryStartedPR("started", "test-ns", "repo-d", now.Add(6*time.Second)),
126+
newLeaseRecoveryStartedPR("started", "repo-d", now.Add(6*time.Second)),
118127
newLeaseRecoveryPR("done", "test-ns", "repo-d", now.Add(7*time.Second), map[string]string{}, true),
119128
},
120129
want: []string{
@@ -128,9 +137,9 @@ func TestSelectLeaseQueueRecoveryKeys(t *testing.T) {
128137

129138
for _, tt := range tests {
130139
t.Run(tt.name, func(t *testing.T) {
131-
keys := selectLeaseQueueRecoveryKeys(tt.pipelineRuns)
132-
got := make([]string, 0, len(keys))
133-
for _, key := range keys {
140+
recoveryKeys := selectLeaseQueueRecoveryKeysAt(tt.pipelineRuns, now, queuepkg.DefaultLeaseClaimTTL())
141+
got := make([]string, 0, len(recoveryKeys))
142+
for _, key := range recoveryKeys {
134143
got = append(got, key.String())
135144
}
136145
assert.DeepEqual(t, got, tt.want)
@@ -155,7 +164,7 @@ func TestRunLeaseQueueRecovery(t *testing.T) {
155164
newLeaseRecoveryPR("first", "test-ns", "repo-a", now, map[string]string{}, false),
156165
newLeaseRecoveryPR("second", "test-ns", "repo-a", now.Add(time.Second), map[string]string{}, false),
157166
newLeaseRecoveryPR("third", "test-ns", "repo-b", now.Add(2*time.Second), map[string]string{}, false),
158-
newLeaseRecoveryStartedPR("started", "test-ns", "repo-c", now.Add(3*time.Second)),
167+
newLeaseRecoveryStartedPR("started", "repo-c", now.Add(3*time.Second)),
159168
} {
160169
assert.NilError(t, indexer.Add(pipelineRun))
161170
_, err := stdata.Pipeline.TektonV1().PipelineRuns(pipelineRun.Namespace).Create(ctx, pipelineRun, metav1.CreateOptions{})
@@ -271,6 +280,73 @@ func TestRunLeaseQueueRecoverySkipsStaleAdvancedLivePipelineRun(t *testing.T) {
271280
}
272281
}
273282

283+
// TestRunLeaseQueueRecoverySkipsHealthyQueuedRuns checks that
// runLeaseQueueRecovery leaves queued PipelineRuns alone when their repository
// already shows progress.
//
// Fixtures: repo-a has a started run plus a queued run waiting behind it;
// repo-b has a queued run with a fresh claim. The test expects no
// "Adding to queue" log entry, no keys.QueueDecision annotation on the queued
// runs, and no "QueueRecoveryRequeued" event.
func TestRunLeaseQueueRecoverySkipsHealthyQueuedRuns(t *testing.T) {
284+
observer, catcher := zapobserver.New(zap.DebugLevel)
285+
logger := zap.New(observer).Sugar()
286+
ctx, _ := rtesting.SetupFakeContext(t)
287+
now := time.Unix(1_700_001_300, 0)
288+
// The claim timestamp uses the wall clock rather than the fixed now above.
// NOTE(review): presumably because runLeaseQueueRecovery measures claim age
// against time.Now() — confirm against the caller chain.
claimedAt := time.Now().UTC().Format(time.RFC3339Nano)
289+
290+
started := newLeaseRecoveryStartedPR("running", "repo-a", now)
291+
waiting := newLeaseRecoveryPR("waiting", "test-ns", "repo-a", now.Add(time.Second), map[string]string{}, false)
292+
claimed := newLeaseRecoveryPR("claimed", "test-ns", "repo-b", now.Add(2*time.Second), map[string]string{
293+
keys.QueueClaimedBy: "watcher-1",
294+
keys.QueueClaimedAt: claimedAt,
295+
}, false)
296+
297+
stdata, _ := testclient.SeedTestData(t, ctx, testclient.Data{
298+
PipelineRuns: []*pipelinev1.PipelineRun{started, waiting, claimed},
299+
})
300+
wh := &fakeReconciler{}
301+
impl := controller.NewContext(context.TODO(), wh, controller.ControllerOptions{
302+
WorkQueueName: "LeaseRecovery",
303+
Logger: logger.Named("LeaseRecovery"),
304+
})
305+
306+
// The lister is backed by a local indexer holding the same three runs that
// were seeded into the fake clientset.
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
307+
for _, pipelineRun := range []*pipelinev1.PipelineRun{started, waiting, claimed} {
308+
assert.NilError(t, indexer.Add(pipelineRun))
309+
}
310+
311+
emitter := events.NewEventEmitter(stdata.Kube, logger)
312+
runLeaseQueueRecovery(ctx, logger, impl, tektonv1lister.NewPipelineRunLister(indexer), stdata.Pipeline, emitter)
313+
314+
// Nothing should have been enqueued on the controller work queue.
assert.Equal(t, catcher.FilterMessageSnippet("Adding to queue").Len(), 0)
315+
316+
for _, name := range []string{"waiting", "claimed"} {
317+
updated, err := stdata.Pipeline.TektonV1().PipelineRuns("test-ns").Get(ctx, name, metav1.GetOptions{})
318+
assert.NilError(t, err)
319+
assert.Equal(t, updated.GetAnnotations()[keys.QueueDecision], "")
320+
}
321+
322+
// From here on the local variable events shadows the imported events package.
events, err := stdata.Kube.CoreV1().Events("test-ns").List(ctx, metav1.ListOptions{})
323+
assert.NilError(t, err)
324+
recoveryEvents := 0
325+
for _, event := range events.Items {
326+
if event.Reason == "QueueRecoveryRequeued" {
327+
recoveryEvents++
328+
}
329+
}
330+
assert.Equal(t, recoveryEvents, 0)
331+
}
332+
333+
// selectLeaseQueueRecoveryKeysAt is a test helper that runs
// selectLeaseQueueRecoveryCandidatesAt with the given now and claimTTL and
// converts each selected PipelineRun into a types.NamespacedName built from
// its namespace and name. The result preserves the order returned by the
// selection function.
func selectLeaseQueueRecoveryKeysAt(
334+
pipelineRuns []*pipelinev1.PipelineRun,
335+
now time.Time,
336+
claimTTL time.Duration,
337+
) []types.NamespacedName {
338+
selected := selectLeaseQueueRecoveryCandidatesAt(pipelineRuns, now, claimTTL)
339+
340+
recoveryKeys := make([]types.NamespacedName, 0, len(selected))
341+
for _, pipelineRun := range selected {
342+
recoveryKeys = append(recoveryKeys, types.NamespacedName{
343+
Namespace: pipelineRun.GetNamespace(),
344+
Name: pipelineRun.GetName(),
345+
})
346+
}
347+
return recoveryKeys
348+
}
349+
274350
func newLeaseRecoveryPR(
275351
name, namespace, repo string,
276352
createdAt time.Time,
@@ -284,9 +360,7 @@ func newLeaseRecoveryPR(
284360
if repo != "" {
285361
annotations[keys.Repository] = repo
286362
}
287-
for key, value := range extraAnnotations {
288-
annotations[key] = value
289-
}
363+
maps.Copy(annotations, extraAnnotations)
290364

291365
pipelineRun := &pipelinev1.PipelineRun{
292366
ObjectMeta: metav1.ObjectMeta{
@@ -308,11 +382,11 @@ func newLeaseRecoveryPR(
308382
return pipelineRun
309383
}
310384

311-
func newLeaseRecoveryStartedPR(name, namespace, repo string, createdAt time.Time) *pipelinev1.PipelineRun {
385+
func newLeaseRecoveryStartedPR(name, repo string, createdAt time.Time) *pipelinev1.PipelineRun {
312386
return &pipelinev1.PipelineRun{
313387
ObjectMeta: metav1.ObjectMeta{
314388
Name: name,
315-
Namespace: namespace,
389+
Namespace: "test-ns",
316390
CreationTimestamp: metav1.Time{Time: createdAt},
317391
Annotations: map[string]string{
318392
keys.Repository: repo,

0 commit comments

Comments
 (0)