Skip to content

Commit 3fe18d6

Browse files
committed
client/resource_group: keep request-source metrics state per group
Signed-off-by: Yuhao Zhang <yhzhang00@outlook.com>
1 parent 492976a commit 3fe18d6

File tree

4 files changed

+387
-50
lines changed

4 files changed

+387
-50
lines changed

client/resource_group/controller/global_controller.go

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,12 @@ var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil)
142142

143143
// ResourceGroupsController implements ResourceGroupKVInterceptor.
144144
type ResourceGroupsController struct {
145-
clientUniqueID uint64
146-
provider ResourceGroupProvider
147-
groupsController sync.Map
148-
ruConfig *RUConfig
149-
keyspaceID uint32
145+
clientUniqueID uint64
146+
provider ResourceGroupProvider
147+
groupsController sync.Map
148+
requestSourceStates sync.Map
149+
ruConfig *RUConfig
150+
keyspaceID uint32
150151

151152
loopCtx context.Context
152153
loopCancel func()
@@ -388,7 +389,13 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
388389
continue
389390
}
390391
// If the resource group is marked as tombstone before, re-create the resource group controller.
391-
newGC, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
392+
newGC, err := newGroupCostController(
393+
group,
394+
c.ruConfig,
395+
c.lowTokenNotifyChan,
396+
c.tokenBucketUpdateChan,
397+
c.getOrCreateRequestSourceMetricsState(name),
398+
)
392399
if err != nil {
393400
log.Warn("[resource group controller] re-create resource group cost controller for tombstone failed",
394401
zap.String("name", name), zap.Error(err))
@@ -473,6 +480,22 @@ func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *g
473480
return tmp.(*groupCostController), loaded
474481
}
475482

483+
// getOrCreateRequestSourceMetricsState returns the *requestSourceMetricsState
// stored in c.requestSourceStates under name. When no entry exists yet, a new
// state is built with newRequestSourceMetricsState(name) and stored.
// The value returned is always the one that ends up in the map, so callers
// asking for the same name while an entry is present get the same pointer.
func (c *ResourceGroupsController) getOrCreateRequestSourceMetricsState(name string) *requestSourceMetricsState {
484+
// Fast path: an entry already exists, so no new state needs to be allocated.
if state, ok := c.requestSourceStates.Load(name); ok {
485+
return state.(*requestSourceMetricsState)
486+
}
487+
state := newRequestSourceMetricsState(name)
488+
// LoadOrStore keeps an entry that another caller stored in the meantime;
// in that case the state built above is simply dropped.
actual, _ := c.requestSourceStates.LoadOrStore(name, state)
489+
return actual.(*requestSourceMetricsState)
490+
}
491+
492+
// cleanupRequestSourceMetricsState looks up the state stored for name in
// c.requestSourceStates. If one is present it calls the state's cleanup method
// and then deletes the map entry; if none is present it does nothing.
//
// NOTE(review): Load and Delete are two separate sync.Map operations, so a
// concurrent getOrCreateRequestSourceMetricsState(name) can still observe the
// state after cleanup() has run on it but before the entry is deleted —
// confirm callers tolerate receiving an already cleaned-up state.
func (c *ResourceGroupsController) cleanupRequestSourceMetricsState(name string) {
493+
if state, ok := c.requestSourceStates.Load(name); ok {
494+
state.(*requestSourceMetricsState).cleanup()
495+
c.requestSourceStates.Delete(name)
496+
}
497+
}
498+
476499
// NewResourceGroupNotExistErr returns a new error that indicates the resource group does not exist.
477500
// It's exported for testing.
478501
func NewResourceGroupNotExistErr(name string) error {
@@ -522,7 +545,13 @@ func (c *ResourceGroupsController) tryGetResourceGroupController(
522545
return gc, nil
523546
}
524547
// Initialize the resource group controller.
525-
gc, err = newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
548+
gc, err = newGroupCostController(
549+
group,
550+
c.ruConfig,
551+
c.lowTokenNotifyChan,
552+
c.tokenBucketUpdateChan,
553+
c.getOrCreateRequestSourceMetricsState(name),
554+
)
526555
if err != nil {
527556
return nil, err
528557
}
@@ -554,15 +583,23 @@ func (c *ResourceGroupsController) tombstoneGroupCostController(name string) {
554583
log.Warn("[resource group controller] get default resource group meta for tombstone failed",
555584
zap.String("name", name), zap.Error(err))
556585
// Directly delete the resource group controller if the default group is not available.
586+
c.cleanupRequestSourceMetricsState(name)
557587
c.groupsController.Delete(name)
558588
return
559589
}
560590
// Create a default resource group controller for the tombstone resource group independently.
561-
gc, err := newGroupCostController(defaultGC.getMeta(), c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
591+
gc, err := newGroupCostController(
592+
defaultGC.getMeta(),
593+
c.ruConfig,
594+
c.lowTokenNotifyChan,
595+
c.tokenBucketUpdateChan,
596+
c.getOrCreateRequestSourceMetricsState(name),
597+
)
562598
if err != nil {
563599
log.Warn("[resource group controller] create default resource group cost controller for tombstone failed",
564600
zap.String("name", name), zap.Error(err))
565601
// Directly delete the resource group controller if the default group controller cannot be created.
602+
c.cleanupRequestSourceMetricsState(name)
566603
c.groupsController.Delete(name)
567604
return
568605
}
@@ -584,8 +621,9 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() {
584621
gc.mu.Unlock()
585622
if equalRU(latestConsumption, *gc.run.consumption) {
586623
if gc.inactive || gc.tombstone.Load() {
587-
gc.metrics.cleanupRequestSourceMetrics(resourceGroupName)
624+
gc.metrics.cleanupRequestSourceMetrics()
588625
c.groupsController.Delete(resourceGroupName)
626+
c.requestSourceStates.Delete(resourceGroupName)
589627
metrics.ResourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName)
590628
return true
591629
}

client/resource_group/controller/group_controller.go

Lines changed: 66 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -106,16 +106,43 @@ type groupMetricsCollection struct {
106106
tokenRequestCounter prometheus.Counter
107107
runningKVRequestCounter prometheus.Gauge
108108
consumeTokenHistogram prometheus.Observer
109-
sourceMetricsMu sync.RWMutex
110-
sourceMetrics map[string]*requestSourceMetrics
109+
sourceState *requestSourceMetricsState
111110
}
112111

113112
// requestSourceMetrics bundles the two counters kept for one request source.
// Both are obtained from metrics.RequestSourceRUCounter in
// getOrCreateRequestSourceMetrics and differ only in the last label value.
type requestSourceMetrics struct {
114113
// rru is the counter created with the "rru" label value.
rru prometheus.Counter
115114
// wru is the counter created with the "wru" label value.
wru prometheus.Counter
116115
}
117116

118-
func initMetrics(oldName, name string) *groupMetricsCollection {
117+
// requestSourceMetricsState holds the request-source counters of one resource
// group. It is passed into initMetrics and stored on groupMetricsCollection as
// sourceState.
type requestSourceMetricsState struct {
118+
// resourceGroupName is used as the first label value when counters are
// created or deleted on metrics.RequestSourceRUCounter.
resourceGroupName string
119+
// mu is held whenever closed or items is read or written in the methods
// visible in this file.
mu sync.RWMutex
120+
// closed is set to true by cleanup; getOrCreateRequestSourceMetrics stops
// creating new entries once it observes closed.
closed bool
121+
// items maps a request source string to its cached counters.
items map[string]*requestSourceMetrics
122+
}
123+
124+
// newRequestSourceMetricsState returns a requestSourceMetricsState for the
// given resource group name with an empty, non-nil items map. The closed flag
// and the mutex are left at their zero values.
func newRequestSourceMetricsState(resourceGroupName string) *requestSourceMetricsState {
125+
return &requestSourceMetricsState{
126+
resourceGroupName: resourceGroupName,
127+
// Allocated up front so later writes to items cannot hit a nil map.
items: make(map[string]*requestSourceMetrics),
128+
}
129+
}
130+
131+
// cleanup marks the state as closed and removes every cached request source:
// for each key in s.items it deletes the "rru" and "wru" label sets from
// metrics.RequestSourceRUCounter and then drops the key from the map.
// It is safe to call on a nil receiver, in which case it returns immediately.
func (s *requestSourceMetricsState) cleanup() {
132+
// Nil receiver guard: lets callers invoke cleanup without checking the pointer.
if s == nil {
133+
return
134+
}
135+
s.mu.Lock()
136+
defer s.mu.Unlock()
137+
// Set under the write lock so getOrCreateRequestSourceMetrics, which re-checks
// closed under the same lock, will not add entries after this point.
s.closed = true
138+
for requestSource := range s.items {
139+
metrics.RequestSourceRUCounter.DeleteLabelValues(s.resourceGroupName, requestSource, "rru")
140+
metrics.RequestSourceRUCounter.DeleteLabelValues(s.resourceGroupName, requestSource, "wru")
141+
// Deleting the key being visited during a range over a map is allowed in Go.
delete(s.items, requestSource)
142+
}
143+
}
144+
145+
func initMetrics(oldName, name string, sourceState *requestSourceMetricsState) *groupMetricsCollection {
119146
const (
120147
otherType = "others"
121148
throttledType = "throttled"
@@ -129,37 +156,50 @@ func initMetrics(oldName, name string) *groupMetricsCollection {
129156
tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
130157
runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name),
131158
consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name),
132-
sourceMetrics: make(map[string]*requestSourceMetrics),
159+
sourceState: sourceState,
133160
}
134161
}
135162

136-
func (mc *groupMetricsCollection) getOrCreateRequestSourceMetrics(resourceGroupName, requestSource string) *requestSourceMetrics {
137-
mc.sourceMetricsMu.RLock()
138-
sourceMetrics, ok := mc.sourceMetrics[requestSource]
139-
mc.sourceMetricsMu.RUnlock()
163+
// getOrCreateRequestSourceMetrics returns the cached counters for requestSource
// from mc.sourceState, creating and caching them on first use.
// It returns nil when mc.sourceState is nil, or when the state is closed and no
// entry for requestSource was found.
//
// NOTE(review): this span is a diff view; gutter markers ending in "-" appear
// to be the pre-change lines removed by this commit, and markers ending in "+"
// the lines that replace them — confirm against the actual file.
func (mc *groupMetricsCollection) getOrCreateRequestSourceMetrics(requestSource string) *requestSourceMetrics {
164+
if mc.sourceState == nil {
165+
return nil
166+
}
167+
// First lookup under the read lock; closed is sampled in the same critical
// section as the map read.
mc.sourceState.mu.RLock()
168+
sourceMetrics, ok := mc.sourceState.items[requestSource]
169+
closed := mc.sourceState.closed
170+
mc.sourceState.mu.RUnlock()
140171
if ok {
141172
return sourceMetrics
142173
}
174+
if closed {
175+
return nil
176+
}
143177

144-
mc.sourceMetricsMu.Lock()
145-
defer mc.sourceMetricsMu.Unlock()
146-
sourceMetrics, ok = mc.sourceMetrics[requestSource]
178+
// Not found: take the write lock and check closed and the map again, since
// either may have changed after the read lock was released.
mc.sourceState.mu.Lock()
179+
defer mc.sourceState.mu.Unlock()
180+
if mc.sourceState.closed {
181+
return nil
182+
}
183+
sourceMetrics, ok = mc.sourceState.items[requestSource]
147184
if ok {
148185
return sourceMetrics
149186
}
150187
sourceMetrics = &requestSourceMetrics{
151-
rru: metrics.RequestSourceRUCounter.WithLabelValues(resourceGroupName, requestSource, "rru"),
152-
wru: metrics.RequestSourceRUCounter.WithLabelValues(resourceGroupName, requestSource, "wru"),
188+
// The group label now comes from the shared state instead of a parameter.
rru: metrics.RequestSourceRUCounter.WithLabelValues(mc.sourceState.resourceGroupName, requestSource, "rru"),
189+
wru: metrics.RequestSourceRUCounter.WithLabelValues(mc.sourceState.resourceGroupName, requestSource, "wru"),
153190
}
154-
mc.sourceMetrics[requestSource] = sourceMetrics
191+
mc.sourceState.items[requestSource] = sourceMetrics
155192
return sourceMetrics
156193
}
157194

158-
func (mc *groupMetricsCollection) addRequestSourceRU(resourceGroupName, requestSource string, consumption *rmpb.Consumption) {
195+
func (mc *groupMetricsCollection) addRequestSourceRU(requestSource string, consumption *rmpb.Consumption) {
159196
if consumption == nil {
160197
return
161198
}
162-
sourceMetrics := mc.getOrCreateRequestSourceMetrics(resourceGroupName, requestSource)
199+
sourceMetrics := mc.getOrCreateRequestSourceMetrics(requestSource)
200+
if sourceMetrics == nil {
201+
return
202+
}
163203
if consumption.RRU > 0 {
164204
sourceMetrics.rru.Add(consumption.RRU)
165205
}
@@ -168,14 +208,8 @@ func (mc *groupMetricsCollection) addRequestSourceRU(resourceGroupName, requestS
168208
}
169209
}
170210

171-
func (mc *groupMetricsCollection) cleanupRequestSourceMetrics(resourceGroupName string) {
172-
mc.sourceMetricsMu.Lock()
173-
defer mc.sourceMetricsMu.Unlock()
174-
for requestSource := range mc.sourceMetrics {
175-
metrics.RequestSourceRUCounter.DeleteLabelValues(resourceGroupName, requestSource, "rru")
176-
metrics.RequestSourceRUCounter.DeleteLabelValues(resourceGroupName, requestSource, "wru")
177-
delete(mc.sourceMetrics, requestSource)
178-
}
211+
// cleanupRequestSourceMetrics delegates to mc.sourceState.cleanup().
// A nil mc.sourceState is tolerated because cleanup returns early on a nil
// receiver.
func (mc *groupMetricsCollection) cleanupRequestSourceMetrics() {
212+
mc.sourceState.cleanup()
179213
}
180214

181215
type tokenCounter struct {
@@ -209,6 +243,7 @@ func newGroupCostController(
209243
mainCfg *RUConfig,
210244
lowRUNotifyChan chan notifyMsg,
211245
tokenBucketUpdateChan chan *groupCostController,
246+
sourceState *requestSourceMetricsState,
212247
) (*groupCostController, error) {
213248
switch group.Mode {
214249
case rmpb.GroupMode_RUMode:
@@ -218,7 +253,10 @@ func newGroupCostController(
218253
default:
219254
return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type")
220255
}
221-
ms := initMetrics(group.Name, group.Name)
256+
if sourceState == nil {
257+
sourceState = newRequestSourceMetricsState(group.Name)
258+
}
259+
ms := initMetrics(group.Name, group.Name, sourceState)
222260
gc := &groupCostController{
223261
meta: group,
224262
name: group.Name,
@@ -630,7 +668,7 @@ func (gc *groupCostController) onRequestWaitImpl(
630668
waitDuration += d
631669
}
632670

633-
gc.metrics.addRequestSourceRU(gc.name, info.RequestSource(), delta)
671+
gc.metrics.addRequestSourceRU(info.RequestSource(), delta)
634672

635673
gc.mu.Lock()
636674
// Calculate the penalty of the store
@@ -677,7 +715,7 @@ func (gc *groupCostController) onResponseImpl(
677715
add(gc.mu.globalCounter, count)
678716
gc.mu.Unlock()
679717

680-
gc.metrics.addRequestSourceRU(gc.name, req.RequestSource(), delta)
718+
gc.metrics.addRequestSourceRU(req.RequestSource(), delta)
681719

682720
return delta, nil
683721
}
@@ -720,7 +758,7 @@ func (gc *groupCostController) onResponseWaitImpl(
720758
add(gc.mu.globalCounter, count)
721759
gc.mu.Unlock()
722760

723-
gc.metrics.addRequestSourceRU(gc.name, req.RequestSource(), delta)
761+
gc.metrics.addRequestSourceRU(req.RequestSource(), delta)
724762

725763
return delta, waitDuration, nil
726764
}

client/resource_group/controller/group_controller_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController
4545
}
4646
ch1 := make(chan notifyMsg)
4747
ch2 := make(chan *groupCostController)
48-
gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2)
48+
gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2, nil)
4949
re.NoError(err)
5050
return gc
5151
}
@@ -239,7 +239,7 @@ func TestAcquireTokensSignalAwareWait(t *testing.T) {
239239
cfg := DefaultRUConfig()
240240
cfg.WaitRetryInterval = 5 * time.Second
241241
cfg.WaitRetryTimes = 3
242-
gc, err := newGroupCostController(group, cfg, notifyCh, make(chan *groupCostController, 1))
242+
gc, err := newGroupCostController(group, cfg, notifyCh, make(chan *groupCostController, 1), nil)
243243
re.NoError(err)
244244

245245
// Set fillRate=0 so reservation always fails with InfDuration,

0 commit comments

Comments
 (0)