|
| 1 | +package queue |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "fmt" |
| 6 | + "strconv" |
| 7 | + "time" |
| 8 | + |
| 9 | + "github.com/openshift-pipelines/pipelines-as-code/pkg/action" |
| 10 | + "github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/keys" |
| 11 | + "github.com/openshift-pipelines/pipelines-as-code/pkg/kubeinteraction" |
| 12 | + "github.com/openshift-pipelines/pipelines-as-code/pkg/params/settings" |
| 13 | + tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" |
| 14 | + tektonclient "github.com/tektoncd/pipeline/pkg/client/clientset/versioned" |
| 15 | + "go.uber.org/zap" |
| 16 | + apierrors "k8s.io/apimachinery/pkg/api/errors" |
| 17 | + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| 18 | +) |
| 19 | + |
| 20 | +const ( |
| 21 | + QueueDecisionWaitingForSlot = "waiting_for_slot" |
| 22 | + QueueDecisionClaimActive = "claim_active" |
| 23 | + QueueDecisionClaimedForPromote = "claimed_for_promotion" |
| 24 | + QueueDecisionPromotionFailed = "promotion_failed" |
| 25 | + QueueDecisionRecoveryRequeued = "recovery_requeued" |
| 26 | + QueueDecisionMissingOrder = "missing_execution_order" |
| 27 | + QueueDecisionNotRecoverable = "not_recoverable" |
| 28 | +) |
| 29 | + |
| 30 | +const unknownQueueDebugValue = -1 |
| 31 | + |
| 32 | +type DebugSnapshot struct { |
| 33 | + Backend string |
| 34 | + RepoKey string |
| 35 | + Position int |
| 36 | + Running int |
| 37 | + Claimed int |
| 38 | + Queued int |
| 39 | + Limit int |
| 40 | + ClaimedBy string |
| 41 | + ClaimAge time.Duration |
| 42 | + LastDecision string |
| 43 | +} |
| 44 | + |
| 45 | +func (d DebugSnapshot) Summary() string { |
| 46 | + backend := d.Backend |
| 47 | + if backend == "" { |
| 48 | + backend = settings.ConcurrencyBackendLease |
| 49 | + } |
| 50 | + |
| 51 | + return fmt.Sprintf( |
| 52 | + "backend=%s repo=%s position=%s running=%s claimed=%s queued=%s limit=%s claimedBy=%s claimAge=%s lastDecision=%s", |
| 53 | + backend, |
| 54 | + formatQueueDebugString(d.RepoKey), |
| 55 | + formatQueueDebugInt(d.Position), |
| 56 | + formatQueueDebugInt(d.Running), |
| 57 | + formatQueueDebugInt(d.Claimed), |
| 58 | + formatQueueDebugInt(d.Queued), |
| 59 | + formatQueueDebugInt(d.Limit), |
| 60 | + formatQueueDebugString(d.ClaimedBy), |
| 61 | + formatQueueDebugDuration(d.ClaimAge), |
| 62 | + formatQueueDebugString(d.LastDecision), |
| 63 | + ) |
| 64 | +} |
| 65 | + |
| 66 | +func SyncQueueDebugAnnotations( |
| 67 | + ctx context.Context, |
| 68 | + logger *zap.SugaredLogger, |
| 69 | + tekton tektonclient.Interface, |
| 70 | + pr *tektonv1.PipelineRun, |
| 71 | + snapshot DebugSnapshot, |
| 72 | +) error { |
| 73 | + if tekton == nil || pr == nil { |
| 74 | + return nil |
| 75 | + } |
| 76 | + |
| 77 | + latest, err := tekton.TektonV1().PipelineRuns(pr.GetNamespace()).Get(ctx, pr.GetName(), metav1.GetOptions{}) |
| 78 | + if err != nil { |
| 79 | + if apierrors.IsNotFound(err) { |
| 80 | + return nil |
| 81 | + } |
| 82 | + return err |
| 83 | + } |
| 84 | + |
| 85 | + if !IsQueueOnlyAnnotationRelevant(latest) { |
| 86 | + if logger != nil { |
| 87 | + logger.Debugf( |
| 88 | + "skipping queue debug annotation update for pipelinerun %s because latest state=%s spec.status=%s done=%t cancelled=%t", |
| 89 | + PrKey(latest), latest.GetAnnotations()[keys.State], latest.Spec.Status, latest.IsDone(), latest.IsCancelled(), |
| 90 | + ) |
| 91 | + } |
| 92 | + if hasQueueDebugAnnotations(latest) { |
| 93 | + return ClearQueueDebugAnnotations(ctx, logger, tekton, latest) |
| 94 | + } |
| 95 | + return nil |
| 96 | + } |
| 97 | + |
| 98 | + summary := snapshot.Summary() |
| 99 | + currentAnnotations := latest.GetAnnotations() |
| 100 | + if currentAnnotations[keys.QueueDecision] == snapshot.LastDecision && |
| 101 | + currentAnnotations[keys.QueueDebugSummary] == summary { |
| 102 | + return nil |
| 103 | + } |
| 104 | + |
| 105 | + if logger != nil { |
| 106 | + logger.Debugf( |
| 107 | + "updating queue debug annotations for pipelinerun %s: decision=%s summary=%q", |
| 108 | + PrKey(pr), snapshot.LastDecision, summary, |
| 109 | + ) |
| 110 | + } |
| 111 | + |
| 112 | + _, err = action.PatchPipelineRun(ctx, logger, "queue debug", tekton, latest, map[string]any{ |
| 113 | + "metadata": map[string]any{ |
| 114 | + "annotations": map[string]any{ |
| 115 | + keys.QueueDecision: snapshot.LastDecision, |
| 116 | + keys.QueueDebugSummary: summary, |
| 117 | + }, |
| 118 | + }, |
| 119 | + }) |
| 120 | + return err |
| 121 | +} |
| 122 | + |
| 123 | +func ClearQueueDebugAnnotations( |
| 124 | + ctx context.Context, |
| 125 | + logger *zap.SugaredLogger, |
| 126 | + tekton tektonclient.Interface, |
| 127 | + pr *tektonv1.PipelineRun, |
| 128 | +) error { |
| 129 | + if tekton == nil || pr == nil { |
| 130 | + return nil |
| 131 | + } |
| 132 | + |
| 133 | + if !hasQueueDebugAnnotations(pr) { |
| 134 | + return nil |
| 135 | + } |
| 136 | + |
| 137 | + if logger != nil { |
| 138 | + logger.Debugf("clearing queue debug annotations for pipelinerun %s", PrKey(pr)) |
| 139 | + } |
| 140 | + |
| 141 | + _, err := action.PatchPipelineRun(ctx, logger, "queue debug cleanup", tekton, pr, map[string]any{ |
| 142 | + "metadata": map[string]any{ |
| 143 | + "annotations": map[string]any{ |
| 144 | + keys.QueueDecision: nil, |
| 145 | + keys.QueueDebugSummary: nil, |
| 146 | + }, |
| 147 | + }, |
| 148 | + }) |
| 149 | + return err |
| 150 | +} |
| 151 | + |
| 152 | +func LeaseQueueCleanupAnnotations() map[string]any { |
| 153 | + return map[string]any{ |
| 154 | + keys.QueueClaimedBy: nil, |
| 155 | + keys.QueueClaimedAt: nil, |
| 156 | + keys.QueueDecision: nil, |
| 157 | + keys.QueueDebugSummary: nil, |
| 158 | + keys.QueuePromotionRetries: nil, |
| 159 | + keys.QueuePromotionBlocked: nil, |
| 160 | + keys.QueuePromotionLastErr: nil, |
| 161 | + } |
| 162 | +} |
| 163 | + |
| 164 | +func hasQueueDebugAnnotations(pr *tektonv1.PipelineRun) bool { |
| 165 | + if pr == nil { |
| 166 | + return false |
| 167 | + } |
| 168 | + annotations := pr.GetAnnotations() |
| 169 | + return annotations[keys.QueueDecision] != "" || annotations[keys.QueueDebugSummary] != "" |
| 170 | +} |
| 171 | + |
| 172 | +func IsQueueOnlyAnnotationRelevant(pr *tektonv1.PipelineRun) bool { |
| 173 | + if pr == nil { |
| 174 | + return false |
| 175 | + } |
| 176 | + if pr.GetAnnotations()[keys.State] != kubeinteraction.StateQueued { |
| 177 | + return false |
| 178 | + } |
| 179 | + return pr.Spec.Status == tektonv1.PipelineRunSpecStatusPending && !pr.IsDone() && !pr.IsCancelled() |
| 180 | +} |
| 181 | + |
| 182 | +func LeaseQueueClaimInfo(pr *tektonv1.PipelineRun, now time.Time) (string, time.Duration) { |
| 183 | + if pr == nil { |
| 184 | + return "", unknownDuration() |
| 185 | + } |
| 186 | + |
| 187 | + annotations := pr.GetAnnotations() |
| 188 | + claimedBy := annotations[keys.QueueClaimedBy] |
| 189 | + claimedAt := annotations[keys.QueueClaimedAt] |
| 190 | + if claimedBy == "" || claimedAt == "" { |
| 191 | + return claimedBy, unknownDuration() |
| 192 | + } |
| 193 | + |
| 194 | + claimedTime, err := time.Parse(time.RFC3339Nano, claimedAt) |
| 195 | + if err != nil { |
| 196 | + return claimedBy, unknownDuration() |
| 197 | + } |
| 198 | + |
| 199 | + age := now.Sub(claimedTime) |
| 200 | + if age < 0 { |
| 201 | + age = 0 |
| 202 | + } |
| 203 | + return claimedBy, age |
| 204 | +} |
| 205 | + |
| 206 | +func formatQueueDebugInt(v int) string { |
| 207 | + if v == unknownQueueDebugValue { |
| 208 | + return "n/a" |
| 209 | + } |
| 210 | + return strconv.Itoa(v) |
| 211 | +} |
| 212 | + |
| 213 | +func formatQueueDebugString(v string) string { |
| 214 | + if v == "" { |
| 215 | + return "n/a" |
| 216 | + } |
| 217 | + return v |
| 218 | +} |
| 219 | + |
| 220 | +func formatQueueDebugDuration(v time.Duration) string { |
| 221 | + if v == unknownDuration() { |
| 222 | + return "n/a" |
| 223 | + } |
| 224 | + return v.Truncate(time.Second).String() |
| 225 | +} |
| 226 | + |
| 227 | +func unknownDuration() time.Duration { |
| 228 | + return -1 |
| 229 | +} |
0 commit comments