Commit 492c959

feat: add Kubernetes Lease backend for concurrency coordination
Introduce an opt-in concurrency backend that uses Kubernetes Lease objects and PipelineRun annotations for queue coordination. This addresses potential state drift and race conditions during watcher restarts or API delays by storing queue state in the cluster instead of only in memory.

Add a global configuration setting to choose between the legacy memory backend and the new lease-based logic. Update the queue manager interface to support context-aware operations, and ensure stale claims are automatically reclaimed via TTL-based expiration.

The release process preserves existing Lease objects instead of deleting them, so the same records are reused across acquisition cycles, reducing communication overhead with the cluster.

AI-assisted-by: Cursor (Codex)
Signed-off-by: Chmouel Boudjnah <chmouel@redhat.com>
1 parent 9fc27b5 · commit 492c959

29 files changed: +3968 −90 lines

config/302-pac-configmap.yaml

Lines changed: 7 additions & 0 deletions

@@ -173,6 +173,13 @@ data:
   # Default: true
   skip-push-event-for-pr-commits: "true"
 
+  # Selects the concurrency queue backend used by the watcher.
+  # "memory" keeps the existing in-process queue state.
+  # "lease" uses Kubernetes Leases plus PipelineRun claims to recover more safely
+  # from watcher restarts and cluster/API timing issues.
+  # Restart the watcher after changing this setting.
+  concurrency-backend: "memory"
+
   # Configure a custom console here, the driver support custom parameters from
   # Repo CR along a few other template variable, see documentation for more
   # details

docs/content/docs/advanced/concurrency.md

Lines changed: 125 additions & 28 deletions
@@ -4,36 +4,133 @@ weight: 2
 ---
 This page illustrates how Pipelines-as-Code manages concurrent PipelineRun execution. When you set a concurrency limit on a Repository CR, Pipelines-as-Code queues incoming PipelineRuns and starts them only when capacity allows.
 
+The watcher supports two queue backends controlled by the global `concurrency-backend` setting in the `pipelines-as-code` ConfigMap:
+
+- `memory` keeps queue state in the watcher process. This is the historical behavior and remains the default.
+- `lease` stores queue coordination in Kubernetes using `Lease` objects and short-lived PipelineRun claims. This mode is more resilient when the watcher restarts or the cluster is slow to reconcile updates.
+
+{{< tech_preview "Lease-backed concurrency backend" >}}
+
 ## Flow diagram
 
 ```mermaid
-graph TD
-    A1[Controller] --> B1(Validate & Process Event)
-    B1 --> C1{Is concurrency defined?}
-    C1 -->|Not Defined| D1[Create PipelineRun with state='started']
-    C1 -->|Defined| E1[Create PipelineRun with pending status and state='queued']
-
-    Z[Pipelines-as-Code]
-
-    A[Watcher] --> B(PipelineRun Reconciler)
-    B --> C{Check state}
-    C --> |completed| F(Return, nothing to do!)
-    C --> |queued| D(Create Queue for Repository)
-    C --> |started| E{Is PipelineRun Done?}
-    D --> O(Add PipelineRun in the queue)
-    O --> P{If PipelineRuns running < concurrency_limit}
-    P --> |Yes| Q(Start the top most PipelineRun in the Queue)
-    Q --> P
-    P --> |No| R[Return and wait for your turn]
-    E --> |Yes| G(Report Status to provider)
-    E --> |No| H(Requeue Request)
-    H --> B
-    G --> I(Update status in Repository)
-    I --> J(Update state to 'completed')
-    J --> K{Check if concurrency was defined?}
-    K --> |Yes| L(Remove PipelineRun from Queue)
-    L --> M(Start the next PipelineRun from Queue)
-    M --> N[Done!]
-    K --> |No| N
+flowchart TD
+    A[Webhook event] --> B[Controller resolves Repository CR]
+    B --> C{concurrency_limit set?}
+    C -->|No| D[Create PipelineRun with state=started]
+    C -->|Yes| E[Create PipelineRun with state=queued and spec.status=pending]
+
+    D --> F[Watcher reconciles started PipelineRun]
+    E --> G[Watcher reconciles queued PipelineRun]
+
+    G --> H{Queue backend}
+    H -->|memory| I[Use in-process semaphore]
+    H -->|lease| J[Acquire per-repository Lease and inspect live PipelineRuns]
+
+    I --> K{Capacity available?}
+    J --> K
+    K -->|No| L[Keep PipelineRun queued]
+    K -->|Yes| M[Claim candidate and patch state=started]
+
+    M --> F
+    F --> N{PipelineRun done?}
+    N -->|No| F
+    N -->|Yes| O[Report final status]
+    O --> P[Release slot and try next queued run]
+    P --> G
+```
+
+## Backend selection
+
+To enable the Kubernetes-backed queue coordination, set the global configuration to:
 
+```yaml
+data:
+  concurrency-backend: "lease"
 ```
+
+Restart the watcher after changing `concurrency-backend`; the backend is selected at startup.
+
+When `lease` mode is enabled, Pipelines-as-Code still uses the existing `queued`, `started`, and `completed` PipelineRun states. The difference is that promotion of the next queued PipelineRun is serialized with a per-repository `Lease`, which reduces queue drift during cluster/API instability.
+
+## How lease promotion works
+
+When the watcher reconciles a queued PipelineRun under the `lease` backend, it follows this sequence:
+
+1. Acquire the per-repository Kubernetes Lease (retrying up to 20 times with a 100 ms delay).
+2. List live PipelineRuns for that repository.
+3. Separate them into running, claimed, and claimable queued runs.
+4. Compute the available capacity: `concurrency_limit - running - claimed`.
+5. Patch one or more queued runs with short-lived claim annotations (`queue-claimed-by`, `queue-claimed-at`).
+6. Release the repository Lease.
+7. Re-fetch the claimed run and patch it to `started`.
+
+If promotion fails at step 7, the watcher records the failure on the PipelineRun, clears the claim, and another reconcile retries later.
+
+Claims expire after **30 seconds**. If a watcher crashes or stalls before completing promotion, another instance can pick up the run once the claim expires.
+
+## Recovery loop
+
+When the `lease` backend is active, the watcher starts a background recovery loop that runs every **31 seconds** (the claim TTL plus a 1 s buffer). It looks for repositories where:
+
+- there is no started PipelineRun
+- there is no queued PipelineRun with an active (unexpired) claim
+- there is still at least one recoverable queued PipelineRun
+
+A queued PipelineRun is recoverable when it has `state=queued`, `spec.status=Pending`, is not done or cancelled, and has a valid `execution-order` annotation.
+
+When a candidate is found, the recovery loop clears stale debug annotations and re-enqueues the oldest recoverable run so the normal promotion logic runs again.
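The recoverability rule above can be expressed as a small predicate (a sketch; the struct fields are hypothetical stand-ins for the PipelineRun state and annotations):

```go
package main

import "fmt"

// plr mirrors just the fields the recovery check needs (hypothetical
// struct; the real watcher inspects PipelineRun objects).
type plr struct {
	state          string // pipelinesascode state annotation
	specStatus     string // Tekton spec.status
	done           bool
	cancelled      bool
	executionOrder string // execution-order annotation
}

// recoverable applies the documented rule: state=queued, spec.status=Pending,
// not done or cancelled, and a non-empty execution-order annotation.
func recoverable(p plr) bool {
	return p.state == "queued" &&
		p.specStatus == "Pending" &&
		!p.done && !p.cancelled &&
		p.executionOrder != ""
}

func main() {
	fmt.Println(recoverable(plr{state: "queued", specStatus: "Pending", executionOrder: "ns/run-1"})) // prints true
	fmt.Println(recoverable(plr{state: "queued", specStatus: "Pending"}))                             // prints false
}
```

With this predicate, the recovery loop simply filters the repository's queued runs and re-enqueues the oldest match.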
+
+## Debugging the lease backend
+
+When `concurrency-backend: "lease"` is enabled, queued `PipelineRun`s expose queue debugging state directly in annotations:
+
+- `pipelinesascode.tekton.dev/queue-decision`
+- `pipelinesascode.tekton.dev/queue-debug-summary`
+- `pipelinesascode.tekton.dev/queue-claimed-by`
+- `pipelinesascode.tekton.dev/queue-claimed-at`
+- `pipelinesascode.tekton.dev/queue-promotion-retries`
+- `pipelinesascode.tekton.dev/queue-promotion-last-error`
+
+This makes it possible to diagnose most queue issues with `kubectl` before looking at watcher logs.
+
+### Useful commands
+
+```bash
+kubectl get pipelinerun -n <namespace> <name> -o jsonpath='{.metadata.annotations.pipelinesascode\.tekton\.dev/queue-decision}{"\n"}'
+kubectl get pipelinerun -n <namespace> <name> -o jsonpath='{.metadata.annotations.pipelinesascode\.tekton\.dev/queue-debug-summary}{"\n"}'
+kubectl describe pipelinerun -n <namespace> <name>
+kubectl get events -n <namespace> --field-selector involvedObject.kind=Repository
+```
+
+### Queue decisions
+
+- `waiting_for_slot`: the run is queued and waiting for repository capacity.
+- `claim_active`: another watcher already holds an active short-lived claim on this run.
+- `claimed_for_promotion`: this run has been claimed and is being promoted to `started`.
+- `promotion_failed`: the watcher failed while promoting the run to `started`.
+- `recovery_requeued`: the lease recovery loop noticed this run and enqueued it again.
+- `missing_execution_order`: the run is queued but its `execution-order` annotation does not include itself.
+- `not_recoverable`: the run is still `queued` but is no longer eligible for lease recovery.
+
+### Events
+
+The watcher also emits repository-scoped Kubernetes events for the most important transitions:
+
+- `QueueClaimedForPromotion`
+- `QueuePromotionFailed`
+- `QueueRecoveryRequeued`
+- `QueueLeaseAcquireTimeout`
+
+### Troubleshooting
+
+| Symptom | Queue decision | Likely cause | Action |
+| --- | --- | --- | --- |
+| Run stuck queued, nothing running | `waiting_for_slot` | Completed run was not cleaned up or a finalizer is stuck | Check whether a `started` PipelineRun still exists for the repo. If it is done but its state was not updated, delete it or patch its state to `completed`. |
+| Run stuck queued, another run is running | `waiting_for_slot` | Normal — the run is waiting for the active run to finish. | No action needed unless the running PipelineRun is itself stuck. |
+| Run keeps cycling between queued and claimed | `claim_active` | Two watcher replicas are contending for the same run. | Wait for the claim to expire (30 s). If it persists, check the watcher logs for lease acquisition errors. |
+| Run shows promotion failures | `promotion_failed` | The watcher failed to patch the run to `started` (API error, webhook, or admission rejection). | Check the `queue-promotion-last-error` and `queue-promotion-retries` annotations, then resolve the underlying API or admission error. |
+| Run was recovered but is stuck again | `recovery_requeued` | The recovery loop re-enqueued the run but promotion failed again on the next attempt. | Check for repeated `QueuePromotionFailed` events on the repository; the underlying issue (permissions, resource quota, webhook) must be fixed. |
+| Run is queued but marked not recoverable | `not_recoverable` | The run was cancelled, completed, or lost its `execution-order` annotation. | Inspect the PipelineRun — if it should still run, re-apply the `execution-order` annotation manually. |
+
+If the queue decision and events do not explain the behavior, switch the watcher to debug logging and grep for the repository key and the PipelineRun key. The lease backend logs include lease acquisition attempts, active claim evaluation, queue-state snapshots, and recovery-loop selections.

docs/content/docs/api/configmap.md

Lines changed: 15 additions & 0 deletions
@@ -345,6 +345,20 @@ skip-push-event-for-pr-commits: "true"
 
 {{< /param >}}
 
+{{< param name="concurrency-backend" type="string" default="memory" id="param-concurrency-backend" >}}
+Selects the queue coordination backend used by the watcher. Supported values:
+
+- `memory`: in-process queue tracking. This is the default and matches the historical behavior.
+- `lease`: Kubernetes-backed coordination using `Lease` objects and short-lived PipelineRun claims for improved recovery during watcher restarts or API instability. This backend is Technology Preview.
+
+Restart the watcher after changing this setting.
+
+```yaml
+concurrency-backend: "memory"
+```
+
+{{< /param >}}
+
 ## Complete Example
 
 ```yaml
@@ -381,6 +395,7 @@ data:
   remember-ok-to-test: "true"
   require-ok-to-test-sha: "false"
   skip-push-event-for-pr-commits: "true"
+  concurrency-backend: "memory"
 ```
 
 ## Updating configuration

docs/content/docs/guides/repository-crd/concurrency.md

Lines changed: 97 additions & 10 deletions
@@ -3,26 +3,113 @@ title: Concurrency
 weight: 2
 ---
 
-This page explains how to limit the number of concurrent PipelineRuns for a Repository CR and how to integrate with Kueue for Kubernetes-native job queueing. Use concurrency limits when you need to control resource consumption or prevent PipelineRuns from overwhelming your cluster.
+Use `spec.concurrency_limit` on a Repository CR to cap how many `PipelineRun`s may run at once for that repository.
+This is useful when you need to control cluster usage, preserve ordering for related runs, or avoid a burst of webhook events starting too many `PipelineRun`s at once.
 
-Set the `concurrency_limit` field to define the maximum number of PipelineRuns running at any time for a Repository CR. This prevents resource exhaustion and ensures predictable scheduling when multiple events arrive in rapid succession.
+## Repository setting
+
+Set the `concurrency_limit` field on the Repository CR:
 
 ```yaml
 spec:
   concurrency_limit: <number>
 ```
 
-When multiple PipelineRuns match the event, Pipelines-as-Code starts them in alphabetical order by PipelineRun name.
+When a webhook event produces multiple `PipelineRun`s for the same repository:
+
+- the controller creates them with an `execution-order` annotation
+- runs that cannot start immediately are created as `state=queued` with Tekton `spec.status=pending`
+- the watcher promotes queued runs to `state=started` only when repository capacity is available
+
+If `concurrency_limit: 1`, only one run for that repository is active at a time and the rest stay queued until the watcher promotes them.
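For example, a Repository CR capping a repository at two simultaneous runs could look like this (illustrative name, namespace, and URL):

```yaml
apiVersion: "pipelinesascode.tekton.dev/v1alpha1"
kind: Repository
metadata:
  name: my-repo
  namespace: my-pipelines
spec:
  url: "https://github.com/my-org/my-repo"
  concurrency_limit: 2
```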
+
+## End-to-end flow
+
+1. The controller decides whether the repository is concurrency-limited.
+2. If there is no limit, it creates `PipelineRun`s directly in `started`.
+3. If there is a limit, it creates `PipelineRun`s in `queued` and records `execution-order`.
+4. The watcher reconciles every `PipelineRun` that has a Pipelines-as-Code state annotation.
+5. For queued runs, the watcher asks the selected queue backend whether a slot is available.
+6. If a run is selected, the watcher patches it to `started`.
+7. When a started run finishes, the watcher reports status and asks the backend for the next queued candidate.
+
+## Queue flow diagram
+
+```mermaid
+flowchart TD
+    A[Webhook event] --> B[Controller resolves Repository CR]
+    B --> C{concurrency_limit set?}
+    C -->|No| D[Create PipelineRun with state=started]
+    C -->|Yes| E[Create PipelineRun with state=queued and spec.status=pending]
+
+    D --> F[Watcher reconciles started PipelineRun]
+    E --> G[Watcher reconciles queued PipelineRun]
+
+    G --> H{Queue backend}
+    H -->|memory| I[Use in-process semaphore]
+    H -->|lease| J[Acquire per-repository Lease and inspect live PipelineRuns]
+
+    I --> K{Capacity available?}
+    J --> K
+    K -->|No| L[Keep PipelineRun queued]
+    K -->|Yes| M[Claim candidate and patch state=started]
+
+    M --> F
+    F --> N{PipelineRun done?}
+    N -->|No| F
+    N -->|Yes| O[Report final status]
+    O --> P[Release slot and try next queued run]
+    P --> G
+```
+
+## Backend behavior
+
+The watcher supports two queue backends controlled by the global `concurrency-backend` setting in the `pipelines-as-code` ConfigMap.
+
+### `memory` backend
+
+This is the default.
 
-Example:
+- Each repository gets an in-memory semaphore in the watcher process.
+- The watcher keeps separate running and pending queues.
+- Startup rebuilds queue state from existing `started` and `queued` `PipelineRun`s.
+- Coordination is local to that watcher process.
+
+This backend is simple and fast, but it depends on watcher-local state remaining in sync with the cluster view.
+
+### `lease` backend
+
+{{< tech_preview "Lease-backed concurrency backend" >}}
+
+- Each repository uses a Kubernetes `Lease` as a short critical section.
+- The watcher recomputes queue state from live `PipelineRun`s rather than trusting only process memory.
+- A queued run is considered temporarily reserved when it carries short-lived claim annotations (`queue-claimed-by` and `queue-claimed-at`). If the watcher crashes or stalls, another instance can recover the run after the claim expires.
+- The watcher sorts candidates using the recorded `execution-order`, then falls back to creation time.
+- A background recovery loop re-enqueues the oldest recoverable queued run when a repository has no active started run and no active claim.
+
+This backend is designed for environments where the watcher may restart, the API server is slow, or promotion to `started` can fail transiently.
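The candidate ordering (recorded `execution-order` first, creation time as the tie-breaker) can be sketched as follows. This is a hedged illustration, not the project's code: it assumes the `execution-order` annotation is a comma-separated list of run names, and all type and function names are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"time"
)

// candidate carries the two ordering keys used for promotion.
type candidate struct {
	name      string
	order     string // comma-separated execution-order annotation (assumed format)
	createdAt time.Time
}

// rank returns the position of name in the execution-order list, or max int
// when the annotation does not mention it (forcing the creation-time fallback).
func rank(order, name string) int {
	for i, n := range strings.Split(order, ",") {
		if n == name {
			return i
		}
	}
	return int(^uint(0) >> 1)
}

// sortCandidates orders queued runs by execution-order first, then by
// creation timestamp.
func sortCandidates(cs []candidate) {
	sort.SliceStable(cs, func(i, j int) bool {
		ri, rj := rank(cs[i].order, cs[i].name), rank(cs[j].order, cs[j].name)
		if ri != rj {
			return ri < rj
		}
		return cs[i].createdAt.Before(cs[j].createdAt)
	})
}

func main() {
	t0 := time.Now()
	cs := []candidate{
		{name: "b", order: "a,b", createdAt: t0},
		{name: "c", order: "", createdAt: t0.Add(-time.Minute)}, // no order: falls back
		{name: "a", order: "a,b", createdAt: t0},
	}
	sortCandidates(cs)
	for _, c := range cs {
		fmt.Println(c.name) // prints a, b, c
	}
}
```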
+
+For debugging annotations, queue decisions, events, and the full promotion flow, see [Advanced Concurrency]({{< relref "/docs/advanced/concurrency" >}}).
+
+## Choosing the backend
+
+Select the global backend in the Pipelines-as-Code ConfigMap:
+
+```yaml
+data:
+  concurrency-backend: "memory"
+```
+
+or:
+
+```yaml
+data:
+  concurrency-backend: "lease"
+```
 
-If you have three PipelineRuns in your `.tekton/` directory and you create a pull
-request with a `concurrency_limit` of 1 in the repository configuration,
-Pipelines-as-Code executes all PipelineRuns in alphabetical order, one after the
-other. At any given time, only one PipelineRun is in the running state,
-while the rest are queued.
+Changing this setting requires restarting the watcher so it can recreate the queue manager with the new backend.
 
-For additional concurrency strategies and global configuration options, see [Advanced Concurrency]({{< relref "/docs/advanced/concurrency" >}}).
+For the global `concurrency-backend` setting itself, see [ConfigMap Reference]({{< relref "/docs/api/configmap" >}}).
 
 ## Kueue - Kubernetes-native Job Queueing
 
hack/gh-workflow-ci.sh

Lines changed: 3 additions & 0 deletions

@@ -21,6 +21,9 @@ create_pac_github_app_secret() {
   kubectl patch configmap -n pipelines-as-code -p "{\"data\":{\"bitbucket-cloud-check-source-ip\": \"false\"}}" \
     --type merge pipelines-as-code
 
+  # Enable lease backend concurrency
+  kubectl patch configmap -n pipelines-as-code pipelines-as-code -p '{"data":{"concurrency-backend":"lease"}}'
+
   # restart controller
   kubectl -n pipelines-as-code delete pod -l app.kubernetes.io/name=controller
2629

pkg/apis/pipelinesascode/keys/keys.go

Lines changed: 7 additions & 0 deletions

@@ -61,6 +61,13 @@ const (
 	LogURL                 = pipelinesascode.GroupName + "/log-url"
 	ExecutionOrder         = pipelinesascode.GroupName + "/execution-order"
 	SCMReportingPLRStarted = pipelinesascode.GroupName + "/scm-reporting-plr-started"
+	QueueClaimedBy         = pipelinesascode.GroupName + "/queue-claimed-by"
+	QueueClaimedAt         = pipelinesascode.GroupName + "/queue-claimed-at"
+	QueueDecision          = pipelinesascode.GroupName + "/queue-decision"
+	QueueDebugSummary      = pipelinesascode.GroupName + "/queue-debug-summary"
+	QueuePromotionRetries  = pipelinesascode.GroupName + "/queue-promotion-retries"
+	QueuePromotionBlocked  = pipelinesascode.GroupName + "/queue-promotion-blocked"
+	QueuePromotionLastErr  = pipelinesascode.GroupName + "/queue-promotion-last-error"
 	SecretCreated          = pipelinesascode.GroupName + "/secret-created"
 	CloneURL               = pipelinesascode.GroupName + "/clone-url"
 	// PublicGithubAPIURL default is "https://api.github.com" but it can be overridden by X-GitHub-Enterprise-Host header.

pkg/params/config_sync.go

Lines changed: 27 additions & 1 deletion

@@ -13,6 +13,10 @@ import (
 	"knative.dev/pkg/system"
 )
 
+var terminateProcessForConfigChange = func() {
+	_ = syscall.Kill(os.Getpid(), syscall.SIGTERM)
+}
+
 func StartConfigSync(ctx context.Context, run *Run) {
 	// init pac config
 	_ = run.UpdatePacConfig(ctx)
@@ -28,7 +32,19 @@ func StartConfigSync(ctx context.Context, run *Run) {
 			// nothing to do
 		},
 		UpdateFunc: func(_, _ any) {
-			_ = run.UpdatePacConfig(ctx)
+			oldBackend, newBackend, changed, err := updatePacConfigAndDetectBackendChange(ctx, run)
+			if err != nil {
+				return
+			}
+			if changed {
+				if run.Clients.Log != nil {
+					run.Clients.Log.Infof(
+						"concurrency-backend changed from %q to %q; restarting process so the queue backend is recreated",
+						oldBackend, newBackend,
+					)
+				}
+				terminateProcessForConfigChange()
+			}
 		},
 		DeleteFunc: func(_ any) {
 			// nothing to do
@@ -46,3 +62,13 @@ func StartConfigSync(ctx context.Context, run *Run) {
 	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
 	<-sig
 }
+
+func updatePacConfigAndDetectBackendChange(ctx context.Context, run *Run) (string, string, bool, error) {
+	oldBackend := run.Info.GetPacOpts().ConcurrencyBackend
+	if err := run.UpdatePacConfig(ctx); err != nil {
+		return oldBackend, oldBackend, false, err
+	}
+
+	newBackend := run.Info.GetPacOpts().ConcurrencyBackend
+	return oldBackend, newBackend, oldBackend != "" && newBackend != "" && oldBackend != newBackend, nil
+}
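The restart condition at the end of `updatePacConfigAndDetectBackendChange` can be isolated as a pure function. Note that an empty old or new value never triggers a restart, which protects the initial configuration sync (a sketch derived from the diff above, not code from the repository):

```go
package main

import "fmt"

// backendChanged reproduces the decision in the diff: a restart is triggered
// only when both values are non-empty and differ, so first-time
// initialization (empty old value) never kills the process.
func backendChanged(oldBackend, newBackend string) bool {
	return oldBackend != "" && newBackend != "" && oldBackend != newBackend
}

func main() {
	fmt.Println(backendChanged("memory", "lease")) // prints true: restart
	fmt.Println(backendChanged("", "lease"))       // prints false: initial sync
	fmt.Println(backendChanged("lease", "lease"))  // prints false: unchanged
}
```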
