@@ -5,13 +5,16 @@ package watcher
55
66import (
77 "context"
8+ "fmt"
89 "testing"
910 "time"
1011
12+ "github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine"
1113 "github.com/fluxcd/cli-utils/pkg/kstatus/polling/event"
1214 "github.com/fluxcd/cli-utils/pkg/kstatus/status"
1315 "github.com/fluxcd/cli-utils/pkg/object"
1416 "github.com/fluxcd/cli-utils/pkg/testutil"
17+ "github.com/stretchr/testify/assert"
1518 "github.com/stretchr/testify/require"
1619 appsv1 "k8s.io/api/apps/v1"
1720 v1 "k8s.io/api/core/v1"
@@ -888,3 +891,125 @@ func getGVR(t *testing.T, mapper meta.RESTMapper, obj *unstructured.Unstructured
888891 require .NoError (t , err )
889892 return mapping .Resource
890893}
894+
// slowStatusReader simulates a StatusReader that takes a fixed amount of time
// to compute status, mimicking the synchronous API calls made by the real
// deployment/replicaset status readers when fetching generated resources.
type slowStatusReader struct {
	// delay is how long each ReadStatus/ReadStatusForObject call blocks
	// before reporting the resource as Current.
	delay time.Duration
}
901+
902+ func (s * slowStatusReader ) Supports (schema.GroupKind ) bool {
903+ return true
904+ }
905+
906+ func (s * slowStatusReader ) ReadStatus (ctx context.Context , _ engine.ClusterReader , id object.ObjMetadata ) (* event.ResourceStatus , error ) {
907+ select {
908+ case <- time .After (s .delay ):
909+ case <- ctx .Done ():
910+ return nil , ctx .Err ()
911+ }
912+ return & event.ResourceStatus {
913+ Identifier : id ,
914+ Status : status .CurrentStatus ,
915+ Message : "Current" ,
916+ }, nil
917+ }
918+
919+ func (s * slowStatusReader ) ReadStatusForObject (ctx context.Context , _ engine.ClusterReader , obj * unstructured.Unstructured ) (* event.ResourceStatus , error ) {
920+ select {
921+ case <- time .After (s .delay ):
922+ case <- ctx .Done ():
923+ return nil , ctx .Err ()
924+ }
925+ id := object .UnstructuredToObjMetadata (obj )
926+ return & event.ResourceStatus {
927+ Identifier : id ,
928+ Status : status .CurrentStatus ,
929+ Resource : obj ,
930+ Message : "Current" ,
931+ }, nil
932+ }
933+
934+ // TestEventHandlerConcurrency verifies that the informer event handler does not
935+ // block the entire notification pipeline when status computation is slow.
936+ //
937+ // When multiple objects share the same informer (same GroupKind + namespace),
938+ // events should be processed concurrently rather than serially. With serial
939+ // processing, N objects each taking D time would result in ~N*D total latency.
940+ // With concurrent processing, total latency should be close to D.
941+ //
942+ // This test will FAIL before the fix (serial handler) and PASS after (async handler).
943+ func TestEventHandlerConcurrency (t * testing.T ) {
944+ const numPods = 10
945+ const statusDelay = 100 * time .Millisecond
946+
947+ fakeMapper := testutil .NewFakeRESTMapper (
948+ v1 .SchemeGroupVersion .WithKind ("Pod" ),
949+ )
950+
951+ pods := make ([]* unstructured.Unstructured , numPods )
952+ ids := make (object.ObjMetadataSet , numPods )
953+ for i := 0 ; i < numPods ; i ++ {
954+ pod := & unstructured.Unstructured {}
955+ pod .SetGroupVersionKind (v1 .SchemeGroupVersion .WithKind ("Pod" ))
956+ pod .SetNamespace ("default" )
957+ pod .SetName (fmt .Sprintf ("pod-%d" , i ))
958+ pods [i ] = pod
959+ ids [i ] = object .UnstructuredToObjMetadata (pod )
960+ }
961+
962+ podGVR := getGVR (t , fakeMapper , pods [0 ])
963+
964+ ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
965+ defer cancel ()
966+
967+ fakeClient := dynamicfake .NewSimpleDynamicClient (scheme .Scheme )
968+
969+ statusWatcher := NewDefaultStatusWatcher (fakeClient , fakeMapper )
970+ statusWatcher .StatusReader = & slowStatusReader {delay : statusDelay }
971+ statusWatcher .StatusComputeWorkers = 8
972+
973+ eventCh := statusWatcher .Watch (ctx , ids , Options {})
974+
975+ // Wait for the SyncEvent indicating the informer has started watching.
976+ syncReceived := false
977+ for e := range eventCh {
978+ if e .Type == event .SyncEvent {
979+ syncReceived = true
980+ break
981+ }
982+ }
983+ require .True (t , syncReceived , "expected SyncEvent before any resource events" )
984+
985+ // Create all pods at once to queue up events in the informer.
986+ for _ , pod := range pods {
987+ require .NoError (t , fakeClient .Tracker ().Create (podGVR , pod .DeepCopy (), pod .GetNamespace ()))
988+ }
989+
990+ // Collect all ResourceUpdateEvents and measure total elapsed time.
991+ start := time .Now ()
992+ received := 0
993+ for received < numPods {
994+ select {
995+ case e , ok := <- eventCh :
996+ if ! ok {
997+ t .Fatalf ("event channel closed after receiving %d/%d events" , received , numPods )
998+ }
999+ if e .Type == event .ResourceUpdateEvent {
1000+ received ++
1001+ }
1002+ case <- ctx .Done ():
1003+ t .Fatalf ("timed out waiting for events: received %d/%d" , received , numPods )
1004+ }
1005+ }
1006+ elapsed := time .Since (start )
1007+ cancel ()
1008+
1009+ serialTime := time .Duration (numPods ) * statusDelay
1010+ t .Logf ("Received %d events in %v (serial estimate: %v)" , numPods , elapsed , serialTime )
1011+
1012+ assert .Less (t , elapsed , serialTime / 2 ,
1013+ "Event handler appears to be blocking serially: took %v, expected well under %v for concurrent processing" ,
1014+ elapsed , serialTime / 2 )
1015+ }
0 commit comments