Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 48 additions & 8 deletions client/resource_group/controller/global_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,12 @@ var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil)

// ResourceGroupsController implements ResourceGroupKVInterceptor.
type ResourceGroupsController struct {
clientUniqueID uint64
provider ResourceGroupProvider
groupsController sync.Map
ruConfig *RUConfig
keyspaceID uint32
clientUniqueID uint64
provider ResourceGroupProvider
groupsController sync.Map
requestSourceStates sync.Map
ruConfig *RUConfig
keyspaceID uint32

loopCtx context.Context
loopCancel func()
Expand Down Expand Up @@ -337,6 +338,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
/* channels */
case <-c.loopCtx.Done():
metrics.ResourceGroupStatusGauge.Reset()
metrics.RequestSourceRUCounter.Reset()
return
case <-c.responseDeadlineCh:
c.run.inDegradedMode.Store(true)
Expand Down Expand Up @@ -387,7 +389,13 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
continue
}
// If the resource group is marked as tombstone before, re-create the resource group controller.
newGC, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
newGC, err := newGroupCostController(
group,
c.ruConfig,
c.lowTokenNotifyChan,
c.tokenBucketUpdateChan,
c.getOrCreateRequestSourceMetricsState(name),
)
if err != nil {
log.Warn("[resource group controller] re-create resource group cost controller for tombstone failed",
zap.String("name", name), zap.Error(err))
Expand Down Expand Up @@ -472,6 +480,22 @@ func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *g
return tmp.(*groupCostController), loaded
}

func (c *ResourceGroupsController) getOrCreateRequestSourceMetricsState(name string) *requestSourceMetricsState {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be a race between create and cleanup

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Fixed by switching to LoadAndDelete in cleanupRequestSourceMetricsState. Once LoadAndDelete succeeds, the entry is gone from the map, so getOrCreateRequestSourceMetricsState will never observe a closed state, and the next Load/LoadOrStore creates a fresh one. PTAL

if state, ok := c.requestSourceStates.Load(name); ok {
return state.(*requestSourceMetricsState)
}
state := newRequestSourceMetricsState(name)
actual, _ := c.requestSourceStates.LoadOrStore(name, state)
return actual.(*requestSourceMetricsState)
}

func (c *ResourceGroupsController) cleanupRequestSourceMetricsState(name string) {
if state, ok := c.requestSourceStates.Load(name); ok {
state.(*requestSourceMetricsState).cleanup()
c.requestSourceStates.Delete(name)
}
}

// NewResourceGroupNotExistErr returns a new error that indicates the resource group does not exist.
// It's exported for testing.
func NewResourceGroupNotExistErr(name string) error {
Expand Down Expand Up @@ -521,7 +545,13 @@ func (c *ResourceGroupsController) tryGetResourceGroupController(
return gc, nil
}
// Initialize the resource group controller.
gc, err = newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
gc, err = newGroupCostController(
group,
c.ruConfig,
c.lowTokenNotifyChan,
c.tokenBucketUpdateChan,
c.getOrCreateRequestSourceMetricsState(name),
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -553,15 +583,23 @@ func (c *ResourceGroupsController) tombstoneGroupCostController(name string) {
log.Warn("[resource group controller] get default resource group meta for tombstone failed",
zap.String("name", name), zap.Error(err))
// Directly delete the resource group controller if the default group is not available.
c.cleanupRequestSourceMetricsState(name)
c.groupsController.Delete(name)
return
}
// Create a default resource group controller for the tombstone resource group independently.
gc, err := newGroupCostController(defaultGC.getMeta(), c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
gc, err := newGroupCostController(
defaultGC.getMeta(),
c.ruConfig,
c.lowTokenNotifyChan,
c.tokenBucketUpdateChan,
c.getOrCreateRequestSourceMetricsState(name),
)
if err != nil {
log.Warn("[resource group controller] create default resource group cost controller for tombstone failed",
zap.String("name", name), zap.Error(err))
// Directly delete the resource group controller if the default group controller cannot be created.
c.cleanupRequestSourceMetricsState(name)
c.groupsController.Delete(name)
return
}
Expand All @@ -583,7 +621,9 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() {
gc.mu.Unlock()
if equalRU(latestConsumption, *gc.run.consumption) {
if gc.inactive || gc.tombstone.Load() {
gc.metrics.cleanupRequestSourceMetrics()
c.groupsController.Delete(resourceGroupName)
c.requestSourceStates.Delete(resourceGroupName)
metrics.ResourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName)
return true
}
Expand Down
101 changes: 99 additions & 2 deletions client/resource_group/controller/group_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,43 @@ type groupMetricsCollection struct {
tokenRequestCounter prometheus.Counter
runningKVRequestCounter prometheus.Gauge
consumeTokenHistogram prometheus.Observer
sourceState *requestSourceMetricsState
}

func initMetrics(oldName, name string) *groupMetricsCollection {
type requestSourceMetrics struct {
rru prometheus.Counter
wru prometheus.Counter
}

type requestSourceMetricsState struct {
resourceGroupName string
mu sync.RWMutex
closed bool
items map[string]*requestSourceMetrics
}

func newRequestSourceMetricsState(resourceGroupName string) *requestSourceMetricsState {
return &requestSourceMetricsState{
resourceGroupName: resourceGroupName,
items: make(map[string]*requestSourceMetrics),
}
}

func (s *requestSourceMetricsState) cleanup() {
if s == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.closed = true
for requestSource := range s.items {
metrics.RequestSourceRUCounter.DeleteLabelValues(s.resourceGroupName, requestSource, "rru")
metrics.RequestSourceRUCounter.DeleteLabelValues(s.resourceGroupName, requestSource, "wru")
delete(s.items, requestSource)
}
}

func initMetrics(oldName, name string, sourceState *requestSourceMetricsState) *groupMetricsCollection {
const (
otherType = "others"
throttledType = "throttled"
Expand All @@ -122,7 +156,60 @@ func initMetrics(oldName, name string) *groupMetricsCollection {
tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name),
consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name),
sourceState: sourceState,
}
}

func (mc *groupMetricsCollection) getOrCreateRequestSourceMetrics(requestSource string) *requestSourceMetrics {
if mc.sourceState == nil {
return nil
}
mc.sourceState.mu.RLock()
sourceMetrics, ok := mc.sourceState.items[requestSource]
closed := mc.sourceState.closed
mc.sourceState.mu.RUnlock()
if ok {
return sourceMetrics
}
if closed {
return nil
}

mc.sourceState.mu.Lock()
defer mc.sourceState.mu.Unlock()
if mc.sourceState.closed {
return nil
}
sourceMetrics, ok = mc.sourceState.items[requestSource]
if ok {
return sourceMetrics
}
sourceMetrics = &requestSourceMetrics{
rru: metrics.RequestSourceRUCounter.WithLabelValues(mc.sourceState.resourceGroupName, requestSource, "rru"),
wru: metrics.RequestSourceRUCounter.WithLabelValues(mc.sourceState.resourceGroupName, requestSource, "wru"),
}
mc.sourceState.items[requestSource] = sourceMetrics
return sourceMetrics
}

func (mc *groupMetricsCollection) addRequestSourceRU(requestSource string, consumption *rmpb.Consumption) {
if consumption == nil {
return
}
sourceMetrics := mc.getOrCreateRequestSourceMetrics(requestSource)
if sourceMetrics == nil {
return
}
if consumption.RRU > 0 {
sourceMetrics.rru.Add(consumption.RRU)
}
if consumption.WRU > 0 {
sourceMetrics.wru.Add(consumption.WRU)
}
}

func (mc *groupMetricsCollection) cleanupRequestSourceMetrics() {
mc.sourceState.cleanup()
}

type tokenCounter struct {
Expand Down Expand Up @@ -156,6 +243,7 @@ func newGroupCostController(
mainCfg *RUConfig,
lowRUNotifyChan chan notifyMsg,
tokenBucketUpdateChan chan *groupCostController,
sourceState *requestSourceMetricsState,
) (*groupCostController, error) {
switch group.Mode {
case rmpb.GroupMode_RUMode:
Expand All @@ -165,7 +253,10 @@ func newGroupCostController(
default:
return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type")
}
ms := initMetrics(group.Name, group.Name)
if sourceState == nil {
sourceState = newRequestSourceMetricsState(group.Name)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sourceState created here is not registered in the requestSourceStates map, which means it will never be cleaned up by cleanupRequestSourceMetricsState. Currently, only tests pass nil; in production, the path always passes a non-nil value through getOrCreateRequestSourceMetricsState. I suggest adding a comment clarifying that this fallback is for test scenarios only, to avoid future misuse that could lead to metrics leaks.

}
ms := initMetrics(group.Name, group.Name, sourceState)
gc := &groupCostController{
meta: group,
name: group.Name,
Expand Down Expand Up @@ -577,6 +668,8 @@ func (gc *groupCostController) onRequestWaitImpl(
waitDuration += d
}

gc.metrics.addRequestSourceRU(info.RequestSource(), delta)

gc.mu.Lock()
// Calculate the penalty of the store
penalty = &rmpb.Consumption{}
Expand Down Expand Up @@ -622,6 +715,8 @@ func (gc *groupCostController) onResponseImpl(
add(gc.mu.globalCounter, count)
gc.mu.Unlock()

gc.metrics.addRequestSourceRU(req.RequestSource(), delta)

return delta, nil
}

Expand Down Expand Up @@ -663,6 +758,8 @@ func (gc *groupCostController) onResponseWaitImpl(
add(gc.mu.globalCounter, count)
gc.mu.Unlock()

gc.metrics.addRequestSourceRU(req.RequestSource(), delta)

return delta, waitDuration, nil
}

Expand Down
4 changes: 2 additions & 2 deletions client/resource_group/controller/group_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController
}
ch1 := make(chan notifyMsg)
ch2 := make(chan *groupCostController)
gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2)
gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2, nil)
re.NoError(err)
return gc
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestAcquireTokensSignalAwareWait(t *testing.T) {
cfg := DefaultRUConfig()
cfg.WaitRetryInterval = 5 * time.Second
cfg.WaitRetryTimes = 3
gc, err := newGroupCostController(group, cfg, notifyCh, make(chan *groupCostController, 1))
gc, err := newGroupCostController(group, cfg, notifyCh, make(chan *groupCostController, 1), nil)
re.NoError(err)

// Set fillRate=0 so reservation always fails with InfDuration,
Expand Down
12 changes: 12 additions & 0 deletions client/resource_group/controller/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ var (
LowTokenRequestNotifyCounter *prometheus.CounterVec
// TokenConsumedHistogram comments placeholder
TokenConsumedHistogram *prometheus.HistogramVec
// RequestSourceRUCounter comments placeholder
RequestSourceRUCounter *prometheus.CounterVec
// FailedTokenRequestDuration comments placeholder, WithLabelValues is a heavy operation, define variable to avoid call it every time.
FailedTokenRequestDuration prometheus.Observer
// SuccessfulTokenRequestDuration comments placeholder, WithLabelValues is a heavy operation, define variable to avoid call it every time.
Expand Down Expand Up @@ -153,6 +155,15 @@ func initMetrics(constLabels prometheus.Labels) {
ConstLabels: constLabels,
}, []string{newResourceGroupNameLabel})

RequestSourceRUCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "ru_total",
Help: "Counter of request RU consumption grouped by resource group and request source.",
ConstLabels: constLabels,
}, []string{newResourceGroupNameLabel, "request_source", errType})
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although errType is also a "type," the semantics here are just a plain type and not related to err, right? It would be better to give errType a more generic name.


// WithLabelValues is a heavy operation, define variable to avoid call it every time.
FailedTokenRequestDuration = TokenRequestDuration.WithLabelValues("fail")
SuccessfulTokenRequestDuration = TokenRequestDuration.WithLabelValues("success")
Expand All @@ -171,4 +182,5 @@ func InitAndRegisterMetrics(constLabels prometheus.Labels) {
prometheus.MustRegister(ResourceGroupTokenRequestCounter)
prometheus.MustRegister(LowTokenRequestNotifyCounter)
prometheus.MustRegister(TokenConsumedHistogram)
prometheus.MustRegister(RequestSourceRUCounter)
}
1 change: 1 addition & 0 deletions client/resource_group/controller/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type RequestInfo interface {
StoreID() uint64
RequestSize() uint64
AccessLocationType() AccessLocationType
RequestSource() string
}

// ResponseInfo is the interface of the response information provider. A response should be
Expand Down
Loading
Loading