Skip to content

Commit 1dd52fd

Browse files
committed
client/resource_group: cache request source RU metrics
1 parent ec48604 commit 1dd52fd

File tree

6 files changed

+222
-5
lines changed

6 files changed

+222
-5
lines changed

client/resource_group/controller/global_controller.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
337337
/* channels */
338338
case <-c.loopCtx.Done():
339339
metrics.ResourceGroupStatusGauge.Reset()
340+
metrics.RequestSourceRUCounter.Reset()
340341
return
341342
case <-c.responseDeadlineCh:
342343
c.run.inDegradedMode.Store(true)
@@ -583,6 +584,7 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() {
583584
gc.mu.Unlock()
584585
if equalRU(latestConsumption, *gc.run.consumption) {
585586
if gc.inactive || gc.tombstone.Load() {
587+
gc.metrics.cleanupRequestSourceMetrics(resourceGroupName)
586588
c.groupsController.Delete(resourceGroupName)
587589
metrics.ResourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName)
588590
return true

client/resource_group/controller/group_controller.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,13 @@ type groupMetricsCollection struct {
106106
tokenRequestCounter prometheus.Counter
107107
runningKVRequestCounter prometheus.Gauge
108108
consumeTokenHistogram prometheus.Observer
109+
sourceMetricsMu sync.RWMutex
110+
sourceMetrics map[string]*requestSourceMetrics
111+
}
112+
113+
type requestSourceMetrics struct {
114+
rru prometheus.Counter
115+
wru prometheus.Counter
109116
}
110117

111118
func initMetrics(oldName, name string) *groupMetricsCollection {
@@ -122,6 +129,52 @@ func initMetrics(oldName, name string) *groupMetricsCollection {
122129
tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
123130
runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name),
124131
consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name),
132+
sourceMetrics: make(map[string]*requestSourceMetrics),
133+
}
134+
}
135+
136+
func (mc *groupMetricsCollection) getOrCreateRequestSourceMetrics(resourceGroupName, requestSource string) *requestSourceMetrics {
137+
mc.sourceMetricsMu.RLock()
138+
sourceMetrics, ok := mc.sourceMetrics[requestSource]
139+
mc.sourceMetricsMu.RUnlock()
140+
if ok {
141+
return sourceMetrics
142+
}
143+
144+
mc.sourceMetricsMu.Lock()
145+
defer mc.sourceMetricsMu.Unlock()
146+
sourceMetrics, ok = mc.sourceMetrics[requestSource]
147+
if ok {
148+
return sourceMetrics
149+
}
150+
sourceMetrics = &requestSourceMetrics{
151+
rru: metrics.RequestSourceRUCounter.WithLabelValues(resourceGroupName, requestSource, "rru"),
152+
wru: metrics.RequestSourceRUCounter.WithLabelValues(resourceGroupName, requestSource, "wru"),
153+
}
154+
mc.sourceMetrics[requestSource] = sourceMetrics
155+
return sourceMetrics
156+
}
157+
158+
func (mc *groupMetricsCollection) addRequestSourceRU(resourceGroupName, requestSource string, consumption *rmpb.Consumption) {
159+
if consumption == nil {
160+
return
161+
}
162+
sourceMetrics := mc.getOrCreateRequestSourceMetrics(resourceGroupName, requestSource)
163+
if consumption.RRU > 0 {
164+
sourceMetrics.rru.Add(consumption.RRU)
165+
}
166+
if consumption.WRU > 0 {
167+
sourceMetrics.wru.Add(consumption.WRU)
168+
}
169+
}
170+
171+
func (mc *groupMetricsCollection) cleanupRequestSourceMetrics(resourceGroupName string) {
172+
mc.sourceMetricsMu.Lock()
173+
defer mc.sourceMetricsMu.Unlock()
174+
for requestSource := range mc.sourceMetrics {
175+
metrics.RequestSourceRUCounter.DeleteLabelValues(resourceGroupName, requestSource, "rru")
176+
metrics.RequestSourceRUCounter.DeleteLabelValues(resourceGroupName, requestSource, "wru")
177+
delete(mc.sourceMetrics, requestSource)
125178
}
126179
}
127180

@@ -577,6 +630,8 @@ func (gc *groupCostController) onRequestWaitImpl(
577630
waitDuration += d
578631
}
579632

633+
gc.metrics.addRequestSourceRU(gc.name, info.RequestSource(), delta)
634+
580635
gc.mu.Lock()
581636
// Calculate the penalty of the store
582637
penalty = &rmpb.Consumption{}
@@ -622,6 +677,8 @@ func (gc *groupCostController) onResponseImpl(
622677
add(gc.mu.globalCounter, count)
623678
gc.mu.Unlock()
624679

680+
gc.metrics.addRequestSourceRU(gc.name, req.RequestSource(), delta)
681+
625682
return delta, nil
626683
}
627684

@@ -663,6 +720,8 @@ func (gc *groupCostController) onResponseWaitImpl(
663720
add(gc.mu.globalCounter, count)
664721
gc.mu.Unlock()
665722

723+
gc.metrics.addRequestSourceRU(gc.name, req.RequestSource(), delta)
724+
666725
return delta, waitDuration, nil
667726
}
668727

client/resource_group/controller/metrics/metrics.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ var (
4949
LowTokenRequestNotifyCounter *prometheus.CounterVec
5050
// TokenConsumedHistogram comments placeholder
5151
TokenConsumedHistogram *prometheus.HistogramVec
52+
// RequestSourceRUCounter counts request RU consumption, labeled by resource group, request source, and RU type ("rru" or "wru").
53+
RequestSourceRUCounter *prometheus.CounterVec
5254
// FailedTokenRequestDuration comments placeholder, WithLabelValues is a heavy operation, define variable to avoid call it every time.
5355
FailedTokenRequestDuration prometheus.Observer
5456
// SuccessfulTokenRequestDuration comments placeholder, WithLabelValues is a heavy operation, define variable to avoid call it every time.
@@ -153,6 +155,15 @@ func initMetrics(constLabels prometheus.Labels) {
153155
ConstLabels: constLabels,
154156
}, []string{newResourceGroupNameLabel})
155157

158+
RequestSourceRUCounter = prometheus.NewCounterVec(
159+
prometheus.CounterOpts{
160+
Namespace: namespace,
161+
Subsystem: requestSubsystem,
162+
Name: "ru_total",
163+
Help: "Counter of request RU consumption grouped by resource group and request source.",
164+
ConstLabels: constLabels,
165+
}, []string{newResourceGroupNameLabel, "request_source", errType})
166+
156167
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
157168
FailedTokenRequestDuration = TokenRequestDuration.WithLabelValues("fail")
158169
SuccessfulTokenRequestDuration = TokenRequestDuration.WithLabelValues("success")
@@ -171,4 +182,5 @@ func InitAndRegisterMetrics(constLabels prometheus.Labels) {
171182
prometheus.MustRegister(ResourceGroupTokenRequestCounter)
172183
prometheus.MustRegister(LowTokenRequestNotifyCounter)
173184
prometheus.MustRegister(TokenConsumedHistogram)
185+
prometheus.MustRegister(RequestSourceRUCounter)
174186
}

client/resource_group/controller/model.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type RequestInfo interface {
5252
StoreID() uint64
5353
RequestSize() uint64
5454
AccessLocationType() AccessLocationType
55+
RequestSource() string
5556
}
5657

5758
// ResponseInfo is the interface of the response information provider. A response should be
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package controller
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
dto "github.com/prometheus/client_model/go"
9+
"github.com/stretchr/testify/mock"
10+
"github.com/stretchr/testify/require"
11+
12+
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
13+
14+
controllerMetrics "github.com/tikv/pd/client/resource_group/controller/metrics"
15+
)
16+
17+
func counterValue(t *testing.T, metric prometheus.Counter) float64 {
18+
t.Helper()
19+
pb := &dto.Metric{}
20+
require.NoError(t, metric.Write(pb))
21+
return pb.GetCounter().GetValue()
22+
}
23+
24+
func collectorMetricCount(collector prometheus.Collector) int {
25+
ch := make(chan prometheus.Metric, 8)
26+
go func() {
27+
collector.Collect(ch)
28+
close(ch)
29+
}()
30+
count := 0
31+
for range ch {
32+
count++
33+
}
34+
return count
35+
}
36+
37+
func TestRequestSourceMetricsCachedByResourceGroup(t *testing.T) {
38+
re := require.New(t)
39+
gc := createTestGroupCostController(re)
40+
req := &TestRequestInfo{
41+
isWrite: true,
42+
writeBytes: 100,
43+
numReplicas: 1,
44+
storeID: 1,
45+
accessType: AccessUnknown,
46+
requestSource: "internal_gc_test",
47+
}
48+
resp := &TestResponseInfo{
49+
readBytes: 128,
50+
succeed: true,
51+
}
52+
53+
beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter)
54+
55+
reqConsumption, _, _, _, err := gc.onRequestWaitImpl(context.Background(), req)
56+
re.NoError(err)
57+
respConsumption, err := gc.onResponseImpl(req, resp)
58+
re.NoError(err)
59+
re.NotZero(reqConsumption.WRU)
60+
re.NotZero(respConsumption.RRU)
61+
62+
gc.metrics.sourceMetricsMu.RLock()
63+
sourceMetrics := gc.metrics.sourceMetrics[req.requestSource]
64+
cacheSize := len(gc.metrics.sourceMetrics)
65+
gc.metrics.sourceMetricsMu.RUnlock()
66+
67+
re.Equal(1, cacheSize)
68+
re.NotNil(sourceMetrics)
69+
re.Equal(reqConsumption.WRU, counterValue(t, sourceMetrics.wru))
70+
re.Equal(respConsumption.RRU, counterValue(t, sourceMetrics.rru))
71+
re.Equal(beforeCount+2, collectorMetricCount(controllerMetrics.RequestSourceRUCounter))
72+
73+
_, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req)
74+
re.NoError(err)
75+
gc.metrics.sourceMetricsMu.RLock()
76+
re.Equal(1, len(gc.metrics.sourceMetrics))
77+
re.Same(sourceMetrics, gc.metrics.sourceMetrics[req.requestSource])
78+
gc.metrics.sourceMetricsMu.RUnlock()
79+
80+
controllerMetrics.RequestSourceRUCounter.DeleteLabelValues(gc.name, req.requestSource, "rru")
81+
controllerMetrics.RequestSourceRUCounter.DeleteLabelValues(gc.name, req.requestSource, "wru")
82+
}
83+
84+
func TestCleanupResourceGroupRemovesRequestSourceMetrics(t *testing.T) {
85+
re := require.New(t)
86+
ctx, cancel := context.WithCancel(context.Background())
87+
defer cancel()
88+
89+
mockProvider := newMockResourceGroupProvider()
90+
controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, 0)
91+
re.NoError(err)
92+
93+
group := &rmpb.ResourceGroup{
94+
Name: "request-source-cleanup",
95+
Mode: rmpb.GroupMode_RUMode,
96+
RUSettings: &rmpb.GroupRequestUnitSettings{
97+
RU: &rmpb.TokenBucket{
98+
Settings: &rmpb.TokenLimitSettings{FillRate: 1000000},
99+
},
100+
},
101+
}
102+
mockProvider.On("GetResourceGroup", mock.Anything, group.Name, mock.Anything).Return(group, nil)
103+
104+
gc, err := controller.tryGetResourceGroupController(ctx, group.Name, false)
105+
re.NoError(err)
106+
107+
req := &TestRequestInfo{
108+
isWrite: true,
109+
writeBytes: 64,
110+
numReplicas: 1,
111+
storeID: 1,
112+
requestSource: "internal_gc_cleanup",
113+
}
114+
resp := &TestResponseInfo{readBytes: 64, succeed: true}
115+
beforeCount := collectorMetricCount(controllerMetrics.RequestSourceRUCounter)
116+
117+
_, _, _, _, err = gc.onRequestWaitImpl(context.Background(), req)
118+
re.NoError(err)
119+
_, err = gc.onResponseImpl(req, resp)
120+
re.NoError(err)
121+
122+
gc.mu.Lock()
123+
*gc.run.consumption = *gc.mu.consumption
124+
gc.mu.Unlock()
125+
gc.inactive = true
126+
127+
re.Greater(collectorMetricCount(controllerMetrics.RequestSourceRUCounter), beforeCount)
128+
129+
controller.cleanUpResourceGroup()
130+
131+
_, ok := controller.loadGroupController(group.Name)
132+
re.False(ok)
133+
gc.metrics.sourceMetricsMu.RLock()
134+
re.Empty(gc.metrics.sourceMetrics)
135+
gc.metrics.sourceMetricsMu.RUnlock()
136+
re.Equal(beforeCount, collectorMetricCount(controllerMetrics.RequestSourceRUCounter))
137+
}

client/resource_group/controller/testutil.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ import "time"
2222

2323
// TestRequestInfo is used to test the request info interface.
2424
type TestRequestInfo struct {
25-
isWrite bool
26-
writeBytes uint64
27-
numReplicas int64
28-
storeID uint64
29-
accessType AccessLocationType
25+
isWrite bool
26+
writeBytes uint64
27+
numReplicas int64
28+
storeID uint64
29+
accessType AccessLocationType
30+
requestSource string
3031
}
3132

3233
// NewTestRequestInfo creates a new TestRequestInfo.
@@ -70,6 +71,11 @@ func (tri *TestRequestInfo) AccessLocationType() AccessLocationType {
7071
return tri.accessType
7172
}
7273

74+
// RequestSource implements the RequestInfo interface.
75+
func (tri *TestRequestInfo) RequestSource() string {
76+
return tri.requestSource
77+
}
78+
7379
// TestResponseInfo is used to test the response info interface.
7480
type TestResponseInfo struct {
7581
readBytes uint64

0 commit comments

Comments
 (0)