Skip to content

Commit 11e1a6c

Browse files
committed
client/resource_group: fix race between cleanup and getOrCreate for request-source metrics state
Use LoadAndDelete in cleanupRequestSourceMetricsState so the map entry is removed atomically before the state is closed. Any hot-path goroutine still holding the old reference becomes a no-op via the closed check, and the next call to getOrCreateRequestSourceMetricsState allocates a fresh state instead of returning a closed one. Addresses rleungx's review comment.

Signed-off-by: Yuhao Zhang <yhzhang00@outlook.com>
1 parent 3fe18d6 commit 11e1a6c

File tree

3 files changed

+161
-9
lines changed

3 files changed

+161
-9
lines changed

client/resource_group/controller/global_controller.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -490,9 +490,8 @@ func (c *ResourceGroupsController) getOrCreateRequestSourceMetricsState(name str
490490
}
491491

492492
func (c *ResourceGroupsController) cleanupRequestSourceMetricsState(name string) {
493-
if state, ok := c.requestSourceStates.Load(name); ok {
494-
state.(*requestSourceMetricsState).cleanup()
495-
c.requestSourceStates.Delete(name)
493+
if v, loaded := c.requestSourceStates.LoadAndDelete(name); loaded {
494+
v.(*requestSourceMetricsState).cleanup()
496495
}
497496
}
498497

@@ -621,9 +620,8 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() {
621620
gc.mu.Unlock()
622621
if equalRU(latestConsumption, *gc.run.consumption) {
623622
if gc.inactive || gc.tombstone.Load() {
624-
gc.metrics.cleanupRequestSourceMetrics()
623+
c.cleanupRequestSourceMetricsState(resourceGroupName)
625624
c.groupsController.Delete(resourceGroupName)
626-
c.requestSourceStates.Delete(resourceGroupName)
627625
metrics.ResourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName)
628626
return true
629627
}

client/resource_group/controller/group_controller.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -208,10 +208,6 @@ func (mc *groupMetricsCollection) addRequestSourceRU(requestSource string, consu
208208
}
209209
}
210210

211-
func (mc *groupMetricsCollection) cleanupRequestSourceMetrics() {
212-
mc.sourceState.cleanup()
213-
}
214-
215211
type tokenCounter struct {
216212
fillRate uint64
217213

client/resource_group/controller/request_source_metrics_test.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,3 +396,161 @@ func TestRevivedResourceGroupCleanupRemovesExistingRequestSourceMetrics(t *testi
396396
re.False(ok)
397397
re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter))
398398
}
399+
400+
// TestGetOrCreateAfterCleanupReturnsFreshState verifies the fix for the race
401+
// rleungx identified: after cleanupRequestSourceMetricsState runs,
402+
// getOrCreateRequestSourceMetricsState must return a fresh, non-closed state
403+
// so that a newly created gc can record metrics normally.
404+
func TestGetOrCreateAfterCleanupReturnsFreshState(t *testing.T) {
405+
re := require.New(t)
406+
ctx, cancel := context.WithCancel(context.Background())
407+
defer cancel()
408+
409+
mockProvider := newMockResourceGroupProvider()
410+
controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0)
411+
re.NoError(err)
412+
413+
group := &rmpb.ResourceGroup{
414+
Name: "request-source-create-after-cleanup",
415+
Mode: rmpb.GroupMode_RUMode,
416+
RUSettings: &rmpb.GroupRequestUnitSettings{
417+
RU: &rmpb.TokenBucket{
418+
Settings: &rmpb.TokenLimitSettings{FillRate: 1000000},
419+
},
420+
},
421+
}
422+
mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil)
423+
424+
// Phase 1: create a gc, record some metrics, then clean up.
425+
gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false)
426+
re.NoError(err)
427+
428+
req := &TestRequestInfo{
429+
isWrite: true,
430+
writeBytes: 64,
431+
numReplicas: 1,
432+
storeID: 1,
433+
requestSource: "internal_create_after_cleanup",
434+
}
435+
resp := &TestResponseInfo{readBytes: 64, succeed: true}
436+
437+
_, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req)
438+
re.NoError(err)
439+
_, err = gc.onResponseImpl(req, resp)
440+
re.NoError(err)
441+
442+
gc.mu.Lock()
443+
*gc.run.consumption = *gc.mu.consumption
444+
gc.mu.Unlock()
445+
gc.inactive = true
446+
controller.cleanUpResourceGroup()
447+
448+
_, loaded := controller.loadGroupController(group.Name)
449+
re.False(loaded)
450+
451+
// Phase 2: getOrCreateRequestSourceMetricsState must return a fresh,
452+
// non-closed state after cleanup has removed the old one.
453+
state := controller.getOrCreateRequestSourceMetricsState(group.Name)
454+
re.NotNil(state)
455+
456+
state.mu.RLock()
457+
re.False(state.closed)
458+
state.mu.RUnlock()
459+
460+
// Phase 3: create a new gc with this state and verify it can record metrics.
461+
newGC, err := newGroupCostController(
462+
group,
463+
controller.ruConfig,
464+
controller.lowTokenNotifyChan,
465+
controller.tokenBucketUpdateChan,
466+
state,
467+
)
468+
re.NoError(err)
469+
470+
beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter)
471+
_, _, _, _, err = newGC.onRequestWaitImpl(context.Background(), req)
472+
re.NoError(err)
473+
_, err = newGC.onResponseImpl(req, resp)
474+
re.NoError(err)
475+
476+
sourceMetrics, cacheSize := requestSourceStateSnapshot(t, newGC, req.requestSource)
477+
re.Equal(1, cacheSize)
478+
re.NotNil(sourceMetrics)
479+
re.Greater(counterValue(t, sourceMetrics.wru), float64(0))
480+
re.Greater(counterValue(t, sourceMetrics.rru), float64(0))
481+
re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter))
482+
483+
// Cleanup for this test.
484+
state.cleanup()
485+
}
486+
487+
// TestCleanupThenRecreateViaFullPath exercises the full end-to-end path:
488+
// cleanup a group, then tryGetResourceGroupController re-creates it, and
489+
// the new gc records metrics successfully.
490+
func TestCleanupThenRecreateViaFullPath(t *testing.T) {
491+
re := require.New(t)
492+
ctx, cancel := context.WithCancel(context.Background())
493+
defer cancel()
494+
495+
mockProvider := newMockResourceGroupProvider()
496+
controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0)
497+
re.NoError(err)
498+
499+
group := &rmpb.ResourceGroup{
500+
Name: "request-source-full-recreate",
501+
Mode: rmpb.GroupMode_RUMode,
502+
RUSettings: &rmpb.GroupRequestUnitSettings{
503+
RU: &rmpb.TokenBucket{
504+
Settings: &rmpb.TokenLimitSettings{FillRate: 1000000},
505+
},
506+
},
507+
}
508+
mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil)
509+
510+
req := &TestRequestInfo{
511+
isWrite: true,
512+
writeBytes: 64,
513+
numReplicas: 1,
514+
storeID: 1,
515+
requestSource: "internal_full_recreate",
516+
}
517+
resp := &TestResponseInfo{readBytes: 64, succeed: true}
518+
519+
// Create, use, and clean up the group.
520+
gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false)
521+
re.NoError(err)
522+
_, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req)
523+
re.NoError(err)
524+
_, err = gc.onResponseImpl(req, resp)
525+
re.NoError(err)
526+
527+
gc.mu.Lock()
528+
*gc.run.consumption = *gc.mu.consumption
529+
gc.mu.Unlock()
530+
gc.inactive = true
531+
controller.cleanUpResourceGroup()
532+
533+
_, loaded := controller.loadGroupController(group.Name)
534+
re.False(loaded)
535+
536+
// Re-create through the normal path (simulates a new request arriving
537+
// after the group was cleaned up).
538+
gc2, err := controller.tryGetResourceGroupController(ctx, group.Name, false)
539+
re.NoError(err)
540+
541+
beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter)
542+
_, _, _, _, err = gc2.onRequestWaitImpl(context.Background(), req)
543+
re.NoError(err)
544+
_, err = gc2.onResponseImpl(req, resp)
545+
re.NoError(err)
546+
547+
sourceMetrics, cacheSize := requestSourceStateSnapshot(t, gc2, req.requestSource)
548+
re.Equal(1, cacheSize)
549+
re.NotNil(sourceMetrics)
550+
re.Greater(counterValue(t, sourceMetrics.wru), float64(0))
551+
re.Greater(counterValue(t, sourceMetrics.rru), float64(0))
552+
re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter))
553+
554+
// Cleanup for this test.
555+
gc2.metrics.sourceState.cleanup()
556+
}

0 commit comments

Comments
 (0)