Skip to content

Commit daef237

Browse files
committed
client: reload RM snapshot before watch retry
Signed-off-by: bufferflies <1045931706@qq.com>
1 parent 4ad4e84 commit daef237

File tree

2 files changed

+117
-3
lines changed

2 files changed

+117
-3
lines changed

client/resource_group/controller/global_controller.go

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -357,9 +357,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
357357
}
358358
case <-watchRetryTimer.C:
359359
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
360-
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
361-
prefix := pd.GroupSettingsPathPrefixBytes(c.keyspaceID)
362-
watchMetaChannel, err = c.provider.Watch(ctx, prefix, opt.WithPrefix(), opt.WithPrevKV())
360+
watchMetaChannel, err = c.reloadResourceGroupMetaWatch(ctx)
363361
if err != nil {
364362
log.Warn("watch resource group meta failed", zap.Error(err))
365363
watchRetryTimer.Reset(watchRetryInterval)
@@ -491,6 +489,57 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
491489
}()
492490
}
493491

492+
func (c *ResourceGroupsController) reloadResourceGroupMetaWatch(
493+
ctx context.Context,
494+
) (chan []*meta_storagepb.Event, error) {
495+
groups, revision, err := c.provider.LoadResourceGroups(ctx)
496+
if err != nil {
497+
return nil, err
498+
}
499+
c.syncResourceGroupSnapshot(groups)
500+
// Start from the next revision after the freshly loaded snapshot so reconnects
501+
// keep the cache in sync without replaying the snapshot itself as watch events.
502+
return c.provider.Watch(
503+
ctx,
504+
pd.GroupSettingsPathPrefixBytes(c.keyspaceID),
505+
opt.WithRev(revision+1),
506+
opt.WithPrefix(),
507+
opt.WithPrevKV(),
508+
)
509+
}
510+
511+
func (c *ResourceGroupsController) syncResourceGroupSnapshot(groups []*rmpb.ResourceGroup) {
512+
groupMap := make(map[string]*rmpb.ResourceGroup, len(groups))
513+
for _, group := range groups {
514+
groupMap[group.GetName()] = group
515+
}
516+
517+
c.groupsController.Range(func(key, value any) bool {
518+
name := key.(string)
519+
group, exists := groupMap[name]
520+
if !exists {
521+
c.tombstoneGroupCostController(name)
522+
return true
523+
}
524+
gc := value.(*groupCostController)
525+
if !gc.tombstone.Load() {
526+
gc.modifyMeta(group)
527+
return true
528+
}
529+
newGC, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
530+
if err != nil {
531+
log.Warn("[resource group controller] re-create resource group cost controller from snapshot failed",
532+
zap.String("name", name), zap.Error(err))
533+
return true
534+
}
535+
if c.groupsController.CompareAndSwap(name, gc, newGC) {
536+
log.Info("[resource group controller] re-create resource group cost controller from snapshot",
537+
zap.String("name", name))
538+
}
539+
return true
540+
})
541+
}
542+
494543
// Stop stops ResourceGroupController service.
495544
func (c *ResourceGroupsController) Stop() error {
496545
if c.loopCancel == nil {

client/resource_group/controller/global_controller_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,71 @@ func TestGetResourceGroup(t *testing.T) {
381381
re.Nil(gc02)
382382
}
383383

384+
func TestReloadResourceGroupMetaWatch(t *testing.T) {
385+
re := require.New(t)
386+
ctx, cancel := context.WithCancel(context.Background())
387+
defer cancel()
388+
389+
mockProvider := &MockResourceGroupProvider{}
390+
mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil)
391+
controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, constants.NullKeyspaceID)
392+
re.NoError(err)
393+
394+
defaultGroup := &rmpb.ResourceGroup{
395+
Name: defaultResourceGroupName,
396+
Mode: rmpb.GroupMode_RUMode,
397+
RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}},
398+
}
399+
staleGroup := &rmpb.ResourceGroup{
400+
Name: "stale-group",
401+
Mode: rmpb.GroupMode_RUMode,
402+
RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 100}}},
403+
}
404+
updatedDefaultGroup := &rmpb.ResourceGroup{
405+
Name: defaultResourceGroupName,
406+
Mode: rmpb.GroupMode_RUMode,
407+
RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 2000}}},
408+
}
409+
410+
mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultGroup, nil)
411+
mockProvider.On("GetResourceGroup", mock.Anything, "stale-group", mock.Anything).Return(staleGroup, nil)
412+
413+
defaultGC, err := controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
414+
re.NoError(err)
415+
re.NotNil(defaultGC)
416+
417+
staleGC, err := controller.tryGetResourceGroupController(ctx, "stale-group", false)
418+
re.NoError(err)
419+
re.NotNil(staleGC)
420+
re.False(staleGC.tombstone.Load())
421+
422+
watchCh := make(chan []*meta_storagepb.Event)
423+
mockProvider.On("LoadResourceGroups", mock.Anything).
424+
Return([]*rmpb.ResourceGroup{updatedDefaultGroup}, int64(88), nil).
425+
Once()
426+
mockProvider.On("Watch", mock.Anything, pd.GroupSettingsPathPrefixBytes(constants.NullKeyspaceID), mock.Anything).
427+
Run(func(args mock.Arguments) {
428+
opts := args.Get(2).([]opt.MetaStorageOption)
429+
metaOp := &opt.MetaStorageOp{}
430+
for _, apply := range opts {
431+
apply(metaOp)
432+
}
433+
re.Equal(int64(89), metaOp.Revision)
434+
re.True(metaOp.IsOptsWithPrefix)
435+
re.True(metaOp.PrevKv)
436+
}).
437+
Return(watchCh, nil).
438+
Once()
439+
440+
reloadedWatchCh, err := controller.reloadResourceGroupMetaWatch(ctx)
441+
re.NoError(err)
442+
re.Equal(watchCh, reloadedWatchCh)
443+
re.Equal(updatedDefaultGroup, defaultGC.getMeta())
444+
staleGC, err = controller.tryGetResourceGroupController(ctx, "stale-group", true)
445+
re.NoError(err)
446+
re.True(staleGC.tombstone.Load())
447+
}
448+
384449
func TestTokenBucketsRequestWithKeyspaceID(t *testing.T) {
385450
re := require.New(t)
386451
ctx, cancel := context.WithCancel(context.Background())

0 commit comments

Comments
 (0)