Skip to content

Commit f221e2a

Browse files
committed
feat: Expose queue status via annotations for lease backend
Introduced debugging annotations for PipelineRuns using the lease concurrency backend to improve observability. Added state tracking to provide context on why a run is waiting, including details like position in the queue, running capacity, and claim age. Emitted Kubernetes events for critical queue transitions such as promotion failures, lease acquisition timeouts, and recovery re-queues. Ensured that these transient debugging annotations are automatically cleared when a PipelineRun transitions to active or finished states. Documentation was updated to include useful troubleshooting commands and an explanation of the new values.

Signed-off-by: Chmouel Boudjnah <chmouel@redhat.com>
1 parent e0fcbb5 commit f221e2a

File tree

13 files changed

+924
-42
lines changed

13 files changed

+924
-42
lines changed

docs/content/docs/advanced/concurrency.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,46 @@ data:
5555
Restart the watcher after changing `concurrency-backend`; the backend is selected at startup.
5656

5757
When `lease` mode is enabled, Pipelines-as-Code still uses the existing `queued`, `started`, and `completed` PipelineRun states. The difference is that promotion of the next queued PipelineRun is serialized with a per-repository `Lease`, which reduces queue drift during cluster/API instability.
58+
59+
## Debugging the Lease Backend
60+
61+
When `concurrency-backend: "lease"` is enabled, queued `PipelineRun`s expose queue debugging state directly in annotations:
62+
63+
- `pipelinesascode.tekton.dev/queue-decision`
64+
- `pipelinesascode.tekton.dev/queue-debug-summary`
65+
- `pipelinesascode.tekton.dev/queue-claimed-by`
66+
- `pipelinesascode.tekton.dev/queue-claimed-at`
67+
- `pipelinesascode.tekton.dev/queue-promotion-retries`
68+
- `pipelinesascode.tekton.dev/queue-promotion-last-error`
69+
70+
This makes it possible to diagnose most queue issues with `kubectl` before looking at watcher logs.
71+
72+
### Useful commands
73+
74+
```bash
75+
kubectl get pipelinerun -n <namespace> <name> -o jsonpath='{.metadata.annotations.pipelinesascode\.tekton\.dev/queue-decision}{"\n"}'
76+
kubectl get pipelinerun -n <namespace> <name> -o jsonpath='{.metadata.annotations.pipelinesascode\.tekton\.dev/queue-debug-summary}{"\n"}'
77+
kubectl describe pipelinerun -n <namespace> <name>
78+
kubectl get events -n <namespace> --field-selector involvedObject.kind=Repository
79+
```
80+
81+
### Queue decisions
82+
83+
- `waiting_for_slot`: the run is queued and waiting for repository capacity.
84+
- `claim_active`: another watcher already holds an active short-lived claim on this run.
85+
- `claimed_for_promotion`: this run has been claimed and is being promoted to `started`.
86+
- `promotion_failed`: the watcher failed while promoting the run to `started`.
87+
- `recovery_requeued`: the lease recovery loop noticed this run and enqueued it again.
88+
- `missing_execution_order`: the run is queued but its execution-order annotation does not list the run itself.
89+
- `not_recoverable`: the run is still `queued` but is no longer eligible for lease recovery.
90+
91+
### Events
92+
93+
The watcher also emits repository-scoped Kubernetes events for the most important transitions:
94+
95+
- `QueueClaimedForPromotion`
96+
- `QueuePromotionFailed`
97+
- `QueueRecoveryRequeued`
98+
- `QueueLeaseAcquireTimeout`
99+
100+
If the queue decision and events do not explain the behavior, switch the watcher to debug logging and grep for the repository key and PipelineRun key. The lease backend logs include lease acquisition attempts, active claim evaluation, queue-state snapshots, and recovery loop selections.

pkg/apis/pipelinesascode/keys/keys.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ const (
6363
SCMReportingPLRStarted = pipelinesascode.GroupName + "/scm-reporting-plr-started"
6464
QueueClaimedBy = pipelinesascode.GroupName + "/queue-claimed-by"
6565
QueueClaimedAt = pipelinesascode.GroupName + "/queue-claimed-at"
66+
QueueDecision = pipelinesascode.GroupName + "/queue-decision"
67+
QueueDebugSummary = pipelinesascode.GroupName + "/queue-debug-summary"
6668
QueuePromotionRetries = pipelinesascode.GroupName + "/queue-promotion-retries"
6769
QueuePromotionBlocked = pipelinesascode.GroupName + "/queue-promotion-blocked"
6870
QueuePromotionLastErr = pipelinesascode.GroupName + "/queue-promotion-last-error"

pkg/queue/common.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,16 @@ func IsRecoverableQueuedPipelineRun(pr *tektonv1.PipelineRun) bool {
4242
return ok
4343
}
4444

45-
func executionOrderList(pr *tektonv1.PipelineRun) []string {
45+
// ExecutionOrderList returns the PipelineRun keys stored in the execution
// order annotation, split on commas. It returns nil when the annotation is
// absent or empty.
func ExecutionOrderList(pr *tektonv1.PipelineRun) []string {
	raw := pr.GetAnnotations()[keys.ExecutionOrder]
	if len(raw) == 0 {
		return nil
	}
	return strings.Split(raw, ",")
}
5252

53-
func executionOrderIndex(pr *tektonv1.PipelineRun) (int, bool) {
54-
order := executionOrderList(pr)
53+
func ExecutionOrderIndex(pr *tektonv1.PipelineRun) (int, bool) {
54+
order := ExecutionOrderList(pr)
5555
if len(order) == 0 {
5656
return 0, false
5757
}
@@ -63,3 +63,11 @@ func executionOrderIndex(pr *tektonv1.PipelineRun) (int, bool) {
6363
}
6464
return index, true
6565
}
66+
67+
// executionOrderList is an unexported shim kept so existing in-package callers
// keep compiling after the helper was exported.
//
// Deprecated: use ExecutionOrderList instead.
func executionOrderList(pr *tektonv1.PipelineRun) []string {
	return ExecutionOrderList(pr)
}

// executionOrderIndex is an unexported shim kept so existing in-package callers
// keep compiling after the helper was exported.
//
// Deprecated: use ExecutionOrderIndex instead.
func executionOrderIndex(pr *tektonv1.PipelineRun) (int, bool) {
	return ExecutionOrderIndex(pr)
}

pkg/queue/debug_info.go

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
package queue
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"time"
8+
9+
"github.com/openshift-pipelines/pipelines-as-code/pkg/action"
10+
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/keys"
11+
"github.com/openshift-pipelines/pipelines-as-code/pkg/kubeinteraction"
12+
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/settings"
13+
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
14+
tektonclient "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
15+
"go.uber.org/zap"
16+
apierrors "k8s.io/apimachinery/pkg/api/errors"
17+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
)
19+
20+
// Queue decision values written to the queue-decision annotation to explain
// why a queued PipelineRun is (still) waiting; see the concurrency docs for
// the user-facing description of each value.
const (
	// QueueDecisionWaitingForSlot: the run is queued and waiting for repository capacity.
	QueueDecisionWaitingForSlot = "waiting_for_slot"
	// QueueDecisionClaimActive: another watcher already holds an active short-lived claim on this run.
	QueueDecisionClaimActive = "claim_active"
	// QueueDecisionClaimedForPromote: this run has been claimed and is being promoted to "started".
	QueueDecisionClaimedForPromote = "claimed_for_promotion"
	// QueueDecisionPromotionFailed: the watcher failed while promoting the run to "started".
	QueueDecisionPromotionFailed = "promotion_failed"
	// QueueDecisionRecoveryRequeued: the lease recovery loop noticed this run and enqueued it again.
	QueueDecisionRecoveryRequeued = "recovery_requeued"
	// QueueDecisionMissingOrder: the run is queued but its execution-order annotation does not list it.
	QueueDecisionMissingOrder = "missing_execution_order"
	// QueueDecisionNotRecoverable: the run is still queued but no longer eligible for lease recovery.
	QueueDecisionNotRecoverable = "not_recoverable"
)
29+
30+
// unknownQueueDebugValue is the sentinel for numeric snapshot fields whose
// value is not known; formatQueueDebugInt renders it as "n/a".
const unknownQueueDebugValue = -1
31+
32+
// DebugSnapshot is a point-in-time view of a repository's concurrency queue,
// taken while evaluating one queued PipelineRun. Summary renders it into the
// queue-debug-summary annotation value. Numeric fields use
// unknownQueueDebugValue (-1) when the value could not be determined.
type DebugSnapshot struct {
	Backend      string        // concurrency backend name; Summary defaults it to the lease backend when empty
	RepoKey      string        // repository key this queue belongs to
	Position     int           // this run's position in the queue; -1 when unknown
	Running      int           // count of running PipelineRuns; -1 when unknown
	Claimed      int           // count of runs claimed for promotion; -1 when unknown — TODO confirm exact semantics with producer
	Queued       int           // count of runs waiting in the queue; -1 when unknown
	Limit        int           // repository concurrency limit; -1 when unknown
	ClaimedBy    string        // identity holding the claim on this run, if any
	ClaimAge     time.Duration // age of the claim; negative sentinel (unknownDuration) renders as "n/a"
	LastDecision string        // most recent queue decision, one of the QueueDecision* constants
}
44+
45+
func (d DebugSnapshot) Summary() string {
46+
backend := d.Backend
47+
if backend == "" {
48+
backend = settings.ConcurrencyBackendLease
49+
}
50+
51+
return fmt.Sprintf(
52+
"backend=%s repo=%s position=%s running=%s claimed=%s queued=%s limit=%s claimedBy=%s claimAge=%s lastDecision=%s",
53+
backend,
54+
formatQueueDebugString(d.RepoKey),
55+
formatQueueDebugInt(d.Position),
56+
formatQueueDebugInt(d.Running),
57+
formatQueueDebugInt(d.Claimed),
58+
formatQueueDebugInt(d.Queued),
59+
formatQueueDebugInt(d.Limit),
60+
formatQueueDebugString(d.ClaimedBy),
61+
formatQueueDebugDuration(d.ClaimAge),
62+
formatQueueDebugString(d.LastDecision),
63+
)
64+
}
65+
66+
// SyncQueueDebugAnnotations writes the snapshot's decision and summary into
// the PipelineRun's queue debug annotations, re-reading the run first so the
// decision is made against the latest state. It is a no-op when the run is
// gone, already up to date, or no longer a queue-relevant (queued + pending)
// run — in the latter case stale debug annotations are cleared instead.
func SyncQueueDebugAnnotations(
	ctx context.Context,
	logger *zap.SugaredLogger,
	tekton tektonclient.Interface,
	pr *tektonv1.PipelineRun,
	snapshot DebugSnapshot,
) error {
	// Tolerate nil inputs so callers don't need their own guards.
	if tekton == nil || pr == nil {
		return nil
	}

	// Fetch the latest copy: the caller's object may be stale, and we must not
	// resurrect debug annotations on a run that has since progressed.
	latest, err := tekton.TektonV1().PipelineRuns(pr.GetNamespace()).Get(ctx, pr.GetName(), metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Run was deleted; nothing to annotate.
			return nil
		}
		return err
	}

	if !IsQueueOnlyAnnotationRelevant(latest) {
		if logger != nil {
			logger.Debugf(
				"skipping queue debug annotation update for pipelinerun %s because latest state=%s spec.status=%s done=%t cancelled=%t",
				PrKey(latest), latest.GetAnnotations()[keys.State], latest.Spec.Status, latest.IsDone(), latest.IsCancelled(),
			)
		}
		// The run left the queued state: remove any leftover debug annotations
		// rather than updating them.
		if hasQueueDebugAnnotations(latest) {
			return ClearQueueDebugAnnotations(ctx, logger, tekton, latest)
		}
		return nil
	}

	// Skip the patch entirely when nothing changed, to avoid no-op API writes.
	summary := snapshot.Summary()
	currentAnnotations := latest.GetAnnotations()
	if currentAnnotations[keys.QueueDecision] == snapshot.LastDecision &&
		currentAnnotations[keys.QueueDebugSummary] == summary {
		return nil
	}

	if logger != nil {
		logger.Debugf(
			"updating queue debug annotations for pipelinerun %s: decision=%s summary=%q",
			PrKey(pr), snapshot.LastDecision, summary,
		)
	}

	// Patch only the two debug annotations on the freshly fetched object.
	_, err = action.PatchPipelineRun(ctx, logger, "queue debug", tekton, latest, map[string]any{
		"metadata": map[string]any{
			"annotations": map[string]any{
				keys.QueueDecision:     snapshot.LastDecision,
				keys.QueueDebugSummary: summary,
			},
		},
	})
	return err
}
122+
123+
func ClearQueueDebugAnnotations(
124+
ctx context.Context,
125+
logger *zap.SugaredLogger,
126+
tekton tektonclient.Interface,
127+
pr *tektonv1.PipelineRun,
128+
) error {
129+
if tekton == nil || pr == nil {
130+
return nil
131+
}
132+
133+
if !hasQueueDebugAnnotations(pr) {
134+
return nil
135+
}
136+
137+
if logger != nil {
138+
logger.Debugf("clearing queue debug annotations for pipelinerun %s", PrKey(pr))
139+
}
140+
141+
_, err := action.PatchPipelineRun(ctx, logger, "queue debug cleanup", tekton, pr, map[string]any{
142+
"metadata": map[string]any{
143+
"annotations": map[string]any{
144+
keys.QueueDecision: nil,
145+
keys.QueueDebugSummary: nil,
146+
},
147+
},
148+
})
149+
return err
150+
}
151+
152+
func LeaseQueueCleanupAnnotations() map[string]any {
153+
return map[string]any{
154+
keys.QueueClaimedBy: nil,
155+
keys.QueueClaimedAt: nil,
156+
keys.QueueDecision: nil,
157+
keys.QueueDebugSummary: nil,
158+
keys.QueuePromotionRetries: nil,
159+
keys.QueuePromotionBlocked: nil,
160+
keys.QueuePromotionLastErr: nil,
161+
}
162+
}
163+
164+
func hasQueueDebugAnnotations(pr *tektonv1.PipelineRun) bool {
165+
if pr == nil {
166+
return false
167+
}
168+
annotations := pr.GetAnnotations()
169+
return annotations[keys.QueueDecision] != "" || annotations[keys.QueueDebugSummary] != ""
170+
}
171+
172+
func IsQueueOnlyAnnotationRelevant(pr *tektonv1.PipelineRun) bool {
173+
if pr == nil {
174+
return false
175+
}
176+
if pr.GetAnnotations()[keys.State] != kubeinteraction.StateQueued {
177+
return false
178+
}
179+
return pr.Spec.Status == tektonv1.PipelineRunSpecStatusPending && !pr.IsDone() && !pr.IsCancelled()
180+
}
181+
182+
func LeaseQueueClaimInfo(pr *tektonv1.PipelineRun, now time.Time) (string, time.Duration) {
183+
if pr == nil {
184+
return "", unknownDuration()
185+
}
186+
187+
annotations := pr.GetAnnotations()
188+
claimedBy := annotations[keys.QueueClaimedBy]
189+
claimedAt := annotations[keys.QueueClaimedAt]
190+
if claimedBy == "" || claimedAt == "" {
191+
return claimedBy, unknownDuration()
192+
}
193+
194+
claimedTime, err := time.Parse(time.RFC3339Nano, claimedAt)
195+
if err != nil {
196+
return claimedBy, unknownDuration()
197+
}
198+
199+
age := now.Sub(claimedTime)
200+
if age < 0 {
201+
age = 0
202+
}
203+
return claimedBy, age
204+
}
205+
206+
func formatQueueDebugInt(v int) string {
207+
if v == unknownQueueDebugValue {
208+
return "n/a"
209+
}
210+
return strconv.Itoa(v)
211+
}
212+
213+
// formatQueueDebugString renders a snapshot string, mapping the empty string
// to "n/a".
func formatQueueDebugString(v string) string {
	if len(v) == 0 {
		return "n/a"
	}
	return v
}
219+
220+
func formatQueueDebugDuration(v time.Duration) string {
221+
if v == unknownDuration() {
222+
return "n/a"
223+
}
224+
return v.Truncate(time.Second).String()
225+
}
226+
227+
func unknownDuration() time.Duration {
228+
return -1
229+
}

0 commit comments

Comments
 (0)