Skip to content

Commit d7f278d

Browse files
committed
chore: add detailed debug logging for lease manager and reconciler
Enhanced observability across the concurrency lease manager and pipeline reconciler by adding debug-level logs. These logs track key operations such as lease acquisition, queue claims, promotion attempts, and reconciliation states to simplify troubleshooting of queue behavior.

Signed-off-by: Chmouel Boudjnah <chmouel@redhat.com>
1 parent 8c74257 commit d7f278d

File tree

4 files changed

+113
-3
lines changed

4 files changed

+113
-3
lines changed

pkg/queue/lease_manager.go

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@ func NewManagerForBackend(
5353
leaseNamespace, backend string,
5454
) ManagerInterface {
5555
if backend == settings.ConcurrencyBackendLease {
56+
logger.Debugf("initializing lease-backed concurrency manager in namespace %s", leaseNamespace)
5657
return NewLeaseManager(logger, kube, tekton, leaseNamespace)
5758
}
59+
logger.Debugf("initializing in-memory concurrency manager for backend %q", backend)
5860
return NewManager(logger)
5961
}
6062

@@ -93,6 +95,7 @@ func (m *LeaseManager) RemoveRepository(repo *v1alpha1.Repository) {
9395
releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
9496
defer cancel()
9597

98+
m.logger.Debugf("deleting concurrency lease for repository %s", RepoKey(repo))
9699
if err := m.kube.CoordinationV1().Leases(m.leaseNamespace).Delete(releaseCtx, repoLeaseName(RepoKey(repo)), metav1.DeleteOptions{}); err != nil &&
97100
!apierrors.IsNotFound(err) {
98101
m.logger.Warnf("failed to delete queue lease for repository %s: %v", RepoKey(repo), err)
@@ -138,8 +141,13 @@ func (m *LeaseManager) RunningPipelineRuns(repo *v1alpha1.Repository) []string {
138141

139142
func (m *LeaseManager) AddListToRunningQueue(ctx context.Context, repo *v1alpha1.Repository, orderedList []string) ([]string, error) {
140143
if repo.Spec.ConcurrencyLimit == nil || *repo.Spec.ConcurrencyLimit == 0 {
144+
m.logger.Debugf("skipping lease queue claim for repository %s because concurrency limit is disabled", RepoKey(repo))
141145
return []string{}, nil
142146
}
147+
m.logger.Debugf(
148+
"attempting to claim queued pipelineruns for repository %s with concurrency limit %d and preferred order %v",
149+
RepoKey(repo), *repo.Spec.ConcurrencyLimit, orderedList,
150+
)
143151
return m.claimNextQueued(ctx, repo, orderedList, "")
144152
}
145153

@@ -148,6 +156,7 @@ func (m *LeaseManager) AddToPendingQueue(*v1alpha1.Repository, []string) error {
148156
}
149157

150158
func (m *LeaseManager) RemoveFromQueue(ctx context.Context, repo *v1alpha1.Repository, prKey string) bool {
159+
m.logger.Debugf("clearing queue claim for pipelinerun %s in repository %s", prKey, RepoKey(repo))
151160
if err := m.clearClaim(ctx, repo, prKey); err != nil {
152161
m.logger.Warnf("failed to clear queue claim for %s: %v", prKey, err)
153162
return false
@@ -157,18 +166,25 @@ func (m *LeaseManager) RemoveFromQueue(ctx context.Context, repo *v1alpha1.Repos
157166

158167
func (m *LeaseManager) RemoveAndTakeItemFromQueue(ctx context.Context, repo *v1alpha1.Repository, run *tektonv1.PipelineRun) string {
159168
if repo.Spec.ConcurrencyLimit == nil || *repo.Spec.ConcurrencyLimit == 0 {
169+
m.logger.Debugf("not attempting queue handoff for repository %s because concurrency limit is disabled", RepoKey(repo))
160170
return ""
161171
}
162172

163173
orderedList := executionOrderList(run)
174+
m.logger.Debugf(
175+
"removing pipelinerun %s from repository %s and attempting to claim next queued item from order %v",
176+
PrKey(run), RepoKey(repo), orderedList,
177+
)
164178
claimed, err := m.claimNextQueued(ctx, repo, orderedList, PrKey(run))
165179
if err != nil {
166180
m.logger.Warnf("failed to claim next queued pipelinerun for repository %s: %v", RepoKey(repo), err)
167181
return ""
168182
}
169183
if len(claimed) == 0 {
184+
m.logger.Debugf("no queued pipelinerun available to claim for repository %s after removing %s", RepoKey(repo), PrKey(run))
170185
return ""
171186
}
187+
m.logger.Debugf("claimed next queued pipelinerun %s for repository %s", claimed[0], RepoKey(repo))
172188
return claimed[0]
173189
}
174190

@@ -194,27 +210,42 @@ func (m *LeaseManager) claimNextQueued(ctx context.Context, repo *v1alpha1.Repos
194210

195211
occupied := len(state.running) + len(state.claimed)
196212
available := *repo.Spec.ConcurrencyLimit - occupied
213+
m.logger.Debugf(
214+
"lease queue state for repository %s: running=%d claimed=%d queued=%d occupied=%d available=%d exclude=%q preferred=%v",
215+
RepoKey(repo), len(state.running), len(state.claimed), len(state.queued), occupied, available, excludeKey, preferredOrder,
216+
)
197217
if available <= 0 {
218+
m.logger.Debugf("repository %s has no available concurrency slots", RepoKey(repo))
198219
return nil
199220
}
200221

201222
for _, pr := range state.queued {
202223
if available == 0 {
203224
break
204225
}
226+
m.logger.Debugf("attempting to claim queued pipelinerun %s for repository %s", PrKey(&pr), RepoKey(repo))
205227
ok, err := m.claimPipelineRun(lockCtx, &pr)
206228
if err != nil {
207229
return err
208230
}
209231
if ok {
210232
claimed = append(claimed, PrKey(&pr))
211233
available--
234+
m.logger.Debugf(
235+
"claimed queued pipelinerun %s for repository %s; remaining available slots %d",
236+
PrKey(&pr), RepoKey(repo), available,
237+
)
238+
continue
212239
}
240+
m.logger.Debugf("pipelinerun %s could not be claimed for repository %s", PrKey(&pr), RepoKey(repo))
213241
}
214242

215243
return nil
216244
})
217245

246+
if err == nil {
247+
m.logger.Debugf("finished lease queue claim for repository %s; claimed=%v", RepoKey(repo), claimed)
248+
}
218249
return claimed, err
219250
}
220251

@@ -264,6 +295,10 @@ func (m *LeaseManager) getRepoQueueState(ctx context.Context, repo *v1alpha1.Rep
264295
}
265296
case kubeinteraction.StateQueued:
266297
if !IsRecoverableQueuedPipelineRun(&pr) {
298+
m.logger.Debugf(
299+
"skipping queued pipelinerun %s for repository %s because it is not recoverable",
300+
PrKey(&pr), RepoKey(repo),
301+
)
267302
continue
268303
}
269304
position, ok := executionOrderIndex(&pr)
@@ -276,8 +311,10 @@ func (m *LeaseManager) getRepoQueueState(ctx context.Context, repo *v1alpha1.Rep
276311
position: position,
277312
}
278313
if m.hasActiveClaim(&pr, now) {
314+
m.logger.Debugf("queued pipelinerun %s for repository %s already has an active claim", PrKey(&pr), RepoKey(repo))
279315
state.claimed = append(state.claimed, pr)
280316
} else {
317+
m.logger.Debugf("queued pipelinerun %s for repository %s is available for claiming", PrKey(&pr), RepoKey(repo))
281318
state.queued = append(state.queued, pr)
282319
}
283320
}
@@ -293,6 +330,10 @@ func (m *LeaseManager) getRepoQueueState(ctx context.Context, repo *v1alpha1.Rep
293330
return compareQueueCandidates(&state.queued[i], &state.queued[j], preferredIndex, orderMeta)
294331
})
295332

333+
m.logger.Debugf(
334+
"computed lease queue state for repository %s with running=%v claimed=%v queued=%v",
335+
RepoKey(repo), pipelineRunKeys(state.running), pipelineRunKeys(state.claimed), pipelineRunKeys(state.queued),
336+
)
296337
return state, nil
297338
}
298339

@@ -347,28 +388,39 @@ func (m *LeaseManager) hasActiveClaim(pr *tektonv1.PipelineRun, now time.Time) b
347388

348389
claimedTime, err := time.Parse(time.RFC3339Nano, claimedAt)
349390
if err != nil {
391+
m.logger.Debugf("ignoring invalid queue claim timestamp %q on pipelinerun %s: %v", claimedAt, PrKey(pr), err)
350392
return false
351393
}
352394

353-
return now.Sub(claimedTime) <= m.claimTTL
395+
age := now.Sub(claimedTime)
396+
active := age <= m.claimTTL
397+
m.logger.Debugf(
398+
"evaluated queue claim for pipelinerun %s: claimedBy=%s age=%s ttl=%s active=%t",
399+
PrKey(pr), claimedBy, age, m.claimTTL, active,
400+
)
401+
return active
354402
}
355403

356404
func (m *LeaseManager) claimPipelineRun(ctx context.Context, pr *tektonv1.PipelineRun) (bool, error) {
405+
claimedAt := m.now().UTC().Format(time.RFC3339Nano)
406+
m.logger.Debugf("patching pipelinerun %s with queue claim owned by %s at %s", PrKey(pr), m.identity, claimedAt)
357407
mergePatch := map[string]any{
358408
"metadata": map[string]any{
359409
"annotations": map[string]any{
360410
keys.QueueClaimedBy: m.identity,
361-
keys.QueueClaimedAt: m.now().UTC().Format(time.RFC3339Nano),
411+
keys.QueueClaimedAt: claimedAt,
362412
},
363413
},
364414
}
365415

366416
if _, err := action.PatchPipelineRun(ctx, m.logger, "queue claim", m.tekton, pr, mergePatch); err != nil {
367417
if apierrors.IsNotFound(err) {
418+
m.logger.Debugf("pipelinerun %s disappeared before queue claim could be recorded", PrKey(pr))
368419
return false, nil
369420
}
370421
return false, err
371422
}
423+
m.logger.Debugf("successfully recorded queue claim for pipelinerun %s", PrKey(pr))
372424
return true, nil
373425
}
374426

@@ -381,15 +433,21 @@ func (m *LeaseManager) clearClaim(ctx context.Context, repo *v1alpha1.Repository
381433
pr, err := m.tekton.TektonV1().PipelineRuns(nameParts[0]).Get(ctx, nameParts[1], metav1.GetOptions{})
382434
if err != nil {
383435
if apierrors.IsNotFound(err) {
436+
m.logger.Debugf("pipelinerun %s was already deleted before queue claim cleanup", prKey)
384437
return nil
385438
}
386439
return err
387440
}
388441

389442
if pr.GetAnnotations()[keys.Repository] != repo.Name {
443+
m.logger.Debugf(
444+
"skipping queue claim cleanup for pipelinerun %s because it belongs to repository %s instead of %s",
445+
prKey, pr.GetAnnotations()[keys.Repository], repo.Name,
446+
)
390447
return nil
391448
}
392449

450+
m.logger.Debugf("removing queue claim annotations from pipelinerun %s", prKey)
393451
mergePatch := map[string]any{
394452
"metadata": map[string]any{
395453
"annotations": map[string]any{
@@ -403,21 +461,28 @@ func (m *LeaseManager) clearClaim(ctx context.Context, repo *v1alpha1.Repository
403461
if err != nil && !apierrors.IsNotFound(err) {
404462
return err
405463
}
464+
m.logger.Debugf("queue claim cleanup completed for pipelinerun %s", prKey)
406465
return nil
407466
}
408467

409468
func (m *LeaseManager) withRepoLease(ctx context.Context, repo *v1alpha1.Repository, fn func(context.Context) error) error {
410469
leaseName := repoLeaseName(RepoKey(repo))
411470

412471
for attempt := 0; attempt < defaultLeaseAcquireRetries; attempt++ {
472+
m.logger.Debugf(
473+
"attempting to acquire concurrency lease %s for repository %s (attempt %d/%d)",
474+
leaseName, RepoKey(repo), attempt+1, defaultLeaseAcquireRetries,
475+
)
413476
acquired, err := m.tryAcquireLease(ctx, leaseName)
414477
if err != nil {
415478
return err
416479
}
417480
if acquired {
481+
m.logger.Debugf("acquired concurrency lease %s for repository %s", leaseName, RepoKey(repo))
418482
defer m.releaseLease(leaseName)
419483
return fn(ctx)
420484
}
485+
m.logger.Debugf("concurrency lease %s for repository %s is currently held by another watcher", leaseName, RepoKey(repo))
421486

422487
select {
423488
case <-ctx.Done():
@@ -435,6 +500,7 @@ func (m *LeaseManager) tryAcquireLease(ctx context.Context, leaseName string) (b
435500

436501
lease, err := leases.Get(ctx, leaseName, metav1.GetOptions{})
437502
if apierrors.IsNotFound(err) {
503+
m.logger.Debugf("creating new concurrency lease %s for identity %s", leaseName, m.identity)
438504
_, err = leases.Create(ctx, &coordinationv1.Lease{
439505
ObjectMeta: metav1.ObjectMeta{
440506
Name: leaseName,
@@ -448,9 +514,11 @@ func (m *LeaseManager) tryAcquireLease(ctx context.Context, leaseName string) (b
448514
},
449515
}, metav1.CreateOptions{})
450516
if err == nil {
517+
m.logger.Debugf("created and acquired concurrency lease %s", leaseName)
451518
return true, nil
452519
}
453520
if apierrors.IsAlreadyExists(err) {
521+
m.logger.Debugf("concurrency lease %s was created by another watcher before acquisition completed", leaseName)
454522
return false, nil
455523
}
456524
return false, err
@@ -460,6 +528,11 @@ func (m *LeaseManager) tryAcquireLease(ctx context.Context, leaseName string) (b
460528
}
461529

462530
if !m.canTakeLease(lease, now.Time) {
531+
holder := ""
532+
if lease.Spec.HolderIdentity != nil {
533+
holder = *lease.Spec.HolderIdentity
534+
}
535+
m.logger.Debugf("cannot acquire concurrency lease %s because it is still held by %s", leaseName, holder)
463536
return false, nil
464537
}
465538

@@ -473,11 +546,13 @@ func (m *LeaseManager) tryAcquireLease(ctx context.Context, leaseName string) (b
473546

474547
if _, err := leases.Update(ctx, updated, metav1.UpdateOptions{}); err != nil {
475548
if apierrors.IsConflict(err) {
549+
m.logger.Debugf("conflict while updating concurrency lease %s; another watcher won the race", leaseName)
476550
return false, nil
477551
}
478552
return false, err
479553
}
480554

555+
m.logger.Debugf("updated concurrency lease %s to holder %s", leaseName, m.identity)
481556
return true, nil
482557
}
483558

@@ -549,3 +624,11 @@ func repoLeaseName(repoKey string) string {
549624
sum := sha256.Sum256([]byte(repoKey))
550625
return fmt.Sprintf("pac-concurrency-%s", hex.EncodeToString(sum[:8]))
551626
}
627+
628+
func pipelineRunKeys(prs []tektonv1.PipelineRun) []string {
629+
keys := make([]string, 0, len(prs))
630+
for i := range prs {
631+
keys = append(keys, PrKey(&prs[i]))
632+
}
633+
return keys
634+
}

pkg/reconciler/controller.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,10 @@ func runLeaseQueueRecovery(logger *zap.SugaredLogger, impl *controller.Impl, lis
124124
logger.Warnf("failed to list queued PipelineRuns for lease recovery: %v", err)
125125
return
126126
}
127+
logger.Debugf("lease queue recovery selected %d repository candidate(s): %v", len(recoveryKeys), recoveryKeys)
127128

128129
for _, key := range recoveryKeys {
130+
logger.Debugf("enqueuing queued pipelinerun %s for lease recovery", key.String())
129131
impl.EnqueueKey(key)
130132
}
131133
}

pkg/reconciler/queue_pipelineruns.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func (r *Reconciler) queuePipelineRun(ctx context.Context, logger *zap.SugaredLo
5757
// if concurrency was set and later removed or changed to zero
5858
// then remove pipelineRun from Queue and update pending state to running
5959
if repo.Spec.ConcurrencyLimit != nil && *repo.Spec.ConcurrencyLimit == 0 {
60+
logger.Debugf("concurrency disabled for repository %s; promoting queued pipelinerun %s immediately", repo.GetName(), pr.GetName())
6061
_ = r.qm.RemoveAndTakeItemFromQueue(ctx, repo, pr)
6162
if err := r.updatePipelineRunToInProgress(ctx, logger, repo, pr); err != nil {
6263
return fmt.Errorf("failed to update PipelineRun to in_progress: %w", err)
@@ -69,21 +70,29 @@ func (r *Reconciler) queuePipelineRun(ctx context.Context, logger *zap.SugaredLo
6970
maxIterations := 5
7071

7172
orderedList := queuepkg.FilterPipelineRunByState(ctx, r.run.Clients.Tekton, strings.Split(order, ","), tektonv1.PipelineRunSpecStatusPending, kubeinteraction.StateQueued)
73+
logger.Debugf(
74+
"processing queued pipelinerun %s/%s for repository %s with concurrency limit %v and ordered candidates %v",
75+
pr.Namespace, pr.Name, repo.GetName(), repo.Spec.ConcurrencyLimit, orderedList,
76+
)
7277
for {
78+
logger.Debugf("attempting queue acquisition loop %d for repository %s and pipelinerun %s/%s", itered+1, repo.GetName(), pr.Namespace, pr.Name)
7379
acquired, err := r.qm.AddListToRunningQueue(ctx, repo, orderedList)
7480
if err != nil {
7581
return fmt.Errorf("failed to add to queue: %s: %w", pr.GetName(), err)
7682
}
83+
logger.Debugf("queue acquisition for repository %s returned candidates %v", repo.GetName(), acquired)
7784
if len(acquired) == 0 {
7885
logger.Infof("no new PipelineRun acquired for repo %s", repo.GetName())
7986
break
8087
}
8188

8289
for _, prKeys := range acquired {
90+
logger.Debugf("attempting to promote queued pipelinerun %s for repository %s", prKeys, repo.GetName())
8391
nsName := strings.Split(prKeys, "/")
8492
pr, err = r.run.Clients.Tekton.TektonV1().PipelineRuns(nsName[0]).Get(ctx, nsName[1], metav1.GetOptions{})
8593
if err != nil {
8694
logger.Info("failed to get pr with namespace and name: ", nsName[0], nsName[1])
95+
logger.Debugf("clearing queue claim for missing pipelinerun %s after failed fetch", prKeys)
8796
_ = r.qm.RemoveFromQueue(ctx, repo, prKeys)
8897
} else {
8998
if err := r.updatePipelineRunToInProgress(ctx, logger, repo, pr); err != nil {
@@ -97,13 +106,16 @@ func (r *Reconciler) queuePipelineRun(ctx context.Context, logger *zap.SugaredLo
97106
continue
98107
}
99108
logger.Errorf("failed to update pipelineRun to in_progress: %w", err)
109+
logger.Debugf("recording queue promotion failure for pipelinerun %s after promotion error", queuepkg.PrKey(pr))
100110
retryErr := r.recordQueuePromotionFailure(ctx, logger, pr, err)
101111
if retryErr != nil {
102112
return fmt.Errorf("failed to record queue promotion failure for %s after promotion error: %w", pr.GetName(), retryErr)
103113
}
114+
logger.Debugf("removing queue claim for pipelinerun %s after promotion failure", prKeys)
104115
_ = r.qm.RemoveFromQueue(ctx, repo, prKeys)
105116
return fmt.Errorf("failed to update pipelineRun to in_progress: %w", err)
106117
}
118+
logger.Debugf("successfully promoted queued pipelinerun %s for repository %s", queuepkg.PrKey(pr), repo.GetName())
107119
processed = true
108120
}
109121
}
@@ -123,7 +135,10 @@ func (r *Reconciler) pipelineRunReachedStartedState(ctx context.Context, pr *tek
123135
if err != nil {
124136
return false, err
125137
}
126-
return latest.GetAnnotations()[keys.State] == kubeinteraction.StateStarted, nil
138+
started := latest.GetAnnotations()[keys.State] == kubeinteraction.StateStarted
139+
r.run.Clients.Log.Debugf("checked latest state for pipelinerun %s/%s while handling queue promotion: state=%s started=%t",
140+
pr.Namespace, pr.Name, latest.GetAnnotations()[keys.State], started)
141+
return started, nil
127142
}
128143

129144
func (r *Reconciler) recordQueuePromotionFailure(ctx context.Context, logger *zap.SugaredLogger, pr *tektonv1.PipelineRun, cause error) error {
@@ -135,6 +150,10 @@ func (r *Reconciler) recordQueuePromotionFailure(ctx context.Context, logger *za
135150
}
136151
}
137152
retries++
153+
logger.Debugf(
154+
"recording queue promotion failure for pipelinerun %s/%s with retry=%d cause=%v",
155+
pr.Namespace, pr.Name, retries, cause,
156+
)
138157

139158
annotations := map[string]any{
140159
keys.QueuePromotionRetries: strconv.Itoa(retries),
@@ -149,5 +168,6 @@ func (r *Reconciler) recordQueuePromotionFailure(ctx context.Context, logger *za
149168
if err != nil {
150169
return err
151170
}
171+
logger.Debugf("recorded queue promotion failure annotations for pipelinerun %s/%s", pr.Namespace, pr.Name)
152172
return nil
153173
}

0 commit comments

Comments
 (0)