Commit bebbd1c

feat: Add k8 Lease backend for concurrency coord
Introduced an opt-in concurrency backend that used Kubernetes Lease objects and PipelineRun annotations for queue coordination. This addressed potential state drift and race conditions during watcher restarts or API delays by storing the queue state in the cluster instead of only in memory. Added a global configuration setting to choose between the legacy memory backend and the new lease-based logic. Updated the queue manager interface to support context-aware operations and ensured that stale claims were automatically reclaimed via TTL-based expiration.

The system preserved existing lease objects during the release process instead of completely deleting them. This reused the same records across multiple acquisition cycles, which reduced excessive communication overhead with the cluster.

AI-assisted-by: Cursor (Codex)
Signed-off-by: Chmouel Boudjnah <chmouel@redhat.com>
1 parent 29725a3 commit bebbd1c
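
The TTL-based reclaim and lease reuse described in the commit message can be pictured with a small in-memory model. This is only a sketch: the `lease` type, its fields, `tryAcquire`, `release`, `runScenario`, and the 30-second TTL are hypothetical names and values chosen for illustration. It is not the code added by this commit and it calls no Kubernetes API.

```go
package main

import (
	"fmt"
	"time"
)

// lease is a hypothetical in-memory stand-in for a Kubernetes Lease record.
type lease struct {
	holder    string
	renewedAt time.Time
	ttl       time.Duration
}

// expired reports whether the record has no holder or the holder's claim is stale.
func (l *lease) expired(now time.Time) bool {
	return l.holder == "" || now.Sub(l.renewedAt) >= l.ttl
}

// tryAcquire claims the lease for id when it is free, stale, or already held by id.
func (l *lease) tryAcquire(id string, now time.Time) bool {
	if l.holder != id && !l.expired(now) {
		return false
	}
	l.holder = id
	l.renewedAt = now
	return true
}

// release clears the holder but keeps the record, so the next acquisition
// reuses it instead of creating a new object.
func (l *lease) release(id string) {
	if l.holder == id {
		l.holder = ""
	}
}

// runScenario walks through acquire, contention, TTL reclaim, and reuse after release.
func runScenario() []bool {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	l := &lease{ttl: 30 * time.Second} // assumed TTL, for illustration only

	results := []bool{
		l.tryAcquire("watcher-a", start),                     // free lease
		l.tryAcquire("watcher-b", start.Add(10*time.Second)), // still held by watcher-a
		l.tryAcquire("watcher-b", start.Add(31*time.Second)), // stale claim reclaimed
	}
	l.release("watcher-b")
	results = append(results, l.tryAcquire("watcher-a", start.Add(32*time.Second)))
	return results
}

func main() {
	fmt.Println(runScenario()) // [true false true true]
}
```

Keeping the record on release means the next holder only updates an existing object, which is the reuse the commit message credits with reducing traffic to the cluster.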

26 files changed: +2836 -43 lines changed

config/302-pac-configmap.yaml

Lines changed: 7 additions & 0 deletions
@@ -173,6 +173,13 @@ data:
   # Default: true
   skip-push-event-for-pr-commits: "true"
 
+  # Selects the concurrency queue backend used by the watcher.
+  # "memory" keeps the existing in-process queue state.
+  # "lease" uses Kubernetes Leases plus PipelineRun claims to recover more safely
+  # from watcher restarts and cluster/API timing issues.
+  # Restart the watcher after changing this setting.
+  concurrency-backend: "memory"
+
   # Configure a custom console here, the driver support custom parameters from
   # Repo CR along a few other template variable, see documentation for more
   # details

docs/content/docs/advanced/concurrency.md

Lines changed: 61 additions & 0 deletions
@@ -4,6 +4,11 @@ weight: 2
 ---
 This page illustrates how Pipelines-as-Code manages concurrent PipelineRun execution. When you set a concurrency limit on a Repository CR, Pipelines-as-Code queues incoming PipelineRuns and starts them only when capacity allows.
 
+The watcher supports two queue backends controlled by the global `concurrency-backend` setting in the `pipelines-as-code` ConfigMap:
+
+- `memory` keeps queue state in the watcher process. This is the historical behavior and remains the default.
+- `lease` stores queue coordination in Kubernetes using `Lease` objects and short-lived PipelineRun claims. This mode is more resilient when the watcher restarts or the cluster is slow to reconcile updates.
+
 ## Flow diagram
 
 ```mermaid
@@ -37,3 +42,59 @@ graph TD
 K --> |No| N
 
 ```
+
+## Backend selection
+
+To enable the Kubernetes-backed queue coordination, set the global config to:
+
+```yaml
+data:
+  concurrency-backend: "lease"
+```
+
+Restart the watcher after changing `concurrency-backend`; the backend is selected at startup.
+
+When `lease` mode is enabled, Pipelines-as-Code still uses the existing `queued`, `started`, and `completed` PipelineRun states. The difference is that promotion of the next queued PipelineRun is serialized with a per-repository `Lease`, which reduces queue drift during cluster/API instability.
+
+## Debugging the Lease Backend
+
+When `concurrency-backend: "lease"` is enabled, queued `PipelineRun`s expose queue debugging state directly in annotations:
+
+- `pipelinesascode.tekton.dev/queue-decision`
+- `pipelinesascode.tekton.dev/queue-debug-summary`
+- `pipelinesascode.tekton.dev/queue-claimed-by`
+- `pipelinesascode.tekton.dev/queue-claimed-at`
+- `pipelinesascode.tekton.dev/queue-promotion-retries`
+- `pipelinesascode.tekton.dev/queue-promotion-last-error`
+
+This makes it possible to diagnose most queue issues with `kubectl` before looking at watcher logs.
+
+### Useful commands
+
+```bash
+kubectl get pipelinerun -n <namespace> <name> -o jsonpath='{.metadata.annotations.pipelinesascode\.tekton\.dev/queue-decision}{"\n"}'
+kubectl get pipelinerun -n <namespace> <name> -o jsonpath='{.metadata.annotations.pipelinesascode\.tekton\.dev/queue-debug-summary}{"\n"}'
+kubectl describe pipelinerun -n <namespace> <name>
+kubectl get events -n <namespace> --field-selector involvedObject.kind=Repository
+```
+
+### Queue decisions
+
+- `waiting_for_slot`: the run is queued and waiting for repository capacity.
+- `claim_active`: another watcher already holds an active short-lived claim on this run.
+- `claimed_for_promotion`: this run has been claimed and is being promoted to `started`.
+- `promotion_failed`: the watcher failed while promoting the run to `started`.
+- `recovery_requeued`: the lease recovery loop noticed this run and enqueued it again.
+- `missing_execution_order`: the run is queued but its execution order annotation does not include itself.
+- `not_recoverable`: the run is still `queued` but is no longer eligible for lease recovery.
+
+### Events
+
+The watcher also emits repository-scoped Kubernetes events for the most important transitions:
+
+- `QueueClaimedForPromotion`
+- `QueuePromotionFailed`
+- `QueueRecoveryRequeued`
+- `QueueLeaseAcquireTimeout`
+
+If the queue decision and events do not explain the behavior, switch the watcher to debug logging and grep for the repository key and PipelineRun key. The lease backend logs include lease acquisition attempts, active claim evaluation, queue-state snapshots, and recovery loop selections.
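
The queue decisions documented in this file could be turned into a small triage helper along the following lines. This is a sketch under stated assumptions: the annotation key and the decision values are taken from the documentation above, while `explainDecision`, `decisionHelp`, and the fallback messages are hypothetical names and strings invented for illustration. The annotations map is assumed to have been read from a PipelineRun's metadata by the caller.

```go
package main

import "fmt"

// queueDecisionKey is the annotation listed in the lease backend documentation.
const queueDecisionKey = "pipelinesascode.tekton.dev/queue-decision"

// decisionHelp paraphrases the documented meaning of each queue decision.
var decisionHelp = map[string]string{
	"waiting_for_slot":        "queued and waiting for repository capacity",
	"claim_active":            "another watcher holds an active short-lived claim",
	"claimed_for_promotion":   "claimed and being promoted to started",
	"promotion_failed":        "the watcher failed while promoting the run to started",
	"recovery_requeued":       "the lease recovery loop enqueued the run again",
	"missing_execution_order": "the execution order annotation does not include the run",
	"not_recoverable":         "still queued but no longer eligible for lease recovery",
}

// explainDecision returns a one-line explanation for a PipelineRun's annotations.
// The fallback strings are illustrative, not messages produced by the watcher.
func explainDecision(annotations map[string]string) string {
	decision, ok := annotations[queueDecisionKey]
	if !ok {
		return "no queue decision recorded"
	}
	if help, known := decisionHelp[decision]; known {
		return decision + ": " + help
	}
	return decision + ": unknown decision"
}

func main() {
	annotations := map[string]string{queueDecisionKey: "waiting_for_slot"}
	fmt.Println(explainDecision(annotations))
}
```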

docs/content/docs/api/configmap.md

Lines changed: 15 additions & 0 deletions
@@ -335,6 +335,20 @@ skip-push-event-for-pr-commits: "true"
 
 {{< /param >}}
 
+{{< param name="concurrency-backend" type="string" default="memory" id="param-concurrency-backend" >}}
+Selects the queue coordination backend used by the watcher. Supported values:
+
+- `memory`: in-process queue tracking. This is the default and matches the historical behavior.
+- `lease`: Kubernetes-backed coordination using `Lease` objects and short-lived PipelineRun claims for improved recovery during watcher restarts or API instability.
+
+Restart the watcher after changing this setting.
+
+```yaml
+concurrency-backend: "memory"
+```
+
+{{< /param >}}
+
 ## Complete Example
 
 ```yaml
@@ -371,6 +385,7 @@ data:
   remember-ok-to-test: "true"
   require-ok-to-test-sha: "false"
   skip-push-event-for-pr-commits: "true"
+  concurrency-backend: "memory"
 ```
 
 ## Updating configuration

hack/gh-workflow-ci.sh

Lines changed: 3 additions & 0 deletions
@@ -21,6 +21,9 @@ create_pac_github_app_secret() {
   kubectl patch configmap -n pipelines-as-code -p "{\"data\":{\"bitbucket-cloud-check-source-ip\": \"false\"}}" \
     --type merge pipelines-as-code
 
+  # Enable lease backend concurrency
+  kubectl patch configmap -n pipelines-as-code pipelines-as-code -p '{"data":{"concurrency-backend":"lease"}}'
+
   # restart controller
   kubectl -n pipelines-as-code delete pod -l app.kubernetes.io/name=controller

pkg/apis/pipelinesascode/keys/keys.go

Lines changed: 7 additions & 0 deletions
@@ -61,6 +61,13 @@ const (
 	LogURL = pipelinesascode.GroupName + "/log-url"
 	ExecutionOrder = pipelinesascode.GroupName + "/execution-order"
 	SCMReportingPLRStarted = pipelinesascode.GroupName + "/scm-reporting-plr-started"
+	QueueClaimedBy = pipelinesascode.GroupName + "/queue-claimed-by"
+	QueueClaimedAt = pipelinesascode.GroupName + "/queue-claimed-at"
+	QueueDecision = pipelinesascode.GroupName + "/queue-decision"
+	QueueDebugSummary = pipelinesascode.GroupName + "/queue-debug-summary"
+	QueuePromotionRetries = pipelinesascode.GroupName + "/queue-promotion-retries"
+	QueuePromotionBlocked = pipelinesascode.GroupName + "/queue-promotion-blocked"
+	QueuePromotionLastErr = pipelinesascode.GroupName + "/queue-promotion-last-error"
 	SecretCreated = pipelinesascode.GroupName + "/secret-created"
 	CloneURL = pipelinesascode.GroupName + "/clone-url"
 	// PublicGithubAPIURL default is "https://api.github.com" but it can be overridden by X-GitHub-Enterprise-Host header.

pkg/params/config_sync.go

Lines changed: 27 additions & 1 deletion
@@ -13,6 +13,10 @@ import (
 	"knative.dev/pkg/system"
 )
 
+var terminateProcessForConfigChange = func() {
+	_ = syscall.Kill(os.Getpid(), syscall.SIGTERM)
+}
+
 func StartConfigSync(ctx context.Context, run *Run) {
 	// init pac config
 	_ = run.UpdatePacConfig(ctx)
@@ -28,7 +32,19 @@ func StartConfigSync(ctx context.Context, run *Run) {
 			// nothing to do
 		},
 		UpdateFunc: func(_, _ any) {
-			_ = run.UpdatePacConfig(ctx)
+			oldBackend, newBackend, changed, err := updatePacConfigAndDetectBackendChange(ctx, run)
+			if err != nil {
+				return
+			}
+			if changed {
+				if run.Clients.Log != nil {
+					run.Clients.Log.Infof(
+						"concurrency-backend changed from %q to %q; restarting process so the queue backend is recreated",
+						oldBackend, newBackend,
+					)
+				}
+				terminateProcessForConfigChange()
+			}
 		},
 		DeleteFunc: func(_ any) {
 			// nothing to do
@@ -46,3 +62,13 @@ func StartConfigSync(ctx context.Context, run *Run) {
 	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
 	<-sig
 }
+
+func updatePacConfigAndDetectBackendChange(ctx context.Context, run *Run) (string, string, bool, error) {
+	oldBackend := run.Info.GetPacOpts().ConcurrencyBackend
+	if err := run.UpdatePacConfig(ctx); err != nil {
+		return oldBackend, oldBackend, false, err
+	}
+
+	newBackend := run.Info.GetPacOpts().ConcurrencyBackend
+	return oldBackend, newBackend, oldBackend != "" && newBackend != "" && oldBackend != newBackend, nil
+}
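
The change-detection rule in `updatePacConfigAndDetectBackendChange`, together with the package-level function variable that makes the restart hook replaceable, can be isolated in a few lines. This is a minimal sketch: `backendChanged`, `onConfigUpdate`, `terminate`, and `terminateCalls` are hypothetical names, and the hook here only counts calls where the code in the diff sends SIGTERM to the process.

```go
package main

import "fmt"

// terminateCalls records how often the stand-in hook fired.
var terminateCalls int

// terminate is a package-level function variable, the same pattern the diff
// uses for terminateProcessForConfigChange; a test can swap it for a stub.
var terminate = func() { terminateCalls++ }

// backendChanged applies the comparison from the diff: a change counts only
// when both values are non-empty and differ.
func backendChanged(oldBackend, newBackend string) bool {
	return oldBackend != "" && newBackend != "" && oldBackend != newBackend
}

// onConfigUpdate fires the hook when the backend value really changed.
func onConfigUpdate(oldBackend, newBackend string) {
	if backendChanged(oldBackend, newBackend) {
		terminate()
	}
}

func main() {
	onConfigUpdate("memory", "lease") // real change: hook fires
	onConfigUpdate("lease", "lease")  // unchanged: ignored
	onConfigUpdate("", "lease")       // empty previous value: ignored
	fmt.Println(terminateCalls)       // 1
}
```

Ignoring empty values means a ConfigMap update that arrives before any backend has been recorded does not trigger a restart.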

pkg/params/config_sync_test.go

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+package params
+
+import (
+	"testing"
+
+	"github.com/openshift-pipelines/pipelines-as-code/pkg/consoleui"
+	"github.com/openshift-pipelines/pipelines-as-code/pkg/params/clients"
+	"github.com/openshift-pipelines/pipelines-as-code/pkg/params/info"
+	"github.com/openshift-pipelines/pipelines-as-code/pkg/params/settings"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	kubefake "k8s.io/client-go/kubernetes/fake"
+	rtesting "knative.dev/pkg/reconciler/testing"
+
+	"go.uber.org/zap"
+	"gotest.tools/v3/assert"
+)
+
+func TestUpdatePacConfigAndDetectBackendChange(t *testing.T) {
+	tests := []struct {
+		name string
+		initialBackend string
+		configData map[string]string
+		wantOld string
+		wantNew string
+		wantChanged bool
+	}{
+		{
+			name: "detects backend change",
+			initialBackend: settings.ConcurrencyBackendMemory,
+			configData: map[string]string{
+				"concurrency-backend": settings.ConcurrencyBackendLease,
+				"tekton-dashboard-url": "https://dashboard.example.test",
+			},
+			wantOld: settings.ConcurrencyBackendMemory,
+			wantNew: settings.ConcurrencyBackendLease,
+			wantChanged: true,
+		},
+		{
+			name: "ignores unchanged backend",
+			initialBackend: settings.ConcurrencyBackendLease,
+			configData: map[string]string{
+				"concurrency-backend": settings.ConcurrencyBackendLease,
+				"tekton-dashboard-url": "https://dashboard.example.test",
+			},
+			wantOld: settings.ConcurrencyBackendLease,
+			wantNew: settings.ConcurrencyBackendLease,
+			wantChanged: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctx, _ := rtesting.SetupFakeContext(t)
+			ctx = info.StoreNS(ctx, "pac")
+
+			run := &Run{
+				Clients: clients.Clients{
+					Kube: kubefake.NewSimpleClientset(&corev1.ConfigMap{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "pac-config",
+							Namespace: "pac",
+						},
+						Data: tt.configData,
+					}),
+					Log: zap.NewNop().Sugar(),
+				},
+				Info: info.Info{
+					Pac: &info.PacOpts{
+						Settings: settings.Settings{
+							ConcurrencyBackend: tt.initialBackend,
+						},
+					},
+					Controller: &info.ControllerInfo{
+						Configmap: "pac-config",
+					},
+				},
+			}
+			run.Clients.InitClients()
+			run.Clients.SetConsoleUI(consoleui.FallBackConsole{})
+
+			oldBackend, newBackend, changed, err := updatePacConfigAndDetectBackendChange(ctx, run)
+			assert.NilError(t, err)
+			assert.Equal(t, oldBackend, tt.wantOld)
+			assert.Equal(t, newBackend, tt.wantNew)
+			assert.Equal(t, changed, tt.wantChanged)
+		})
+	}
+}

pkg/params/settings/config.go

Lines changed: 15 additions & 2 deletions
@@ -15,6 +15,8 @@ import (
 
 const (
 	PACApplicationNameDefaultValue = "Pipelines as Code CI"
+	ConcurrencyBackendMemory = "memory"
+	ConcurrencyBackendLease = "lease"
 
 	HubURLKey = "hub-url"
 	HubCatalogNameKey = "hub-catalog-name"
@@ -80,8 +82,9 @@ type Settings struct {
 	CustomConsolePRTaskLog string `json:"custom-console-url-pr-tasklog"`
 	CustomConsoleNamespaceURL string `json:"custom-console-url-namespace"`
 
-	RememberOKToTest   bool `json:"remember-ok-to-test"`
-	RequireOkToTestSHA bool `json:"require-ok-to-test-sha"`
+	RememberOKToTest   bool   `json:"remember-ok-to-test"`
+	RequireOkToTestSHA bool   `json:"require-ok-to-test-sha"`
+	ConcurrencyBackend string `default:"memory" json:"concurrency-backend"`
 }
 
 func (s *Settings) DeepCopy(out *Settings) {
@@ -110,6 +113,7 @@ func DefaultValidators() map[string]func(string) error {
 		"CustomConsoleURL": isValidURL,
 		"CustomConsolePRTaskLog": startWithHTTPorHTTPS,
 		"CustomConsolePRDetail": startWithHTTPorHTTPS,
+		"ConcurrencyBackend": isValidConcurrencyBackend,
 	}
 }
 
@@ -159,3 +163,12 @@ func startWithHTTPorHTTPS(url string) error {
 	}
 	return nil
 }
+
+func isValidConcurrencyBackend(backend string) error {
+	switch backend {
+	case "", ConcurrencyBackendMemory, ConcurrencyBackendLease:
+		return nil
+	default:
+		return fmt.Errorf("invalid concurrency backend %q, must be one of %q or %q", backend, ConcurrencyBackendMemory, ConcurrencyBackendLease)
+	}
+}
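
How the `default:"memory"` struct tag and the validator might combine when a ConfigMap is loaded can be sketched with the standard `reflect` package. The sketch assumes a loader that fills string fields by their `json` tag and falls back to the `default` tag when the key is missing or empty. `loadSettings`, `validateBackend`, and the lowercase `settings` type are hypothetical, and the project's real sync code may order or implement these steps differently.

```go
package main

import (
	"fmt"
	"reflect"
)

// settings carries the same tags as the ConcurrencyBackend field in the diff.
type settings struct {
	ConcurrencyBackend string `default:"memory" json:"concurrency-backend"`
}

// validateBackend mirrors the accepted values of isValidConcurrencyBackend.
func validateBackend(backend string) error {
	switch backend {
	case "", "memory", "lease":
		return nil
	default:
		return fmt.Errorf("invalid concurrency backend %q, must be one of %q or %q", backend, "memory", "lease")
	}
}

// loadSettings is a hypothetical loader: it reads each string field from data
// by its json tag and uses the default tag when the key is absent or empty.
func loadSettings(data map[string]string) (settings, error) {
	var s settings
	v := reflect.ValueOf(&s).Elem()
	for i := 0; i < v.NumField(); i++ {
		field := v.Type().Field(i)
		value, ok := data[field.Tag.Get("json")]
		if !ok || value == "" {
			value = field.Tag.Get("default")
		}
		v.Field(i).SetString(value)
	}
	if err := validateBackend(s.ConcurrencyBackend); err != nil {
		return settings{}, err
	}
	return s, nil
}

func main() {
	for _, data := range []map[string]string{
		{},
		{"concurrency-backend": "lease"},
		{"concurrency-backend": "sqlite"},
	} {
		s, err := loadSettings(data)
		fmt.Printf("backend=%q err=%v\n", s.ConcurrencyBackend, err)
	}
}
```

Under these assumptions an empty ConfigMap resolves to `memory`, which matches the expectation added to `TestSyncConfig` in this commit, and an unsupported value such as `sqlite` is rejected.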

pkg/params/settings/config_test.go

Lines changed: 10 additions & 0 deletions
@@ -49,6 +49,7 @@ func TestSyncConfig(t *testing.T) {
 				CustomConsolePRTaskLog: "",
 				CustomConsoleNamespaceURL: "",
 				RememberOKToTest: false,
+				ConcurrencyBackend: ConcurrencyBackendMemory,
 			},
 		},
 		{
@@ -79,6 +80,7 @@ func TestSyncConfig(t *testing.T) {
 				"remember-ok-to-test": "false",
 				"skip-push-event-for-pr-commits": "true",
 				"require-ok-to-test-sha": "true",
+				"concurrency-backend": "lease",
 			},
 			expectedStruct: Settings{
 				ApplicationName: "pac-pac",
@@ -110,6 +112,7 @@ func TestSyncConfig(t *testing.T) {
 				CustomConsoleNamespaceURL: "https://custom-console-namespace",
 				RememberOKToTest: false,
 				RequireOkToTestSHA: true,
+				ConcurrencyBackend: ConcurrencyBackendLease,
 			},
 		},
 		{
@@ -147,6 +150,13 @@ func TestSyncConfig(t *testing.T) {
 			},
 			expectedError: "custom validation failed for field CustomConsolePRTaskLog: invalid value, must start with http:// or https://",
 		},
+		{
+			name: "invalid concurrency backend",
+			configMap: map[string]string{
+				"concurrency-backend": "sqlite",
+			},
+			expectedError: `custom validation failed for field ConcurrencyBackend: invalid concurrency backend "sqlite", must be one of "memory" or "lease"`,
+		},
 	}
 
 	for _, tc := range testCases {
