Skip to content

Commit 376a386

Browse files
committed
fix: opt-in async watcher status computation
The ObjectStatusReporter's informer event handler was calling readStatusFromObject synchronously, which makes API server calls (e.g., fetching ReplicaSets/Pods for Deployments). Since client-go's SharedIndexInformer processes notifications serially, this blocked the entire notification pipeline for all resources of the same GroupKind. When many resources are upgraded simultaneously (e.g., ~20 Deployments via Helm), events queue up and each resource's status is reported with a growing delay of 1-3+ minutes behind the API server's actual state. This change dispatches status computation to bounded async goroutines so the informer notification pipeline is never blocked. A sync.Map tracks the latest resource version per object; goroutines that finish after a newer event has arrived detect staleness and drop their result. By default, StatusComputeWorkers <= 1 preserves the original synchronous behavior for backward compatibility. When set to a value > 1, the async goroutine path is used. Key changes: - Split eventHandler into syncEventHandler (original serial behavior) and asyncEventHandler (bounded concurrent goroutines), selected at informer startup based on StatusComputeWorkers - AddFunc/UpdateFunc dispatch readStatusFromObject to goroutines - Bounded concurrency via semaphore (configurable StatusComputeWorkers) - Stale event detection via resource version tracking - sync.WaitGroup ensures clean shutdown before closing event channels - DeleteFunc invalidates in-flight async computations Fixes: helm/helm#31824 Signed-off-by: maplemiao <maplemiao@tencent.com> Made-with: Cursor
1 parent cadc443 commit 376a386

3 files changed

Lines changed: 390 additions & 25 deletions

File tree

pkg/kstatus/watcher/default_status_watcher.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ type DefaultStatusWatcher struct {
4343
// required for computing parent object status, to compensate for
4444
// controllers that aren't following status conventions.
4545
ClusterReader engine.ClusterReader
46+
47+
// StatusComputeWorkers is the maximum number of concurrent goroutines
48+
// used to compute object status per informer. When set to a value > 1,
49+
// status computation is dispatched to async goroutines, preventing the
50+
// informer notification pipeline from being blocked by slow API calls.
51+
// Defaults to 0 (synchronous, same as original behavior).
52+
// Set to a higher value (e.g., 8) to enable concurrent status computation.
53+
StatusComputeWorkers int
4654
}
4755

4856
var _ StatusWatcher = &DefaultStatusWatcher{}
@@ -88,13 +96,14 @@ func (w *DefaultStatusWatcher) Watch(ctx context.Context, ids object.ObjMetadata
8896
}
8997

9098
informer := &ObjectStatusReporter{
91-
InformerFactory: NewDynamicInformerFactory(w.DynamicClient, w.ResyncPeriod),
92-
Mapper: w.Mapper,
93-
StatusReader: w.StatusReader,
94-
ClusterReader: w.ClusterReader,
95-
Targets: targets,
96-
ObjectFilter: &AllowListObjectFilter{AllowList: ids},
97-
RESTScope: scope,
99+
InformerFactory: NewDynamicInformerFactory(w.DynamicClient, w.ResyncPeriod),
100+
Mapper: w.Mapper,
101+
StatusReader: w.StatusReader,
102+
ClusterReader: w.ClusterReader,
103+
StatusComputeWorkers: w.StatusComputeWorkers,
104+
Targets: targets,
105+
ObjectFilter: &AllowListObjectFilter{AllowList: ids},
106+
RESTScope: scope,
98107
}
99108
return informer.Start(ctx)
100109
}

pkg/kstatus/watcher/default_status_watcher_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ package watcher
55

66
import (
77
"context"
8+
"fmt"
89
"testing"
910
"time"
1011

12+
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine"
1113
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/event"
1214
"github.com/fluxcd/cli-utils/pkg/kstatus/status"
1315
"github.com/fluxcd/cli-utils/pkg/object"
1416
"github.com/fluxcd/cli-utils/pkg/testutil"
17+
"github.com/stretchr/testify/assert"
1518
"github.com/stretchr/testify/require"
1619
appsv1 "k8s.io/api/apps/v1"
1720
v1 "k8s.io/api/core/v1"
@@ -888,3 +891,125 @@ func getGVR(t *testing.T, mapper meta.RESTMapper, obj *unstructured.Unstructured
888891
require.NoError(t, err)
889892
return mapping.Resource
890893
}
894+
895+
// slowStatusReader is a StatusReader stub whose reads block for a fixed,
// configurable duration. It stands in for the synchronous API round-trips
// that the real deployment/replicaset status readers perform while resolving
// generated resources.
type slowStatusReader struct {
	delay time.Duration // artificial latency applied to every status read
}
901+
902+
func (s *slowStatusReader) Supports(schema.GroupKind) bool {
903+
return true
904+
}
905+
906+
func (s *slowStatusReader) ReadStatus(ctx context.Context, _ engine.ClusterReader, id object.ObjMetadata) (*event.ResourceStatus, error) {
907+
select {
908+
case <-time.After(s.delay):
909+
case <-ctx.Done():
910+
return nil, ctx.Err()
911+
}
912+
return &event.ResourceStatus{
913+
Identifier: id,
914+
Status: status.CurrentStatus,
915+
Message: "Current",
916+
}, nil
917+
}
918+
919+
func (s *slowStatusReader) ReadStatusForObject(ctx context.Context, _ engine.ClusterReader, obj *unstructured.Unstructured) (*event.ResourceStatus, error) {
920+
select {
921+
case <-time.After(s.delay):
922+
case <-ctx.Done():
923+
return nil, ctx.Err()
924+
}
925+
id := object.UnstructuredToObjMetadata(obj)
926+
return &event.ResourceStatus{
927+
Identifier: id,
928+
Status: status.CurrentStatus,
929+
Resource: obj,
930+
Message: "Current",
931+
}, nil
932+
}
933+
934+
// TestEventHandlerConcurrency verifies that the informer event handler does not
935+
// block the entire notification pipeline when status computation is slow.
936+
//
937+
// When multiple objects share the same informer (same GroupKind + namespace),
938+
// events should be processed concurrently rather than serially. With serial
939+
// processing, N objects each taking D time would result in ~N*D total latency.
940+
// With concurrent processing, total latency should be close to D.
941+
//
942+
// This test will FAIL before the fix (serial handler) and PASS after (async handler).
943+
func TestEventHandlerConcurrency(t *testing.T) {
944+
const numPods = 10
945+
const statusDelay = 100 * time.Millisecond
946+
947+
fakeMapper := testutil.NewFakeRESTMapper(
948+
v1.SchemeGroupVersion.WithKind("Pod"),
949+
)
950+
951+
pods := make([]*unstructured.Unstructured, numPods)
952+
ids := make(object.ObjMetadataSet, numPods)
953+
for i := 0; i < numPods; i++ {
954+
pod := &unstructured.Unstructured{}
955+
pod.SetGroupVersionKind(v1.SchemeGroupVersion.WithKind("Pod"))
956+
pod.SetNamespace("default")
957+
pod.SetName(fmt.Sprintf("pod-%d", i))
958+
pods[i] = pod
959+
ids[i] = object.UnstructuredToObjMetadata(pod)
960+
}
961+
962+
podGVR := getGVR(t, fakeMapper, pods[0])
963+
964+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
965+
defer cancel()
966+
967+
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
968+
969+
statusWatcher := NewDefaultStatusWatcher(fakeClient, fakeMapper)
970+
statusWatcher.StatusReader = &slowStatusReader{delay: statusDelay}
971+
statusWatcher.StatusComputeWorkers = 8
972+
973+
eventCh := statusWatcher.Watch(ctx, ids, Options{})
974+
975+
// Wait for the SyncEvent indicating the informer has started watching.
976+
syncReceived := false
977+
for e := range eventCh {
978+
if e.Type == event.SyncEvent {
979+
syncReceived = true
980+
break
981+
}
982+
}
983+
require.True(t, syncReceived, "expected SyncEvent before any resource events")
984+
985+
// Create all pods at once to queue up events in the informer.
986+
for _, pod := range pods {
987+
require.NoError(t, fakeClient.Tracker().Create(podGVR, pod.DeepCopy(), pod.GetNamespace()))
988+
}
989+
990+
// Collect all ResourceUpdateEvents and measure total elapsed time.
991+
start := time.Now()
992+
received := 0
993+
for received < numPods {
994+
select {
995+
case e, ok := <-eventCh:
996+
if !ok {
997+
t.Fatalf("event channel closed after receiving %d/%d events", received, numPods)
998+
}
999+
if e.Type == event.ResourceUpdateEvent {
1000+
received++
1001+
}
1002+
case <-ctx.Done():
1003+
t.Fatalf("timed out waiting for events: received %d/%d", received, numPods)
1004+
}
1005+
}
1006+
elapsed := time.Since(start)
1007+
cancel()
1008+
1009+
serialTime := time.Duration(numPods) * statusDelay
1010+
t.Logf("Received %d events in %v (serial estimate: %v)", numPods, elapsed, serialTime)
1011+
1012+
assert.Less(t, elapsed, serialTime/2,
1013+
"Event handler appears to be blocking serially: took %v, expected well under %v for concurrent processing",
1014+
elapsed, serialTime/2)
1015+
}

0 commit comments

Comments
 (0)