Skip to content

Commit 0e7dfc2

Browse files
committed
ruv2: collect TiKV raw details in RUDetails
Signed-off-by: disksing <i@disksing.com>
1 parent 3805cb7 commit 0e7dfc2

File tree

7 files changed

+193
-14
lines changed

7 files changed

+193
-14
lines changed

internal/client/client.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -343,14 +343,17 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
343343
elapsed := time.Since(start)
344344
connPool.updateRPCMetrics(req, resp, elapsed)
345345
writeRPCCount := completedTiKVWriteRPCCount(req)
346+
readRPCCount, writeRPCDetailCount := completedTiKVRUV2RPCCount(req)
347+
if bypass {
348+
writeRPCCount = 0
349+
}
346350
if resp != nil {
347351
switch resp.Resp.(type) {
348352
case *tikvrpc.CopStreamResponse, *tikvrpc.BatchCopStreamResponse:
349353
// Stream responses are handled in Recv().
350354
default:
351-
if !bypass {
352-
config.UpdateTiKVRUV2FromExecDetailsV2(ctx, resp.GetExecDetailsV2(), writeRPCCount)
353-
}
355+
util.UpdateTiKVRUV2RawDetails(ctx, resp.GetExecDetailsV2(), readRPCCount, writeRPCDetailCount)
356+
config.UpdateTiKVRUV2FromExecDetailsV2(ctx, resp.GetExecDetailsV2(), writeRPCCount)
354357
}
355358
}
356359

@@ -414,6 +417,16 @@ func completedTiKVWriteRPCCount(req *tikvrpc.Request) float64 {
414417
return 0
415418
}
416419

420+
func completedTiKVRUV2RPCCount(req *tikvrpc.Request) (readRPCCount, writeRPCCount int64) {
421+
if req == nil || req.StoreTp != tikvrpc.TiKV || req.IsDebugReq() {
422+
return 0, 0
423+
}
424+
if req.IsTxnWriteRequest() || req.IsRawWriteRequest() {
425+
return 0, 1
426+
}
427+
return 1, 0
428+
}
429+
417430
// SendRequest sends a Request to server and receives Response.
418431
func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
419432
// In unit test, the option or codec may be nil. Here should skip the encode/decode process.
@@ -452,6 +465,7 @@ func (c *RPCClient) getCopStreamResponse(ctx context.Context, client tikvpb.Tikv
452465
copStream.Cancel = cancel
453466
copStream.Ctx = ctx
454467
copStream.Bypass = resourcecontrol.MakeRequestInfo(req).Bypass()
468+
copStream.CountResourceManagerRPC = true
455469
connPool.streamTimeout <- &copStream.Lease
456470

457471
// Read the first streaming response to get CopStreamResponse.
@@ -487,6 +501,8 @@ func (c *RPCClient) getBatchCopStreamResponse(ctx context.Context, client tikvpb
487501
copStream := resp.Resp.(*tikvrpc.BatchCopStreamResponse)
488502
copStream.Timeout = timeout
489503
copStream.Cancel = cancel
504+
copStream.Ctx = ctx
505+
copStream.CountResourceManagerRPC = true
490506
connPool.streamTimeout <- &copStream.Lease
491507

492508
// Read the first streaming response to get CopStreamResponse.

internal/client/client_async.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,12 @@ func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikv
122122

123123
// rpc metrics
124124
connPool.updateRPCMetrics(req, resp, elapsed)
125-
if resp != nil && !resourcecontrol.MakeRequestInfo(req).Bypass() {
126-
config.UpdateTiKVRUV2FromExecDetailsV2(ctx, resp.GetExecDetailsV2(), writeRPCCount)
125+
if resp != nil {
126+
readRPCCount, writeRPCDetailCount := completedTiKVRUV2RPCCount(req)
127+
util.UpdateTiKVRUV2RawDetails(ctx, resp.GetExecDetailsV2(), readRPCCount, writeRPCDetailCount)
128+
if !resourcecontrol.MakeRequestInfo(req).Bypass() {
129+
config.UpdateTiKVRUV2FromExecDetailsV2(ctx, resp.GetExecDetailsV2(), writeRPCCount)
130+
}
127131
}
128132

129133
// tracing

internal/client/client_async_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ func TestSendRequestAsyncUpdateTiKVRUV2(t *testing.T) {
208208

209209
expected := (weights.ResourceManagerWriteCntTiKV + weights.TiKVKVEngineCacheMiss) * weights.RUScale
210210
require.InDelta(t, expected, ruDetails.TiKVRUV2(), 1e-9)
211+
require.Equal(t, int64(0), ruDetails.ResourceManagerReadCnt())
212+
require.Equal(t, int64(1), ruDetails.ResourceManagerWriteCnt())
211213

212214
bypassDetails := util.NewRUDetails()
213215
bypassCtx := context.WithValue(ctx, util.RUDetailsCtxKey, bypassDetails)
@@ -226,6 +228,8 @@ func TestSendRequestAsyncUpdateTiKVRUV2(t *testing.T) {
226228
rl.Exec(ctx)
227229
require.True(t, called)
228230
require.Zero(t, bypassDetails.TiKVRUV2())
231+
require.Equal(t, int64(0), bypassDetails.ResourceManagerReadCnt())
232+
require.Equal(t, int64(1), bypassDetails.ResourceManagerWriteCnt())
229233
}
230234

231235
func TestSendRequestAsyncTimeout(t *testing.T) {

tikvrpc/tikvrpc.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/tikv/client-go/v2/config"
5151
"github.com/tikv/client-go/v2/kv"
5252
"github.com/tikv/client-go/v2/oracle"
53+
"github.com/tikv/client-go/v2/util"
5354
)
5455

5556
// CmdType represents the concrete request type in Request or response type in Response.
@@ -818,19 +819,22 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) (*Res
818819
// to be handled in Recv() function. This struct facilitates the error handling.
819820
type CopStreamResponse struct {
820821
tikvpb.Tikv_CoprocessorStreamClient
821-
*coprocessor.Response // The first result of Recv()
822-
Timeout time.Duration
823-
Lease // Shared by this object and a background goroutine.
824-
Ctx context.Context
825-
Bypass bool
822+
*coprocessor.Response // The first result of Recv()
823+
Timeout time.Duration
824+
Lease // Shared by this object and a background goroutine.
825+
Ctx context.Context
826+
Bypass bool
827+
CountResourceManagerRPC bool
826828
}
827829

828830
// BatchCopStreamResponse comprises the BatchCoprocessorClient , the first result and timeout detector.
829831
type BatchCopStreamResponse struct {
830832
tikvpb.Tikv_BatchCoprocessorClient
831833
*coprocessor.BatchResponse
832-
Timeout time.Duration
833-
Lease // Shared by this object and a background goroutine.
834+
Timeout time.Duration
835+
Lease // Shared by this object and a background goroutine.
836+
Ctx context.Context
837+
CountResourceManagerRPC bool
834838
}
835839

836840
// MPPStreamResponse is indeed a wrapped client that can receive data packet from tiflash mpp server.
@@ -1320,6 +1324,12 @@ func (resp *CopStreamResponse) Recv() (*coprocessor.Response, error) {
13201324
atomic.StoreInt64(&resp.deadline, 0) // Stop the lease check.
13211325
if ret != nil {
13221326
resp.Response = ret
1327+
readRPCCount := int64(0)
1328+
if resp.CountResourceManagerRPC {
1329+
readRPCCount = 1
1330+
resp.CountResourceManagerRPC = false
1331+
}
1332+
util.UpdateTiKVRUV2RawDetails(resp.Ctx, ret.GetExecDetailsV2(), readRPCCount, 0)
13231333
if !resp.Bypass {
13241334
config.UpdateTiKVRUV2FromExecDetailsV2(resp.Ctx, ret.GetExecDetailsV2(), 0)
13251335
}
@@ -1347,6 +1357,12 @@ func (resp *BatchCopStreamResponse) Recv() (*coprocessor.BatchResponse, error) {
13471357
atomic.StoreInt64(&resp.deadline, 0) // Stop the lease check.
13481358
if ret != nil {
13491359
resp.BatchResponse = ret
1360+
readRPCCount := int64(0)
1361+
if resp.CountResourceManagerRPC {
1362+
readRPCCount = 1
1363+
resp.CountResourceManagerRPC = false
1364+
}
1365+
util.UpdateTiKVRUV2RawDetails(resp.Ctx, nil, readRPCCount, 0)
13501366
}
13511367
return ret, errors.WithStack(err)
13521368
}

tikvrpc/tikvrpc_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,11 @@ func TestCopStreamResponseRecvBypass(t *testing.T) {
171171
makeResponse := func() *coprocessor.Response {
172172
return &coprocessor.Response{
173173
ExecDetailsV2: &kvrpcpb.ExecDetailsV2{
174-
RuV2: &kvrpcpb.RUV2{KvEngineCacheMiss: 1},
174+
RuV2: &kvrpcpb.RUV2{
175+
KvEngineCacheMiss: 1,
176+
StorageProcessedKeysGet: 2,
177+
StorageProcessedKeysBatchGet: 3,
178+
},
175179
},
176180
}
177181
}
@@ -182,10 +186,15 @@ func TestCopStreamResponseRecvBypass(t *testing.T) {
182186
resp := &CopStreamResponse{
183187
Tikv_CoprocessorStreamClient: &mockCoprocessorStreamClient{resp: makeResponse()},
184188
Ctx: ctx,
189+
CountResourceManagerRPC: true,
185190
}
186191
_, err := resp.Recv()
187192
require.NoError(t, err)
188193
require.Greater(t, ruDetails.TiKVRUV2(), 0.0)
194+
require.Equal(t, int64(1), ruDetails.ResourceManagerReadCnt())
195+
require.Equal(t, int64(0), ruDetails.ResourceManagerWriteCnt())
196+
require.Equal(t, int64(2), ruDetails.TiKVStorageProcessedKeysGet())
197+
require.Equal(t, int64(3), ruDetails.TiKVStorageProcessedKeysBatchGet())
189198
})
190199

191200
t.Run("bypass stream skips tikv ruv2", func(t *testing.T) {
@@ -195,9 +204,13 @@ func TestCopStreamResponseRecvBypass(t *testing.T) {
195204
Tikv_CoprocessorStreamClient: &mockCoprocessorStreamClient{resp: makeResponse()},
196205
Ctx: ctx,
197206
Bypass: true,
207+
CountResourceManagerRPC: true,
198208
}
199209
_, err := resp.Recv()
200210
require.NoError(t, err)
201211
require.Zero(t, ruDetails.TiKVRUV2())
212+
require.Equal(t, int64(1), ruDetails.ResourceManagerReadCnt())
213+
require.Equal(t, int64(2), ruDetails.TiKVStorageProcessedKeysGet())
214+
require.Equal(t, int64(3), ruDetails.TiKVStorageProcessedKeysBatchGet())
202215
})
203216
}

util/execdetails.go

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,12 @@ type RUDetails struct {
816816
tiflashRU *uatomic.Float64
817817
// tikvRUV2 stores TiKV RU v2 value in scaled units.
818818
tikvRUV2 *uatomic.Float64
819+
// The following fields preserve TiKV-side RUv2 raw counters that TiDB
820+
// needs for statement-level detail output and TiDB-side RUv2 calculation.
821+
resourceManagerReadCnt int64
822+
resourceManagerWriteCnt int64
823+
tikvStorageProcessedKeysGet int64
824+
tikvStorageProcessedKeysBatchGet int64
819825
}
820826

821827
// NewRUDetails creates a new RUDetails.
@@ -843,13 +849,18 @@ func NewRUDetailsWith(rru, wru float64, waitDur time.Duration) *RUDetails {
843849

844850
// Clone implements the RuntimeStats interface.
845851
func (rd *RUDetails) Clone() *RUDetails {
846-
return &RUDetails{
852+
cloned := &RUDetails{
847853
readRU: uatomic.NewFloat64(rd.readRU.Load()),
848854
writeRU: uatomic.NewFloat64(rd.writeRU.Load()),
849855
ruWaitDuration: uatomic.NewDuration(rd.ruWaitDuration.Load()),
850856
tiflashRU: uatomic.NewFloat64(rd.tiflashRU.Load()),
851857
tikvRUV2: uatomic.NewFloat64(rd.tikvRUV2.Load()),
852858
}
859+
atomic.StoreInt64(&cloned.resourceManagerReadCnt, atomic.LoadInt64(&rd.resourceManagerReadCnt))
860+
atomic.StoreInt64(&cloned.resourceManagerWriteCnt, atomic.LoadInt64(&rd.resourceManagerWriteCnt))
861+
atomic.StoreInt64(&cloned.tikvStorageProcessedKeysGet, atomic.LoadInt64(&rd.tikvStorageProcessedKeysGet))
862+
atomic.StoreInt64(&cloned.tikvStorageProcessedKeysBatchGet, atomic.LoadInt64(&rd.tikvStorageProcessedKeysBatchGet))
863+
return cloned
853864
}
854865

855866
// Merge implements the RuntimeStats interface.
@@ -859,6 +870,10 @@ func (rd *RUDetails) Merge(other *RUDetails) {
859870
rd.ruWaitDuration.Add(other.ruWaitDuration.Load())
860871
rd.tiflashRU.Add(other.tiflashRU.Load())
861872
rd.tikvRUV2.Add(other.tikvRUV2.Load())
873+
atomic.AddInt64(&rd.resourceManagerReadCnt, other.ResourceManagerReadCnt())
874+
atomic.AddInt64(&rd.resourceManagerWriteCnt, other.ResourceManagerWriteCnt())
875+
atomic.AddInt64(&rd.tikvStorageProcessedKeysGet, other.TiKVStorageProcessedKeysGet())
876+
atomic.AddInt64(&rd.tikvStorageProcessedKeysBatchGet, other.TiKVStorageProcessedKeysBatchGet())
862877
}
863878

864879
// String implements fmt.Stringer interface.
@@ -904,6 +919,90 @@ func (rd *RUDetails) AddTiKVRUV2(delta float64) {
904919
rd.tikvRUV2.Add(delta)
905920
}
906921

922+
// AddResourceManagerReadCnt records TiKV read RPCs charged to resource management.
923+
func (rd *RUDetails) AddResourceManagerReadCnt(delta int64) {
924+
if rd == nil || delta == 0 {
925+
return
926+
}
927+
atomic.AddInt64(&rd.resourceManagerReadCnt, delta)
928+
}
929+
930+
// ResourceManagerReadCnt returns TiKV read RPCs charged to resource management.
931+
func (rd *RUDetails) ResourceManagerReadCnt() int64 {
932+
if rd == nil {
933+
return 0
934+
}
935+
return atomic.LoadInt64(&rd.resourceManagerReadCnt)
936+
}
937+
938+
// AddResourceManagerWriteCnt records TiKV write RPCs charged to resource management.
939+
func (rd *RUDetails) AddResourceManagerWriteCnt(delta int64) {
940+
if rd == nil || delta == 0 {
941+
return
942+
}
943+
atomic.AddInt64(&rd.resourceManagerWriteCnt, delta)
944+
}
945+
946+
// ResourceManagerWriteCnt returns TiKV write RPCs charged to resource management.
947+
func (rd *RUDetails) ResourceManagerWriteCnt() int64 {
948+
if rd == nil {
949+
return 0
950+
}
951+
return atomic.LoadInt64(&rd.resourceManagerWriteCnt)
952+
}
953+
954+
// AddTiKVStorageProcessedKeysBatchGet records TiKV batch-get processed keys.
955+
func (rd *RUDetails) AddTiKVStorageProcessedKeysBatchGet(delta int64) {
956+
if rd == nil || delta == 0 {
957+
return
958+
}
959+
atomic.AddInt64(&rd.tikvStorageProcessedKeysBatchGet, delta)
960+
}
961+
962+
// TiKVStorageProcessedKeysBatchGet returns TiKV batch-get processed keys.
963+
func (rd *RUDetails) TiKVStorageProcessedKeysBatchGet() int64 {
964+
if rd == nil {
965+
return 0
966+
}
967+
return atomic.LoadInt64(&rd.tikvStorageProcessedKeysBatchGet)
968+
}
969+
970+
// AddTiKVStorageProcessedKeysGet records TiKV get processed keys.
971+
func (rd *RUDetails) AddTiKVStorageProcessedKeysGet(delta int64) {
972+
if rd == nil || delta == 0 {
973+
return
974+
}
975+
atomic.AddInt64(&rd.tikvStorageProcessedKeysGet, delta)
976+
}
977+
978+
// TiKVStorageProcessedKeysGet returns TiKV get processed keys.
979+
func (rd *RUDetails) TiKVStorageProcessedKeysGet() int64 {
980+
if rd == nil {
981+
return 0
982+
}
983+
return atomic.LoadInt64(&rd.tikvStorageProcessedKeysGet)
984+
}
985+
986+
// UpdateTiKVRUV2RawDetails accumulates TiKV-side RUv2 raw counters into RUDetails.
987+
func UpdateTiKVRUV2RawDetails(ctx context.Context, details *kvrpcpb.ExecDetailsV2, resourceManagerReadCnt, resourceManagerWriteCnt int64) {
988+
if ctx == nil {
989+
return
990+
}
991+
ruDetails, _ := ctx.Value(RUDetailsCtxKey).(*RUDetails)
992+
if ruDetails == nil {
993+
return
994+
}
995+
996+
ruDetails.AddResourceManagerReadCnt(resourceManagerReadCnt)
997+
ruDetails.AddResourceManagerWriteCnt(resourceManagerWriteCnt)
998+
if details == nil || details.RuV2 == nil {
999+
return
1000+
}
1001+
ru := details.RuV2
1002+
ruDetails.AddTiKVStorageProcessedKeysBatchGet(int64(ru.StorageProcessedKeysBatchGet))
1003+
ruDetails.AddTiKVStorageProcessedKeysGet(int64(ru.StorageProcessedKeysGet))
1004+
}
1005+
9071006
// Update updates the RU runtime stats with the given consumption info.
9081007
func (rd *RUDetails) Update(consumption *rmpb.Consumption, waitDuration time.Duration) {
9091008
if rd == nil || consumption == nil {

util/execdetails_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
package util
1616

1717
import (
18+
"context"
1819
"testing"
1920
"time"
2021

22+
"github.com/pingcap/kvproto/pkg/kvrpcpb"
2123
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
2224
"github.com/stretchr/testify/assert"
2325
)
@@ -423,12 +425,37 @@ func TestRUDetailsUpdateTiFlash(t *testing.T) {
423425
RRU: 3.0,
424426
WRU: 4.0,
425427
})
428+
UpdateTiKVRUV2RawDetails(context.WithValue(context.Background(), RUDetailsCtxKey, rd), &kvrpcpb.ExecDetailsV2{
429+
RuV2: &kvrpcpb.RUV2{
430+
StorageProcessedKeysBatchGet: 5,
431+
StorageProcessedKeysGet: 7,
432+
},
433+
}, 11, 13)
426434

427435
assert.InDelta(t, 4.5, rd.RRU(), 1e-9)
428436
assert.InDelta(t, 6.5, rd.WRU(), 1e-9)
429437
assert.InDelta(t, 7.0, rd.TiflashRU(), 1e-9)
430438
assert.Equal(t, 3*time.Millisecond, rd.RUWaitDuration())
439+
assert.Equal(t, int64(11), rd.ResourceManagerReadCnt())
440+
assert.Equal(t, int64(13), rd.ResourceManagerWriteCnt())
441+
assert.Equal(t, int64(5), rd.TiKVStorageProcessedKeysBatchGet())
442+
assert.Equal(t, int64(7), rd.TiKVStorageProcessedKeysGet())
431443

432444
cloned := rd.Clone()
433445
assert.InDelta(t, rd.TiflashRU(), cloned.TiflashRU(), 1e-9)
446+
assert.Equal(t, rd.ResourceManagerReadCnt(), cloned.ResourceManagerReadCnt())
447+
assert.Equal(t, rd.ResourceManagerWriteCnt(), cloned.ResourceManagerWriteCnt())
448+
assert.Equal(t, rd.TiKVStorageProcessedKeysBatchGet(), cloned.TiKVStorageProcessedKeysBatchGet())
449+
assert.Equal(t, rd.TiKVStorageProcessedKeysGet(), cloned.TiKVStorageProcessedKeysGet())
450+
451+
other := NewRUDetails()
452+
other.AddResourceManagerReadCnt(1)
453+
other.AddResourceManagerWriteCnt(2)
454+
other.AddTiKVStorageProcessedKeysBatchGet(3)
455+
other.AddTiKVStorageProcessedKeysGet(4)
456+
rd.Merge(other)
457+
assert.Equal(t, int64(12), rd.ResourceManagerReadCnt())
458+
assert.Equal(t, int64(15), rd.ResourceManagerWriteCnt())
459+
assert.Equal(t, int64(8), rd.TiKVStorageProcessedKeysBatchGet())
460+
assert.Equal(t, int64(11), rd.TiKVStorageProcessedKeysGet())
434461
}

0 commit comments

Comments
 (0)