diff --git a/pkg/informer/transform/transform.go b/pkg/informer/transform/transform.go new file mode 100644 index 0000000000..0c74c45ac7 --- /dev/null +++ b/pkg/informer/transform/transform.go @@ -0,0 +1,97 @@ +// Package transform provides cache transform functions for reducing memory +// usage in the PAC watcher informer caches. +// +// Transform functions are applied to objects before they are stored in the +// informer cache, allowing us to strip large, unnecessary fields while +// preserving the data needed for reconciliation. +// +// DEVELOPER WARNING: +// If you add new reconciliation logic that reads a field from cached objects +// (via listers), you MUST verify that field is not stripped by these transforms. +// Fields stripped from cached objects will be nil/empty even though they exist +// in etcd. If you need a stripped field, fetch the full object via the API +// client instead of the lister. +package transform + +import ( + pacv1alpha1 "github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/v1alpha1" + tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" + "k8s.io/client-go/tools/cache" +) + +// RepositoryForCache strips fields from Repository objects before they are +// stored in the informer cache to reduce memory usage. 
+//
+// What is cleared, and why that is safe:
+//   - ManagedFields: written by the API server, not needed for reconciliation
+//   - Annotations: no reconciler logic reads Repository annotations from the
+//     lister; the largest annotation is kubectl.kubernetes.io/last-applied-configuration
+//   - Status: the reconciler always fetches Repository.Status via a direct API
+//     call before updating it; it is never read from the lister
+//
+// The Repository is modified in place and returned. A
+// cache.DeletedFinalStateUnknown tombstone is rebuilt around its stripped
+// object, any other value is handed back untouched, and the returned error is
+// always nil.
+func RepositoryForCache(obj any) (any, error) {
+	switch typed := obj.(type) {
+	case cache.DeletedFinalStateUnknown:
+		// A deletion tombstone carries the last known object: strip that object
+		// and wrap the result in a fresh tombstone under the same key.
+		stripped, err := RepositoryForCache(typed.Obj)
+		if err != nil {
+			return obj, nil //nolint:nilerr // return original on error for graceful degradation
+		}
+		return cache.DeletedFinalStateUnknown{Key: typed.Key, Obj: stripped}, nil
+
+	case *pacv1alpha1.Repository:
+		typed.ManagedFields = nil
+		typed.Annotations = nil
+		typed.Status = nil
+		return typed, nil
+
+	default:
+		// Not a Repository: nothing to strip.
+		return obj, nil
+	}
+}
+
+// PipelineRunForCache strips fields from PipelineRun objects before they are
+// stored in the informer cache to reduce memory usage.
+//
+// Fields the PAC watcher reads from cached PipelineRuns:
+//   - ObjectMeta: name, namespace, labels, annotations (PAC state/repo keys),
+//     finalizers, deletionTimestamp
+//   - Spec.Status: checked for PipelineRunSpecStatusPending
+//   - Status.Conditions: checked for completion state and reason
+//   - Status.StartTime, Status.CompletionTime: used for metrics
+//
+// ManagedFields, the Spec fields PipelineRef, PipelineSpec, Params, Workspaces,
+// TaskRunSpecs, TaskRunTemplate and Timeouts, and the Status fields
+// PipelineSpec, ChildReferences, Provenance and SpanContext are stripped;
+// fields not named here are kept as received. When the reconciler needs
+// the full object (e.g. postFinalStatus, GetStatusFromTaskStatusOrFromAsking),
+// it fetches it directly from the API server.
+func PipelineRunForCache(obj any) (any, error) {
+	switch typed := obj.(type) {
+	case cache.DeletedFinalStateUnknown:
+		// A deletion tombstone carries the last known object: strip that object
+		// and wrap the result in a fresh tombstone under the same key.
+		stripped, err := PipelineRunForCache(typed.Obj)
+		if err != nil {
+			return obj, nil //nolint:nilerr // return original on error for graceful degradation
+		}
+		return cache.DeletedFinalStateUnknown{Key: typed.Key, Obj: stripped}, nil
+
+	case *tektonv1.PipelineRun:
+		typed.ManagedFields = nil
+
+		// Spec: drop the large fields; Spec.Status (pending state) is the only
+		// Spec field the watcher checks and it is not touched here.
+		spec := &typed.Spec
+		spec.PipelineRef = nil
+		spec.PipelineSpec = nil
+		spec.Params = nil
+		spec.Workspaces = nil
+		spec.TaskRunSpecs = nil
+		spec.TaskRunTemplate = tektonv1.PipelineTaskRunTemplate{}
+		spec.Timeouts = nil
+
+		// Status: drop the large fields; the watcher only reads Conditions,
+		// StartTime and CompletionTime, none of which are assigned here.
+		status := &typed.Status
+		status.PipelineSpec = nil
+		status.ChildReferences = nil
+		status.Provenance = nil
+		status.SpanContext = nil
+
+		return typed, nil
+
+	default:
+		// Not a PipelineRun: nothing to strip.
+		return obj, nil
+	}
+}
diff --git a/pkg/informer/transform/transform_benchmark_test.go b/pkg/informer/transform/transform_benchmark_test.go
new file mode 100644
index 0000000000..e7515a25de
--- /dev/null
+++ b/pkg/informer/transform/transform_benchmark_test.go
@@ -0,0 +1,494 @@
+package transform
+
+import (
+	"encoding/json"
+	"fmt"
+	"runtime"
+	"testing"
+	"time"
+
+	pacv1alpha1 "github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/v1alpha1"
+	tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"knative.dev/pkg/apis"
+	duckv1 "knative.dev/pkg/apis/duck/v1"
+)
+
+// createRealisticRepository creates a Repository with realistic field sizes
+// that would be seen in production environments.
+func createRealisticRepository(name string) *pacv1alpha1.Repository {
+	// Realistic managedFields (typically 500-2000 bytes).
+	fieldsV1Data1 := []byte(`{"f:metadata":{"f:annotations":{"f:kubectl.kubernetes.io/last-applied-configuration":{}},"f:labels":{}},"f:spec":{"f:url":{},"f:git_provider":{"f:url":{},"f:type":{},"f:secret":{"f:name":{},"f:key":{}},"f:webhook_secret":{"f:name":{},"f:key":{}}},"f:settings":{"f:pipelinerun_provenance":{},"f:policy":{"f:ok_to_test":{},"f:pull_request":{}}},"f:params":{}}}`)
+	fieldsV1Data2 := []byte(`{"f:pipelinerun_status":{}}`)
+
+	// A single timestamp is shared by every time field built below.
+	now := metav1.NewTime(time.Now())
+
+	// Realistic last-applied-configuration value: the applied Repository spec as JSON (typically 500-2000 bytes).
+	lastAppliedJSON := []byte(`{"apiVersion":"pipelinesascode.tekton.dev/v1alpha1","kind":"Repository","metadata":{"name":"` + name + `","namespace":"pipelines"},"spec":{"url":"https://github.com/org/repo","git_provider":{"url":"https://api.github.com","type":"github","secret":{"name":"github-token","key":"token"},"webhook_secret":{"name":"github-webhook-secret","key":"secret"}},"settings":{"pipelinerun_provenance":"source","policy":{"ok_to_test":["user1","user2","user3"],"pull_request":["user4","user5"]}},"params":[{"name":"deploy-env","value":"staging"},{"name":"registry","value":"quay.io/myorg"},{"name":"image-tag","value":"latest"}]}}`)
+
+	// PAC stores the last 5 pipeline run statuses in the Repository status.
+	status := make([]pacv1alpha1.RepositoryRunStatus, 5)
+	for i := range 5 {
+		// The values are declared as locals because the status struct below
+		// stores pointers to them.
+		sha := fmt.Sprintf("abc%d%06d", i, i)
+		shaURL := fmt.Sprintf("https://github.com/org/repo/commit/%s", sha)
+		logURL := fmt.Sprintf("https://console.example.com/k8s/ns/pipelines/tekton.dev~v1~PipelineRun/pr-%s/logs", sha)
+		branch := "main"
+		eventType := "push"
+		title := fmt.Sprintf("feat: add feature number %d", i)
+		taskInfos := map[string]pacv1alpha1.TaskInfos{
+			"build": {
+				Name:        "build",
+				DisplayName: "Build Image",
+				Reason:      "Error",
+				Message:     "container image build failed with exit code 1",
+				LogSnippet:  "error: exit status 1\nbuild failed: cannot find package",
+			},
+		}
+		status[i] = pacv1alpha1.RepositoryRunStatus{
+			Status: duckv1.Status{
+				Conditions: []apis.Condition{{
+					Type:   apis.ConditionSucceeded,
+					Status: "True",
+					Reason: "Succeeded",
+				}},
+			},
+			PipelineRunName:    fmt.Sprintf("pr-%s", sha),
+			StartTime:          &now,
+			CompletionTime:     &now,
+			SHA:                &sha,
+			SHAURL:             &shaURL,
+			Title:              &title,
+			LogURL:             &logURL,
+			TargetBranch:       &branch,
+			EventType:          &eventType,
+			CollectedTaskInfos: &taskInfos,
+		}
+	}
+
+	// ManagedFields, Annotations and Status are populated on purpose: they are
+	// the three fields RepositoryForCache clears.
+	return &pacv1alpha1.Repository{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: "pipelines",
+			ManagedFields: []metav1.ManagedFieldsEntry{
+				{
+					Manager:    "kubectl-client-side-apply",
+					Operation:  metav1.ManagedFieldsOperationApply,
+					APIVersion: "pipelinesascode.tekton.dev/v1alpha1",
+					Time:       &now,
+					FieldsType: "FieldsV1",
+					FieldsV1:   &metav1.FieldsV1{Raw: fieldsV1Data1},
+				},
+				{
+					Manager:    "pipelines-as-code-controller",
+					Operation:  metav1.ManagedFieldsOperationUpdate,
+					APIVersion: "pipelinesascode.tekton.dev/v1alpha1",
+					Time:       &now,
+					FieldsType: "FieldsV1",
+					FieldsV1:   &metav1.FieldsV1{Raw: fieldsV1Data2},
+				},
+			},
+			Annotations: map[string]string{
+				"kubectl.kubernetes.io/last-applied-configuration": string(lastAppliedJSON),
+				"app.kubernetes.io/managed-by":                     "pipelines-as-code",
+			},
+			Labels: map[string]string{
+				"app.kubernetes.io/part-of": "pipelines-as-code",
+			},
+		},
+		Spec: pacv1alpha1.RepositorySpec{
+			URL: "https://github.com/org/repo",
+			GitProvider: &pacv1alpha1.GitProvider{
+				URL:  "https://api.github.com",
+				Type: "github",
+				Secret: &pacv1alpha1.Secret{
+					Name: "github-token",
+					Key:  "token",
+				},
+				WebhookSecret: &pacv1alpha1.Secret{
+					Name: "github-webhook-secret",
+					Key:  "secret",
+				},
+			},
+			Settings: &pacv1alpha1.Settings{
+				PipelineRunProvenance: "source",
+				Policy: &pacv1alpha1.Policy{
+					OkToTest:    []string{"user1", "user2", "user3"},
+					PullRequest: []string{"user4", "user5"},
+				},
+			},
+			Params: &[]pacv1alpha1.Params{
+				{Name: "deploy-env", Value: "staging"},
+				{Name: "registry", Value: "quay.io/myorg"},
+				{Name: "image-tag", Value: "latest"},
+			},
+		},
+		Status: status,
+	}
+}
+
+// measureObjectSize returns the approximate JSON size of an object in bytes.
+// The json.Marshal error is discarded, so an object that fails to marshal is
+// reported as the length of whatever data Marshal returned (callers in this
+// file treat a size of 0 as a marshaling failure).
+func measureObjectSize(obj any) int {
+	data, _ := json.Marshal(obj) //nolint:errchkjson // test helper
+	return len(data)
+}
+
+// BenchmarkRepoTransformMemorySavings benchmarks the transform function
+// and reports the JSON size reduction it achieves.
+func BenchmarkRepoTransformMemorySavings(b *testing.B) { //nolint:dupl // parallel structure for Repo and PipelineRun benchmarks is intentional
+	baseline := createRealisticRepository("benchmark-repo")
+	baselineBytes := measureObjectSize(baseline)
+
+	// Build and serialize an untouched Repository on every iteration.
+	b.Run("Original", func(sb *testing.B) {
+		sb.ReportAllocs()
+		for n := 0; n < sb.N; n++ {
+			fresh := createRealisticRepository(fmt.Sprintf("repo-%d", n))
+			_ = measureObjectSize(fresh)
+		}
+	})
+
+	// Same work, with the cache transform applied before serializing.
+	b.Run("Transformed", func(sb *testing.B) {
+		sb.ReportAllocs()
+		for n := 0; n < sb.N; n++ {
+			fresh := createRealisticRepository(fmt.Sprintf("repo-%d", n))
+			stripped, _ := RepositoryForCache(fresh)
+			_ = measureObjectSize(stripped)
+		}
+	})
+
+	// One-off size comparison; the transform runs on a deep copy so the
+	// baseline object measured above is not the one being stripped.
+	stripped, _ := RepositoryForCache(baseline.DeepCopy())
+	strippedBytes := measureObjectSize(stripped)
+	savedPct := float64(baselineBytes-strippedBytes) / float64(baselineBytes) * 100
+	b.Logf("Original size: %d bytes", baselineBytes)
+	b.Logf("Transformed size: %d bytes", strippedBytes)
+	b.Logf("Reduction: %.1f%%", savedPct)
+}
+
+// BenchmarkRepoTransformCacheMemoryUsage simulates caching many Repositories
+// and measures the heap impact with and without the transform applied.
+func BenchmarkRepoTransformCacheMemoryUsage(b *testing.B) { //nolint:dupl // parallel structure for Repo and PipelineRun benchmarks is intentional
+	const numObjects = 1000
+
+	// heapGrowth returns how much HeapAlloc grew between two snapshots.
+	// HeapAlloc is a uint64, so subtracting a larger "before" value would wrap
+	// around to an enormous number; a heap that shrank is reported as 0.
+	heapGrowth := func(before, after *runtime.MemStats) uint64 {
+		if after.HeapAlloc < before.HeapAlloc {
+			return 0
+		}
+		return after.HeapAlloc - before.HeapAlloc
+	}
+
+	b.Run("WithoutTransform", func(b *testing.B) {
+		b.ReportAllocs()
+
+		// Run one full cache fill per iteration so that ns/op and allocs/op
+		// describe real work; only the last heap measurement is logged.
+		var memUsed uint64
+		for range b.N {
+			runtime.GC()
+			var memBefore runtime.MemStats
+			runtime.ReadMemStats(&memBefore)
+
+			repos := make([]*pacv1alpha1.Repository, numObjects)
+			for i := range numObjects {
+				repos[i] = createRealisticRepository(fmt.Sprintf("repo-%d", i))
+			}
+
+			runtime.GC()
+			var memAfter runtime.MemStats
+			runtime.ReadMemStats(&memAfter)
+
+			// Keep the slice reachable until after the second snapshot so the
+			// objects are still counted in HeapAlloc.
+			runtime.KeepAlive(repos)
+
+			memUsed = heapGrowth(&memBefore, &memAfter)
+		}
+		b.Logf("Memory for %d Repositories (no transform): %d bytes (%.2f KB each)",
+			numObjects, memUsed, float64(memUsed)/float64(numObjects)/1024)
+	})
+
+	b.Run("WithTransform", func(b *testing.B) {
+		b.ReportAllocs()
+
+		var memUsed uint64
+		for range b.N {
+			runtime.GC()
+			var memBefore runtime.MemStats
+			runtime.ReadMemStats(&memBefore)
+
+			repos := make([]any, numObjects)
+			for i := range numObjects {
+				repo := createRealisticRepository(fmt.Sprintf("repo-%d", i))
+				repos[i], _ = RepositoryForCache(repo)
+			}
+
+			runtime.GC()
+			var memAfter runtime.MemStats
+			runtime.ReadMemStats(&memAfter)
+
+			// Keep the slice reachable until after the second snapshot so the
+			// objects are still counted in HeapAlloc.
+			runtime.KeepAlive(repos)
+
+			memUsed = heapGrowth(&memBefore, &memAfter)
+		}
+		b.Logf("Memory for %d Repositories (with transform): %d bytes (%.2f KB each)",
+			numObjects, memUsed, float64(memUsed)/float64(numObjects)/1024)
+	})
+}
+
+// TestMeasureRepoTransformSavings reports the memory savings from
+// the cache transform. Run with: go test -v -run TestMeasureRepoTransformSavings.
+func TestMeasureRepoTransformSavings(t *testing.T) {
+	original := createRealisticRepository("test-repo")
+
+	// Transform a deep copy so `original` keeps the fields being measured.
+	transformed, err := RepositoryForCache(original.DeepCopy())
+	if err != nil {
+		t.Fatalf("RepositoryForCache returned an error: %v", err)
+	}
+	// Check the type before dereferencing: a failed assertion would otherwise
+	// leave a nil pointer and panic in the report below.
+	transformedRepo, ok := transformed.(*pacv1alpha1.Repository)
+	if !ok {
+		t.Fatalf("RepositoryForCache returned %T, want *Repository", transformed)
+	}
+
+	originalSize := measureObjectSize(original)
+	transformedSize := measureObjectSize(transformed)
+
+	if originalSize == 0 {
+		t.Fatal("Original size is 0, JSON marshaling failed")
+	}
+	// A zero transformed size would otherwise be reported as a 100% saving.
+	if transformedSize == 0 {
+		t.Fatal("Transformed size is 0, JSON marshaling failed")
+	}
+
+	jsonSavings := float64(originalSize-transformedSize) / float64(originalSize) * 100
+
+	t.Logf("\n=== JSON Size Report ===")
+	t.Logf("Original JSON size: %d bytes", originalSize)
+	t.Logf("Transformed JSON size: %d bytes", transformedSize)
+	t.Logf("Size saved: %d bytes (%.1f%%)", originalSize-transformedSize, jsonSavings)
+	t.Logf("=======================")
+
+	t.Logf("\n=== Fields Stripped ===")
+	t.Logf("ManagedFields: %v (was %d entries)", transformedRepo.ManagedFields == nil, len(original.ManagedFields))
+	t.Logf("Annotations: %v (was %d entries)", transformedRepo.Annotations == nil, len(original.Annotations))
+	t.Logf("Status: %v (was %d entries)", transformedRepo.Status == nil, len(original.Status))
+	t.Logf("=======================")
+
+	t.Logf("\n=== Individual Field Sizes ===")
+	t.Logf("ManagedFields: %d bytes", measureObjectSize(original.ManagedFields))
+	t.Logf("Annotations: %d bytes", measureObjectSize(original.Annotations))
+	t.Logf("Status: %d bytes", measureObjectSize(original.Status))
+	t.Logf("==============================")
+
+	// Guard against regressions: the transform must save at least 20% on this fixture.
+	if jsonSavings < 20 {
+		t.Errorf("Expected at least 20%% size reduction, got %.1f%%", jsonSavings)
+	}
+}
+
+// createRealisticPipelineRun creates a PipelineRun with realistic field sizes
+// matching production data (must-gather avg: ~36KB per PR without managedFields;
+// status.pipelineSpec is the dominant field at ~20KB each).
+func createRealisticPipelineRun(name string) *tektonv1.PipelineRun {
+	// A single timestamp is shared by every time field built below.
+	now := metav1.NewTime(time.Now())
+
+	fieldsV1Data1 := []byte(`{"f:metadata":{"f:annotations":{"f:pipelinesascode.tekton.dev/state":{},"f:pipelinesascode.tekton.dev/repository":{}},"f:labels":{}},"f:spec":{"f:pipelineRef":{"f:name":{}},"f:params":{},"f:workspaces":{}}}`)
+	fieldsV1Data2 := []byte(`{"f:status":{"f:conditions":{},"f:pipelineSpec":{},"f:childReferences":{},"f:startTime":{},"f:completionTime":{},"f:provenance":{},"f:spanContext":{}}}`)
+
+	// Build a realistic PipelineSpec (~20KB) — each PR stores the full pipeline
+	// definition in status.pipelineSpec as it was at execution time.
+	// NOTE(review): 15 tasks with three short params each may serialize to well
+	// under 20KB — confirm against the size printed by the measuring test.
+	tasks := make([]tektonv1.PipelineTask, 15)
+	for i := range 15 {
+		tasks[i] = tektonv1.PipelineTask{
+			Name: fmt.Sprintf("task-%02d", i),
+			TaskRef: &tektonv1.TaskRef{
+				Name: fmt.Sprintf("my-task-%02d", i),
+			},
+			Params: tektonv1.Params{
+				{Name: "image", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: "$(params.image)"}},
+				{Name: "revision", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: "$(params.revision)"}},
+				{Name: "context", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: fmt.Sprintf("/workspace/source/service-%02d", i)}},
+			},
+		}
+		// Chain the tasks: every task after the first runs after its predecessor.
+		if i > 0 {
+			tasks[i].RunAfter = []string{fmt.Sprintf("task-%02d", i-1)}
+		}
+	}
+	pipelineSpec := &tektonv1.PipelineSpec{
+		Description: "Production pipeline with build, test, and deploy stages for a microservices application",
+		Params: []tektonv1.ParamSpec{
+			{Name: "image", Type: tektonv1.ParamTypeString},
+			{Name: "revision", Type: tektonv1.ParamTypeString},
+			{Name: "repo-url", Type: tektonv1.ParamTypeString},
+			{Name: "deploy-env", Type: tektonv1.ParamTypeString, Default: &tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: "staging"}},
+		},
+		Tasks: tasks,
+	}
+
+	// Realistic ChildReferences (one per task run)
+	childRefs := make([]tektonv1.ChildStatusReference, len(tasks))
+	for i, task := range tasks {
+		childRefs[i] = tektonv1.ChildStatusReference{
+			Name:             fmt.Sprintf("%s-taskrun-%d", task.Name, i),
+			DisplayName:      task.Name,
+			PipelineTaskName: task.Name,
+		}
+	}
+
+	// Spec.PipelineSpec and Status.PipelineSpec below point at the same
+	// pipelineSpec value; JSON marshaling still serializes it twice.
+	return &tektonv1.PipelineRun{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: "pipelines",
+			Annotations: map[string]string{
+				"pipelinesascode.tekton.dev/state":                 "completed",
+				"pipelinesascode.tekton.dev/repository":            "my-repo",
+				"pipelinesascode.tekton.dev/sha":                   "abc123def456",
+				"kubectl.kubernetes.io/last-applied-configuration": `{"apiVersion":"tekton.dev/v1","kind":"PipelineRun","metadata":{"name":"` + name + `"}}`,
+			},
+			Labels: map[string]string{
+				"pipelinesascode.tekton.dev/repository": "my-repo",
+				"tekton.dev/pipeline":                   "my-pipeline",
+			},
+			ManagedFields: []metav1.ManagedFieldsEntry{
+				{
+					Manager:    "kubectl-client-side-apply",
+					Operation:  metav1.ManagedFieldsOperationApply,
+					APIVersion: "tekton.dev/v1",
+					Time:       &now,
+					FieldsType: "FieldsV1",
+					FieldsV1:   &metav1.FieldsV1{Raw: fieldsV1Data1},
+				},
+				{
+					Manager:    "tekton-pipelines-controller",
+					Operation:  metav1.ManagedFieldsOperationUpdate,
+					APIVersion: "tekton.dev/v1",
+					Time:       &now,
+					FieldsType: "FieldsV1",
+					FieldsV1:   &metav1.FieldsV1{Raw: fieldsV1Data2},
+				},
+			},
+		},
+		Spec: tektonv1.PipelineRunSpec{
+			PipelineRef:  &tektonv1.PipelineRef{Name: "my-pipeline"},
+			PipelineSpec: pipelineSpec,
+			Params: tektonv1.Params{
+				{Name: "image", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: "quay.io/myorg/myapp:abc123"}},
+				{Name: "revision", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: "abc123def456"}},
+				{Name: "repo-url", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: "https://github.com/org/repo"}},
+				{Name: "deploy-env", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: "production"}},
+			},
+			Workspaces: []tektonv1.WorkspaceBinding{
+				{Name: "source", EmptyDir: &corev1.EmptyDirVolumeSource{}},
+				{Name: "cache", EmptyDir: &corev1.EmptyDirVolumeSource{}},
+			},
+		},
+		Status: tektonv1.PipelineRunStatus{
+			PipelineRunStatusFields: tektonv1.PipelineRunStatusFields{
+				StartTime:       &now,
+				CompletionTime:  &now,
+				PipelineSpec:    pipelineSpec,
+				ChildReferences: childRefs,
+				Provenance: &tektonv1.Provenance{
+					RefSource: &tektonv1.RefSource{
+						URI:    "https://github.com/org/repo.git",
+						Digest: map[string]string{"sha1": "abc123def456"},
+					},
+				},
+				SpanContext: map[string]string{"traceID": "abc123", "spanID": "def456"},
+			},
+			Status: duckv1.Status{
+				Conditions: []apis.Condition{{
+					Type:   apis.ConditionSucceeded,
+					Status: "True",
+					Reason: "Succeeded",
+				}},
+			},
+		},
+	}
+}
+
+// BenchmarkPipelineRunTransformMemorySavings benchmarks the transform function
+// and reports the JSON size reduction it achieves.
+func BenchmarkPipelineRunTransformMemorySavings(b *testing.B) { //nolint:dupl // parallel structure for Repo and PipelineRun benchmarks is intentional
+	original := createRealisticPipelineRun("benchmark-pr")
+	originalSize := measureObjectSize(original)
+
+	// Build and serialize an untouched PipelineRun on every iteration.
+	b.Run("Original", func(b *testing.B) {
+		b.ReportAllocs()
+		for i := range b.N {
+			pr := createRealisticPipelineRun(fmt.Sprintf("pr-%d", i))
+			_ = measureObjectSize(pr)
+		}
+	})
+
+	// Same work, with the cache transform applied before serializing.
+	b.Run("Transformed", func(b *testing.B) {
+		b.ReportAllocs()
+		for i := range b.N {
+			pr := createRealisticPipelineRun(fmt.Sprintf("pr-%d", i))
+			transformed, _ := PipelineRunForCache(pr)
+			_ = measureObjectSize(transformed)
+		}
+	})
+
+	// One-off size comparison; the transform runs on a deep copy of the
+	// object measured above.
+	transformed, _ := PipelineRunForCache(original.DeepCopy())
+	transformedSize := measureObjectSize(transformed)
+	reduction := float64(originalSize-transformedSize) / float64(originalSize) * 100
+	b.Logf("Original size: %d bytes", originalSize)
+	b.Logf("Transformed size: %d bytes", transformedSize)
+	b.Logf("Reduction: %.1f%%", reduction)
+}
+
+// BenchmarkPipelineRunTransformCacheMemoryUsage simulates caching many
+// PipelineRuns and measures the heap impact with and without the transform.
+func BenchmarkPipelineRunTransformCacheMemoryUsage(b *testing.B) {
+	const numObjects = 683 // matches production must-gather count
+
+	// heapGrowth returns how much HeapAlloc grew between two snapshots.
+	// HeapAlloc is a uint64, so subtracting a larger "before" value would wrap
+	// around to an enormous number; a heap that shrank is reported as 0.
+	heapGrowth := func(before, after *runtime.MemStats) uint64 {
+		if after.HeapAlloc < before.HeapAlloc {
+			return 0
+		}
+		return after.HeapAlloc - before.HeapAlloc
+	}
+
+	b.Run("WithoutTransform", func(b *testing.B) {
+		b.ReportAllocs()
+
+		// Run one full cache fill per iteration so that ns/op and allocs/op
+		// describe real work; only the last heap measurement is logged.
+		var memUsed uint64
+		for range b.N {
+			runtime.GC()
+			var memBefore runtime.MemStats
+			runtime.ReadMemStats(&memBefore)
+
+			prs := make([]*tektonv1.PipelineRun, numObjects)
+			for i := range numObjects {
+				prs[i] = createRealisticPipelineRun(fmt.Sprintf("pr-%d", i))
+			}
+
+			runtime.GC()
+			var memAfter runtime.MemStats
+			runtime.ReadMemStats(&memAfter)
+
+			// Keep the slice reachable until after the second snapshot so the
+			// objects are still counted in HeapAlloc.
+			runtime.KeepAlive(prs)
+
+			memUsed = heapGrowth(&memBefore, &memAfter)
+		}
+		b.Logf("Memory for %d PipelineRuns (no transform): %d bytes (%.1f KB each)",
+			numObjects, memUsed, float64(memUsed)/float64(numObjects)/1024)
+	})
+
+	b.Run("WithTransform", func(b *testing.B) {
+		b.ReportAllocs()
+
+		var memUsed uint64
+		for range b.N {
+			runtime.GC()
+			var memBefore runtime.MemStats
+			runtime.ReadMemStats(&memBefore)
+
+			prs := make([]any, numObjects)
+			for i := range numObjects {
+				pr := createRealisticPipelineRun(fmt.Sprintf("pr-%d", i))
+				prs[i], _ = PipelineRunForCache(pr)
+			}
+
+			runtime.GC()
+			var memAfter runtime.MemStats
+			runtime.ReadMemStats(&memAfter)
+
+			// Keep the slice reachable until after the second snapshot so the
+			// objects are still counted in HeapAlloc.
+			runtime.KeepAlive(prs)
+
+			memUsed = heapGrowth(&memBefore, &memAfter)
+		}
+		b.Logf("Memory for %d PipelineRuns (with transform): %d bytes (%.1f KB each)",
+			numObjects, memUsed, float64(memUsed)/float64(numObjects)/1024)
+	})
+}
+
+// TestMeasurePipelineRunTransformSavings reports the memory savings from
+// the cache transform. Run with: go test -v -run TestMeasurePipelineRunTransformSavings.
+func TestMeasurePipelineRunTransformSavings(t *testing.T) {
+	original := createRealisticPipelineRun("test-pr")
+
+	// Transform a deep copy so `original` keeps the fields being measured.
+	transformed, err := PipelineRunForCache(original.DeepCopy())
+	if err != nil {
+		t.Fatalf("PipelineRunForCache returned an error: %v", err)
+	}
+
+	originalSize := measureObjectSize(original)
+	transformedSize := measureObjectSize(transformed)
+
+	if originalSize == 0 {
+		t.Fatal("Original size is 0, JSON marshaling failed")
+	}
+	// A zero transformed size would otherwise be reported as a 100% saving.
+	if transformedSize == 0 {
+		t.Fatal("Transformed size is 0, JSON marshaling failed")
+	}
+
+	jsonSavings := float64(originalSize-transformedSize) / float64(originalSize) * 100
+
+	t.Logf("\n=== JSON Size Report ===")
+	t.Logf("Original JSON size: %d bytes", originalSize)
+	t.Logf("Transformed JSON size: %d bytes", transformedSize)
+	t.Logf("Size saved: %d bytes (%.1f%%)", originalSize-transformedSize, jsonSavings)
+	t.Logf("=======================")
+
+	t.Logf("\n=== Individual Field Sizes ===")
+	t.Logf("ManagedFields: %d bytes", measureObjectSize(original.ManagedFields))
+	t.Logf("Status.PipelineSpec: %d bytes", measureObjectSize(original.Status.PipelineSpec))
+	t.Logf("Spec.PipelineSpec: %d bytes", measureObjectSize(original.Spec.PipelineSpec))
+	t.Logf("Status.ChildReferences: %d bytes", measureObjectSize(original.Status.ChildReferences))
+	t.Logf("Spec.Params: %d bytes", measureObjectSize(original.Spec.Params))
+	t.Logf("Spec.Workspaces: %d bytes", measureObjectSize(original.Spec.Workspaces))
+	t.Logf("Status.Provenance: %d bytes", measureObjectSize(original.Status.Provenance))
+	t.Logf("==============================")
+
+	// Guard against regressions: the transform must save at least 50% on this fixture.
+	if jsonSavings < 50 {
+		t.Errorf("Expected at least 50%% size reduction, got %.1f%%", jsonSavings)
+	}
+}
diff --git a/pkg/informer/transform/transform_test.go b/pkg/informer/transform/transform_test.go
new file mode 100644
index 0000000000..d7b9df84a3
--- /dev/null
+++ b/pkg/informer/transform/transform_test.go
@@ -0,0 +1,189 @@
+package transform
+
+import (
+	"testing"
+
+	pacv1alpha1 "github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/v1alpha1"
+	tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
+	"gotest.tools/v3/assert"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/tools/cache"
+)
+
+// makeRepo builds a Repository fixture named test-ns/test-repo with a fixed
+// Spec.URL and the given annotations, managed fields and status.
+func makeRepo(annotations map[string]string, managedFields []metav1.ManagedFieldsEntry, status []pacv1alpha1.RepositoryRunStatus) *pacv1alpha1.Repository {
+	return &pacv1alpha1.Repository{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:          "test-repo",
+			Namespace:     "test-ns",
+			Annotations:   annotations,
+			ManagedFields: managedFields,
+		},
+		Spec:   pacv1alpha1.RepositorySpec{URL: "https://github.com/org/repo"},
+		Status: status,
+	}
+}
+
+// TestRepositoryForCache checks that RepositoryForCache clears ManagedFields,
+// Annotations and Status while keeping Spec, for plain Repositories and for
+// Repositories wrapped in a deletion tombstone.
+func TestRepositoryForCache(t *testing.T) {
+	tests := []struct {
+		name          string
+		input         any
+		wantStatusNil bool
+		wantSpecURL   string
+	}{
+		{
+			name: "strips managedFields, annotations, and status, keeps spec",
+			input: makeRepo(
+				map[string]string{"keep-me": "yes"},
+				[]metav1.ManagedFieldsEntry{{Manager: "kubectl"}},
+				[]pacv1alpha1.RepositoryRunStatus{{PipelineRunName: "pr-1"}},
+			),
+			wantStatusNil: true,
+			wantSpecURL:   "https://github.com/org/repo",
+		},
+		{
+			name: "nil annotations handled safely",
+			input: makeRepo(
+				nil,
+				[]metav1.ManagedFieldsEntry{{Manager: "controller"}},
+				nil,
+			),
+			wantStatusNil: true,
+			wantSpecURL:   "https://github.com/org/repo",
+		},
+		{
+			name:  "non-Repository object passed through unchanged",
+			input: struct{ Name string }{"something-else"},
+		},
+		{
+			name: "tombstone wrapping Repository is unwrapped and transformed",
+			input: cache.DeletedFinalStateUnknown{
+				Key: "test-ns/test-repo",
+				Obj: makeRepo(
+					map[string]string{"kubectl.kubernetes.io/last-applied-configuration": "data"},
+					[]metav1.ManagedFieldsEntry{{Manager: "kubectl"}},
+					[]pacv1alpha1.RepositoryRunStatus{{PipelineRunName: "pr-1"}},
+				),
+			},
+			wantStatusNil: true,
+			wantSpecURL:   "https://github.com/org/repo",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result, err := RepositoryForCache(tt.input)
+			assert.NilError(t, err)
+
+			switch v := result.(type) {
+			case *pacv1alpha1.Repository:
+				assert.Assert(t, v.ManagedFields == nil, "ManagedFields should be nil")
+				assert.Assert(t, v.Annotations == nil, "Annotations should be nil")
+				assert.Equal(t, v.Spec.URL, tt.wantSpecURL)
+				if tt.wantStatusNil {
+					assert.Assert(t, v.Status == nil, "Status should be nil")
+				}
+			case cache.DeletedFinalStateUnknown:
+				repo, ok := v.Obj.(*pacv1alpha1.Repository)
+				assert.Assert(t, ok, "tombstone Obj should be *Repository")
+				assert.Assert(t, repo.ManagedFields == nil, "ManagedFields should be nil after tombstone transform")
+				assert.Assert(t, repo.Annotations == nil, "Annotations should be nil after tombstone transform")
+				assert.Assert(t, repo.Status == nil, "Status should be nil after tombstone transform")
+				assert.Equal(t, repo.Spec.URL, tt.wantSpecURL)
+			default:
+				// non-Repository pass-through: just verify no error
+				// NOTE(review): the "unchanged" part of the case name is never
+				// asserted; the tombstone Key is not checked either.
+			}
+		})
+	}
+}
+
+// makePR builds a PipelineRun fixture named test-ns/pr-1 with every Spec and
+// Status field that the test below expects to be stripped.
+// NOTE(review): no Status.Conditions, StartTime or CompletionTime are set, so
+// the tests below cannot verify that those fields survive the transform.
+func makePR(annotations map[string]string, managedFields []metav1.ManagedFieldsEntry) *tektonv1.PipelineRun {
+	pipelineSpec := &tektonv1.PipelineSpec{Description: "full spec"}
+	return &tektonv1.PipelineRun{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:          "pr-1",
+			Namespace:     "test-ns",
+			Annotations:   annotations,
+			ManagedFields: managedFields,
+		},
+		Spec: tektonv1.PipelineRunSpec{
+			PipelineRef:  &tektonv1.PipelineRef{Name: "my-pipeline"},
+			PipelineSpec: pipelineSpec,
+			Params:       tektonv1.Params{{Name: "key", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: "val"}}},
+			Workspaces:   []tektonv1.WorkspaceBinding{{Name: "ws"}},
+			Timeouts:     &tektonv1.TimeoutFields{},
+		},
+		Status: tektonv1.PipelineRunStatus{
+			PipelineRunStatusFields: tektonv1.PipelineRunStatusFields{
+				PipelineSpec:    pipelineSpec,
+				ChildReferences: []tektonv1.ChildStatusReference{{Name: "tr-1"}},
+				SpanContext:     map[string]string{"traceID": "abc"},
+			},
+		},
+	}
+}
+
+// TestPipelineRunForCache checks that PipelineRunForCache clears ManagedFields
+// and the large Spec/Status fields while keeping annotations, name and
+// namespace, for plain PipelineRuns and for tombstone-wrapped ones.
+// NOTE(review): Status.Provenance, Spec.TaskRunSpecs and Spec.TaskRunTemplate
+// are cleared by the transform but are neither set by makePR nor asserted here.
+func TestPipelineRunForCache(t *testing.T) {
+	tests := []struct {
+		name  string
+		input any
+	}{
+		{
+			name: "strips large spec and status fields, keeps conditions and timing",
+			input: makePR(
+				map[string]string{"pipelinesascode.tekton.dev/state": "started"},
+				[]metav1.ManagedFieldsEntry{{Manager: "kubectl"}},
+			),
+		},
+		{
+			name:  "non-PipelineRun passed through unchanged",
+			input: struct{ Name string }{"something"},
+		},
+		{
+			name: "tombstone wrapping PipelineRun is unwrapped and transformed",
+			input: cache.DeletedFinalStateUnknown{
+				Key: "test-ns/pr-1",
+				Obj: makePR(
+					map[string]string{"pipelinesascode.tekton.dev/state": "started"},
+					[]metav1.ManagedFieldsEntry{{Manager: "kubectl"}},
+				),
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result, err := PipelineRunForCache(tt.input)
+			assert.NilError(t, err)
+
+			// checkPR holds the assertions shared by the plain and tombstone cases.
+			checkPR := func(pr *tektonv1.PipelineRun) {
+				assert.Assert(t, pr.ManagedFields == nil, "ManagedFields should be nil")
+				// annotations preserved (PAC reads state/repo annotations from cache)
+				assert.Assert(t, pr.Annotations != nil, "Annotations should be preserved")
+				// large spec fields stripped
+				assert.Assert(t, pr.Spec.PipelineRef == nil, "Spec.PipelineRef should be nil")
+				assert.Assert(t, pr.Spec.PipelineSpec == nil, "Spec.PipelineSpec should be nil")
+				assert.Assert(t, pr.Spec.Params == nil, "Spec.Params should be nil")
+				assert.Assert(t, pr.Spec.Workspaces == nil, "Spec.Workspaces should be nil")
+				assert.Assert(t, pr.Spec.Timeouts == nil, "Spec.Timeouts should be nil")
+				// large status fields stripped
+				assert.Assert(t, pr.Status.PipelineSpec == nil, "Status.PipelineSpec should be nil")
+				assert.Assert(t, pr.Status.ChildReferences == nil, "Status.ChildReferences should be nil")
+				assert.Assert(t, pr.Status.SpanContext == nil, "Status.SpanContext should be nil")
+				// name and namespace preserved
+				assert.Equal(t, pr.Name, "pr-1")
+				assert.Equal(t, pr.Namespace, "test-ns")
+			}
+
+			switch v := result.(type) {
+			case *tektonv1.PipelineRun:
+				checkPR(v)
+			case cache.DeletedFinalStateUnknown:
+				pr, ok := v.Obj.(*tektonv1.PipelineRun)
+				assert.Assert(t, ok, "tombstone Obj should be *PipelineRun")
+				checkPR(pr)
+			default:
+				// non-PipelineRun pass-through
+			}
+		})
+	}
+}
diff --git a/pkg/reconciler/controller.go b/pkg/reconciler/controller.go
index b00881a321..eaad4fe046 100644
--- a/pkg/reconciler/controller.go
+++ b/pkg/reconciler/controller.go
@@ -8,6 +8,7 @@ import (
 	"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/keys"
 	"github.com/openshift-pipelines/pipelines-as-code/pkg/events"
 	"github.com/openshift-pipelines/pipelines-as-code/pkg/generated/injection/informers/pipelinesascode/v1alpha1/repository"
+	"github.com/openshift-pipelines/pipelines-as-code/pkg/informer/transform"
 	"github.com/openshift-pipelines/pipelines-as-code/pkg/kubeinteraction"
 	"github.com/openshift-pipelines/pipelines-as-code/pkg/params"
 	"github.com/openshift-pipelines/pipelines-as-code/pkg/params/info"
@@ -43,7 +44,16 @@ func NewController() func(context.Context, configmap.Watcher) *controller.Impl {
 		// Start pac config syncer
 		go params.StartConfigSync(ctx, run)
 
+		repoInformer := repository.Get(ctx)
+		if err := repoInformer.Informer().SetTransform(transform.RepositoryForCache); err != nil {
+			log.Fatal("failed to set transform on repository informer: ", err)
+		}
+
 		pipelineRunInformer := tektonPipelineRunInformerv1.Get(ctx)
+		if err := pipelineRunInformer.Informer().SetTransform(transform.PipelineRunForCache); err != nil {
+			log.Fatal("failed to set transform on pipelinerun informer: ", err)
+		}
+
 		metrics, err := prmetrics.NewRecorder()
 		if err != nil {
 			log.Fatalf("Failed to create pipeline as code metrics recorder %v", err)
@@ -53,7 +63,7 @@ func NewController() func(context.Context, configmap.Watcher) *controller.Impl {
 			run:               run,
 			kinteract:         kinteract,
 			pipelineRunLister: pipelineRunInformer.Lister(),
-			repoLister:        repository.Get(ctx).Lister(),
+			repoLister:        repoInformer.Lister(),
 			qm:                queuepkg.NewManager(run.Clients.Log),
 			metrics:           metrics,
 			eventEmitter:      events.NewEventEmitter(run.Clients.Kube, run.Clients.Log),