Skip to content

Commit d5786ac

Browse files
committed
keyspace metrics
Signed-off-by: 童剑 <1045931706@qq.com>
1 parent e2f7162 commit d5786ac

File tree

9 files changed

+90
-59
lines changed

9 files changed

+90
-59
lines changed

client/client.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ func createClientWithKeyspace(
248248
callerComponent: adjustCallerComponent(callerComponent),
249249
inner: &innerClient{
250250
keyspaceID: keyspaceID,
251+
keyspaceName: constants.NullKeyspaceName,
251252
svrUrls: svrAddrs,
252253
updateTokenConnectionCh: make(chan struct{}, 1),
253254
ctx: clientCtx,
@@ -262,7 +263,21 @@ func createClientWithKeyspace(
262263
opt(c.inner.option)
263264
}
264265

265-
return c, c.inner.init(nil)
266+
updateKeyspaceNameFunc := func() error {
267+
metas, err := c.GetAllKeyspaces(clientCtx, keyspaceID, 1)
268+
if err != nil {
269+
return err
270+
}
271+
for _, m := range metas {
272+
if m.GetId() == keyspaceID {
273+
c.inner.keyspaceName = m.GetName()
274+
break
275+
}
276+
}
277+
return nil
278+
}
279+
280+
return c, c.inner.init(updateKeyspaceNameFunc)
266281
}
267282

268283
// APIVersion is the API version the server and the client is using.
@@ -368,6 +383,7 @@ func newClientWithKeyspaceName(
368383
// Create a service discovery with null keyspace id, then query the real id with the keyspace name,
369384
// finally update the keyspace id to the service discovery for the following interactions.
370385
keyspaceID: constants.NullKeyspaceID,
386+
keyspaceName: keyspaceName,
371387
updateTokenConnectionCh: make(chan struct{}, 1),
372388
ctx: clientCtx,
373389
cancel: clientCancel,

client/clients/tso/client.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,14 @@ type Cli struct {
8888
// tsoReqPool is the pool to recycle `*tsoRequest`.
8989
tsoReqPool *sync.Pool
9090
// dispatcher is used to dispatch the TSO requests to the channel.
91-
dispatcher atomic.Pointer[tsoDispatcher]
91+
dispatcher atomic.Pointer[tsoDispatcher]
92+
keyspaceName string
9293
}
9394

9495
// NewClient returns a new TSO client.
9596
func NewClient(
9697
ctx context.Context, option *opt.Option,
97-
svcDiscovery sd.ServiceDiscovery, factory tsoStreamBuilderFactory,
98+
svcDiscovery sd.ServiceDiscovery, factory tsoStreamBuilderFactory, keyspaceName string,
9899
) *Cli {
99100
ctx, cancel := context.WithCancel(ctx)
100101
c := &Cli{
@@ -114,6 +115,7 @@ func NewClient(
114115
}
115116
},
116117
},
118+
keyspaceName: keyspaceName,
117119
}
118120

119121
c.svcDiscovery.ExecAndAddLeaderSwitchedCallback(c.updateTSOLeaderURL)
@@ -338,7 +340,7 @@ func (c *Cli) tryConnectToTSO(ctx context.Context) error {
338340
}
339341
if cc != nil {
340342
cctx, cancel := context.WithCancel(ctx)
341-
stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.Timeout)
343+
stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc, c.keyspaceName).build(cctx, cancel, c.option.Timeout)
342344
failpoint.Inject("unreachableNetwork", func() {
343345
stream = nil
344346
err = status.New(codes.Unavailable, "unavailable").Err()
@@ -382,7 +384,7 @@ func (c *Cli) tryConnectToTSO(ctx context.Context) error {
382384
// create the follower stream
383385
cctx, cancel := context.WithCancel(ctx)
384386
cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
385-
stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.Timeout)
387+
stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn, c.keyspaceName).build(cctx, cancel, c.option.Timeout)
386388
if err == nil {
387389
forwardedHostTrim := tlsutil.TrimHTTPPrefix(forwardedHost)
388390
addr := tlsutil.TrimHTTPPrefix(backupURL)
@@ -431,7 +433,7 @@ func (c *Cli) checkLeader(
431433
if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
432434
// create a stream of the original tso leader
433435
cctx, cancel := context.WithCancel(ctx)
434-
stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.Timeout)
436+
stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc, c.keyspaceName).build(cctx, cancel, c.option.Timeout)
435437
if err == nil && stream != nil {
436438
log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("url", url))
437439
c.conCtxMgr.CleanAllAndStore(cctx, cancel, url, stream)
@@ -518,7 +520,7 @@ func (c *Cli) getAllTSOStreamBuilders() map[string]tsoStreamBuilder {
518520
resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
519521
healthCancel()
520522
if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
521-
streamBuilders[addr] = c.makeBuilder(cc)
523+
streamBuilders[addr] = c.makeBuilder(cc, c.keyspaceName)
522524
}
523525
}
524526
return streamBuilders

client/clients/tso/dispatcher_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/pingcap/failpoint"
3131
"github.com/pingcap/log"
3232

33+
"github.com/tikv/pd/client/constants"
3334
"github.com/tikv/pd/client/opt"
3435
cctx "github.com/tikv/pd/client/pkg/connectionctx"
3536
"github.com/tikv/pd/client/pkg/utils/testutil"
@@ -73,7 +74,7 @@ func (m *mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context) bool
7374
cctx, cancel := context.WithCancel(ctx)
7475
var stream *tsoStream
7576
if m.createStream == nil {
76-
stream = newTSOStream(cctx, mockStreamURL, newMockTSOStreamImpl(ctx, resultModeGenerated))
77+
stream = newTSOStream(cctx, mockStreamURL, newMockTSOStreamImpl(ctx, resultModeGenerated), constants.DefaultKeyspaceName)
7778
} else {
7879
stream = m.createStream(ctx)
7980
}
@@ -104,7 +105,7 @@ func (s *testTSODispatcherSuite) SetupTest() {
104105
created := new(atomic.Bool)
105106
createStream := func(ctx context.Context) *tsoStream {
106107
s.streamInner = newMockTSOStreamImpl(ctx, resultModeGenerateOnSignal)
107-
s.stream = newTSOStream(ctx, mockStreamURL, s.streamInner)
108+
s.stream = newTSOStream(ctx, mockStreamURL, s.streamInner, constants.DefaultKeyspaceName)
108109
created.Store(true)
109110
return s.stream
110111
}

client/clients/tso/stream.go

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,21 @@ import (
4141
// TSO Stream Builder Factory
4242

4343
type tsoStreamBuilderFactory interface {
44-
makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder
44+
makeBuilder(cc *grpc.ClientConn, keyspaceName string) tsoStreamBuilder
4545
}
4646

4747
// PDStreamBuilderFactory is a factory for building TSO streams to the PD cluster.
4848
type PDStreamBuilderFactory struct{}
4949

50-
func (*PDStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder {
51-
return &pdStreamBuilder{client: pdpb.NewPDClient(cc), serverURL: cc.Target()}
50+
func (*PDStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn, keyspaceName string) tsoStreamBuilder {
51+
return &pdStreamBuilder{client: pdpb.NewPDClient(cc), serverURL: cc.Target(), keyspaceName: keyspaceName}
5252
}
5353

5454
// MSStreamBuilderFactory is a factory for building TSO streams to the microservice cluster.
5555
type MSStreamBuilderFactory struct{}
5656

57-
func (*MSStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder {
58-
return &msStreamBuilder{client: tsopb.NewTSOClient(cc), serverURL: cc.Target()}
57+
func (*MSStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn, keyspaceName string) tsoStreamBuilder {
58+
return &msStreamBuilder{client: tsopb.NewTSOClient(cc), serverURL: cc.Target(), keyspaceName: keyspaceName}
5959
}
6060

6161
// TSO Stream Builder
@@ -65,8 +65,9 @@ type tsoStreamBuilder interface {
6565
}
6666

6767
type pdStreamBuilder struct {
68-
serverURL string
69-
client pdpb.PDClient
68+
serverURL string
69+
keyspaceName string
70+
client pdpb.PDClient
7071
}
7172

7273
func (b *pdStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (*tsoStream, error) {
@@ -76,14 +77,15 @@ func (b *pdStreamBuilder) build(ctx context.Context, cancel context.CancelFunc,
7677
stream, err := b.client.Tso(ctx)
7778
done <- struct{}{}
7879
if err == nil {
79-
return newTSOStream(ctx, b.serverURL, pdTSOStreamAdapter{stream}), nil
80+
return newTSOStream(ctx, b.serverURL, pdTSOStreamAdapter{stream}, b.keyspaceName), nil
8081
}
8182
return nil, err
8283
}
8384

8485
type msStreamBuilder struct {
85-
serverURL string
86-
client tsopb.TSOClient
86+
serverURL string
87+
client tsopb.TSOClient
88+
keyspaceName string
8789
}
8890

8991
func (b *msStreamBuilder) build(
@@ -95,7 +97,7 @@ func (b *msStreamBuilder) build(
9597
stream, err := b.client.Tso(ctx)
9698
done <- struct{}{}
9799
if err == nil {
98-
return newTSOStream(ctx, b.serverURL, tsoTSOStreamAdapter{stream}), nil
100+
return newTSOStream(ctx, b.serverURL, tsoTSOStreamAdapter{stream}, b.keyspaceName), nil
99101
}
100102
return nil, err
101103
}
@@ -220,6 +222,8 @@ type tsoStream struct {
220222

221223
ongoingRequestCountGauge prometheus.Gauge
222224
ongoingRequests atomic.Int32
225+
226+
keyspaceName string
223227
}
224228

225229
const (
@@ -235,7 +239,7 @@ const (
235239
maxPendingRequestsInTSOStream = 64
236240
)
237241

238-
func newTSOStream(ctx context.Context, serverURL string, stream grpcTSOStreamAdapter) *tsoStream {
242+
func newTSOStream(ctx context.Context, serverURL string, stream grpcTSOStreamAdapter, keyspaceName string) *tsoStream {
239243
streamID := fmt.Sprintf("%s-%d", serverURL, streamIDAlloc.Add(1))
240244
// To make error handling in `tsoDispatcher` work, the internal `cancel` and external `cancel` is better to be
241245
// distinguished.
@@ -250,6 +254,7 @@ func newTSOStream(ctx context.Context, serverURL string, stream grpcTSOStreamAda
250254
cancel: cancel,
251255

252256
ongoingRequestCountGauge: metrics.OngoingRequestCountGauge.WithLabelValues(streamID),
257+
keyspaceName: keyspaceName,
253258
}
254259
s.wg.Add(1)
255260
go s.recvLoop(ctx)
@@ -390,6 +395,13 @@ func (s *tsoStream) recvLoop(ctx context.Context) {
390395
// Update the metrics in seconds.
391396
metrics.EstimateTSOLatencyGauge.WithLabelValues(s.streamID).Set(micros * 1e-6)
392397
}
398+
successObserver := sync.OnceValue(func() prometheus.Observer {
399+
return metrics.RequestDuration.WithLabelValues("tso", s.keyspaceName)
400+
})
401+
402+
failedObserver := sync.OnceValue(func() prometheus.Observer {
403+
return metrics.RequestDuration.WithLabelValues("tso-failed", s.keyspaceName)
404+
})
393405

394406
recvLoop:
395407
for {
@@ -401,7 +413,6 @@ recvLoop:
401413
}
402414

403415
res, err := s.stream.Recv()
404-
405416
// Try to load the corresponding `batchedRequests`. If `Recv` is successful, there must be a request pending
406417
// in the queue.
407418
select {
@@ -419,7 +430,7 @@ recvLoop:
419430
// Note that it's also possible that the stream is broken due to network without being requested. In this
420431
// case, `Recv` may return an error while no request is pending.
421432
if hasReq {
422-
metrics.RequestFailedDurationTSO.Observe(latencySeconds)
433+
successObserver().Observe(latencySeconds)
423434
}
424435
if err == io.EOF {
425436
finishWithErr = errors.WithStack(errs.ErrClientTSOStreamClosed)
@@ -431,8 +442,7 @@ recvLoop:
431442
finishWithErr = errors.New("tsoStream timing order broken")
432443
break recvLoop
433444
}
434-
435-
metrics.RequestDurationTSO.Observe(latencySeconds)
445+
failedObserver().Observe(latencySeconds)
436446
metrics.TSOBatchSize.Observe(float64(res.count))
437447
updateEstimatedLatency(currentReq.startTime, latency)
438448

client/clients/tso/stream_test.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/pingcap/errors"
2929
"github.com/pingcap/log"
3030

31+
"github.com/tikv/pd/client/constants"
3132
"github.com/tikv/pd/client/errs"
3233
)
3334

@@ -54,11 +55,12 @@ const (
5455
)
5556

5657
type mockTSOStreamImpl struct {
57-
ctx context.Context
58-
requestCh chan requestMsg
59-
resultCh chan resultMsg
60-
keyspaceID uint32
61-
errorState error
58+
ctx context.Context
59+
requestCh chan requestMsg
60+
resultCh chan resultMsg
61+
keyspaceID uint32
62+
keyspaceName string
63+
errorState error
6264

6365
resultMode resultMode
6466
// Current progress of generating TSO results
@@ -67,10 +69,11 @@ type mockTSOStreamImpl struct {
6769

6870
func newMockTSOStreamImpl(ctx context.Context, resultMode resultMode) *mockTSOStreamImpl {
6971
return &mockTSOStreamImpl{
70-
ctx: ctx,
71-
requestCh: make(chan requestMsg, 64),
72-
resultCh: make(chan resultMsg, 64),
73-
keyspaceID: 0,
72+
ctx: ctx,
73+
requestCh: make(chan requestMsg, 64),
74+
resultCh: make(chan resultMsg, 64),
75+
keyspaceID: constants.DefaultKeyspaceID,
76+
keyspaceName: constants.DefaultKeyspaceName,
7477

7578
resultMode: resultMode,
7679
resGenPhysical: 10000,
@@ -271,7 +274,7 @@ type testTSOStreamSuite struct {
271274
func (s *testTSOStreamSuite) SetupTest() {
272275
s.re = require.New(s.T())
273276
s.inner = newMockTSOStreamImpl(context.Background(), resultModeManual)
274-
s.stream = newTSOStream(context.Background(), mockStreamURL, s.inner)
277+
s.stream = newTSOStream(context.Background(), mockStreamURL, s.inner, constants.DefaultKeyspaceName)
275278
}
276279

277280
func (s *testTSOStreamSuite) TearDownTest() {
@@ -609,7 +612,7 @@ func BenchmarkTSOStreamSendRecv(b *testing.B) {
609612
log.SetLevel(zapcore.FatalLevel)
610613

611614
streamInner := newMockTSOStreamImpl(context.Background(), resultModeGenerated)
612-
stream := newTSOStream(context.Background(), mockStreamURL, streamInner)
615+
stream := newTSOStream(context.Background(), mockStreamURL, streamInner, constants.DefaultKeyspaceName)
613616
defer func() {
614617
streamInner.stop()
615618
stream.WaitForClosed()

client/constants/constants.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ const (
2626
MaxKeyspaceID = uint32(0xFFFFFF)
2727
// NullKeyspaceID is used for API v1 or legacy path where is keyspace agnostic.
2828
NullKeyspaceID = uint32(0xFFFFFFFF)
29+
// NullKeyspaceName is used for API v1 or legacy path where is keyspace agnostic.
30+
NullKeyspaceName = ""
2931
// DefaultKeyspaceGroupID is the default key space group id.
3032
// We also reserved 0 for the keyspace group for the same purpose.
3133
DefaultKeyspaceGroupID = uint32(0)

client/inner_client.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const (
4242

4343
type innerClient struct {
4444
keyspaceID uint32
45+
keyspaceName string
4546
svrUrls []string
4647
serviceDiscovery sd.ServiceDiscovery
4748
tokenDispatcher *tokenDispatcher
@@ -62,7 +63,7 @@ type innerClient struct {
6263
func (c *innerClient) init(updateKeyspaceIDCb sd.UpdateKeyspaceIDFunc) error {
6364
c.serviceDiscovery = sd.NewServiceDiscovery(
6465
c.ctx, c.cancel, &c.wg, c.setServiceMode,
65-
updateKeyspaceIDCb, c.keyspaceID, c.svrUrls, c.tlsCfg, c.option)
66+
updateKeyspaceIDCb, c.keyspaceID, c.keyspaceName, c.svrUrls, c.tlsCfg, c.option)
6667
if err := c.setup(); err != nil {
6768
c.cancel()
6869
if c.serviceDiscovery != nil {
@@ -181,21 +182,21 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
181182
switch mode {
182183
case pdpb.ServiceMode_PD_SVC_MODE:
183184
newTSOCli = tso.NewClient(c.ctx, c.option,
184-
c.serviceDiscovery, &tso.PDStreamBuilderFactory{})
185+
c.serviceDiscovery, &tso.PDStreamBuilderFactory{}, c.keyspaceName)
185186
case pdpb.ServiceMode_API_SVC_MODE:
186187
newTSOSvcDiscovery = sd.NewTSOServiceDiscovery(
187188
c.ctx, c, c.serviceDiscovery,
188189
c.keyspaceID, c.tlsCfg, c.option)
189-
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
190-
// and will be updated later.
191-
newTSOCli = tso.NewClient(c.ctx, c.option,
192-
newTSOSvcDiscovery, &tso.MSStreamBuilderFactory{})
193190
if err := newTSOSvcDiscovery.Init(); err != nil {
194191
log.Error("[pd] failed to initialize tso service discovery",
195192
zap.Strings("svr-urls", c.svrUrls),
196193
zap.Error(err))
197194
return
198195
}
196+
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
197+
// and will be updated later.
198+
newTSOCli = tso.NewClient(c.ctx, c.option,
199+
newTSOSvcDiscovery, &tso.MSStreamBuilderFactory{}, c.keyspaceName)
199200
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
200201
log.Warn("[pd] intend to switch to unknown service mode, just return")
201202
return

0 commit comments

Comments
 (0)