Skip to content

Commit 88e4f7d

Browse files
authored
tso: validate callee id for tso requests (tikv#439)
* tso: validate callee id for tso requests Signed-off-by: iosmanthus <myosmanthustree@gmail.com> * attach CalleeId to tsopb.RequestHeader Signed-off-by: iosmanthus <myosmanthustree@gmail.com> * update kvproto to tidbcloud/kvproto/release-8.5-keyspace Signed-off-by: iosmanthus <myosmanthustree@gmail.com> * compare callee id with url.Parse Signed-off-by: iosmanthus <myosmanthustree@gmail.com> * resolve comments from @rleungx Signed-off-by: iosmanthus <myosmanthustree@gmail.com> * make linter happy Signed-off-by: iosmanthus <myosmanthustree@gmail.com> --------- Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
1 parent 5a2d0d4 commit 88e4f7d

File tree

17 files changed

+5162
-39
lines changed

17 files changed

+5162
-39
lines changed

client/errs/errno.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ const (
3434
RetryTimeoutErr = "retry timeout"
3535
// NotPrimaryErr indicates the non-primary member received the requests which should be received by primary.
3636
NotPrimaryErr = "not primary"
37+
// MismatchCalleeIDErr indicates the callee ID is mismatched, usually caused by the stale DNS cache.
38+
MismatchCalleeIDErr = "mismatch callee id"
3739
)
3840

3941
// client errors

client/errs/errs.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,15 @@ func IsLeaderChange(err error) bool {
3838
strings.Contains(errMsg, NotPrimaryErr)
3939
}
4040

41+
// IsCalleeMismatch checks whether the error is caused by callee ID mismatch.
42+
func IsCalleeMismatch(err error) bool {
43+
if err == nil {
44+
return false
45+
}
46+
errMsg := err.Error()
47+
return strings.Contains(errMsg, MismatchCalleeIDErr)
48+
}
49+
4150
// ZapError is used to make the log output easier.
4251
func ZapError(err error, causeError ...error) zap.Field {
4352
if err == nil {

client/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,5 @@ require (
4444
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
4545
gopkg.in/yaml.v3 v3.0.1 // indirect
4646
)
47+
48+
replace github.com/pingcap/kvproto => github.com/tidbcloud/kvproto v0.0.0-20251121150221-d07b34e68f9f

client/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
4848
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
4949
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
5050
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
51-
github.com/pingcap/kvproto v0.0.0-20250224053625-b6a98c6bf02d h1:52qhTQG8G8V/pHo/w7F4d2Tw98KMk2C+gAe3U8SWRAg=
52-
github.com/pingcap/kvproto v0.0.0-20250224053625-b6a98c6bf02d/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
5351
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
5452
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
5553
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -79,6 +77,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
7977
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
8078
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
8179
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
80+
github.com/tidbcloud/kvproto v0.0.0-20251121150221-d07b34e68f9f h1:d2HVdcW8LM+6s0vZGz/vusUqHlAK/q82x/X35whRcGk=
81+
github.com/tidbcloud/kvproto v0.0.0-20251121150221-d07b34e68f9f/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
8282
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
8383
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
8484
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=

client/mock_pd_service_discovery.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ func (*mockPDServiceDiscovery) GetOrCreateGRPCConn(string) (*grpc.ClientConn, er
8888
return nil, nil
8989
}
9090

91+
// RemoveClientConn implements the ServiceDiscovery interface.
92+
func (*mockPDServiceDiscovery) RemoveClientConn(string) {}
93+
9194
// ScheduleCheckMemberChanged implements the ServiceDiscovery interface.
9295
func (*mockPDServiceDiscovery) ScheduleCheckMemberChanged() {}
9396

client/pd_service_discovery.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ type ServiceDiscovery interface {
105105
GetAllServiceClients() []ServiceClient
106106
// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given url.
107107
GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error)
108+
// RemoveClientConn removes and closes the gRPC connection of the given url.
109+
RemoveClientConn(url string)
108110
// ScheduleCheckMemberChanged is used to trigger a check to see if there is any membership change
109111
// among the leader/followers in a quorum-based cluster or among the primary/secondaries in a
110112
// primary/secondary configured cluster.
@@ -1129,6 +1131,17 @@ func (c *pdServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn,
11291131
return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.gRPCDialOptions...)
11301132
}
11311133

1134+
// RemoveClientConn removes and closes the grpc client connection of the given URL.
1135+
func (c *pdServiceDiscovery) RemoveClientConn(url string) {
1136+
cc, ok := c.clientConns.Load(url)
1137+
if !ok {
1138+
return
1139+
}
1140+
if err := cc.(*grpc.ClientConn).Close(); err != nil {
1141+
log.Error("[pd] failed to close grpc clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
1142+
}
1143+
}
1144+
11321145
func addrsToURLs(addrs []string, tlsCfg *tls.Config) []string {
11331146
// Add default schema "http://" to addrs.
11341147
urls := make([]string, 0, len(addrs))

client/tso_client.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -231,15 +231,21 @@ func (c *tsoClient) GetTSOAllocatorServingURLByDCLocation(dcLocation string) (st
231231

232232
// GetTSOAllocatorClientConnByDCLocation returns the TSO allocator gRPC client connection of the given dcLocation.
233233
func (c *tsoClient) GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*grpc.ClientConn, string) {
234-
url, ok := c.tsoAllocators.Load(dcLocation)
234+
u, ok := c.tsoAllocators.Load(dcLocation)
235235
if !ok {
236236
log.Fatal("[tso] the allocator leader should exist", zap.String("dc-location", dcLocation))
237237
}
238-
cc, ok := c.svcDiscovery.GetClientConns().Load(url)
239-
if !ok {
240-
return nil, url.(string)
238+
url := u.(string)
239+
cc, err := c.svcDiscovery.GetOrCreateGRPCConn(url)
240+
if err != nil {
241+
log.Error("[tso] failed to get tso allocator client connection",
242+
zap.String("dc-location", dcLocation),
243+
zap.String("serving-url", url),
244+
errs.ZapError(err),
245+
)
246+
return nil, url
241247
}
242-
return cc.(*grpc.ClientConn), url.(string)
248+
return cc, url
243249
}
244250

245251
// AddTSOAllocatorServingURLSwitchedCallback adds callbacks which will be called

client/tso_dispatcher.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -453,8 +453,12 @@ func (td *tsoDispatcher) handleProcessRequestError(ctx context.Context, bo *retr
453453
// Set `stream` to nil and remove this stream from the `connectionCtxs` due to error.
454454
td.connectionCtxs.Delete(streamURL)
455455
streamCancelFunc()
456-
// Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP.
457-
if errs.IsLeaderChange(err) {
456+
switch {
457+
case errs.IsCalleeMismatch(err):
458+
// If callee ID mismatches, the gRPC connection is no longer valid, we need to enforce reconnecting.
459+
svcDiscovery.RemoveClientConn(streamURL)
460+
case errs.IsLeaderChange(err):
461+
// Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP.
458462
if err := bo.Exec(ctx, svcDiscovery.CheckMemberChanged); err != nil {
459463
select {
460464
case <-ctx.Done():

client/tso_service_discovery.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,17 @@ func (c *tsoServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn,
317317
return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.gRPCDialOptions...)
318318
}
319319

320+
// RemoveClientConn removes and closes the gRPC connection of the given url.
321+
func (c *tsoServiceDiscovery) RemoveClientConn(url string) {
322+
cc, ok := c.clientConns.Load(url)
323+
if !ok {
324+
return
325+
}
326+
if err := cc.(*grpc.ClientConn).Close(); err != nil {
327+
log.Error("[tso] failed to close gRPC clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
328+
}
329+
}
330+
320331
// ScheduleCheckMemberChanged is used to trigger a check to see if there is any change in service endpoints.
321332
func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged() {
322333
select {

client/tso_stream.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,10 @@ func (b *tsoTSOStreamBuilder) build(
8989
stream, err := b.client.Tso(ctx)
9090
done <- struct{}{}
9191
if err == nil {
92-
return newTSOStream(ctx, b.serverURL, tsoTSOStreamAdapter{stream}), nil
92+
return newTSOStream(ctx, b.serverURL, tsoTSOStreamAdapter{
93+
stream: stream,
94+
calleeID: b.serverURL,
95+
}), nil
9396
}
9497
return nil, err
9598
}
@@ -152,7 +155,8 @@ func (s pdTSOStreamAdapter) Recv() (tsoRequestResult, error) {
152155
}
153156

154157
type tsoTSOStreamAdapter struct {
155-
stream tsopb.TSO_TsoClient
158+
stream tsopb.TSO_TsoClient
159+
calleeID string
156160
}
157161

158162
// Send implements the grpcTSOStreamAdapter interface.
@@ -162,6 +166,7 @@ func (s tsoTSOStreamAdapter) Send(clusterID uint64, keyspaceID, keyspaceGroupID
162166
ClusterId: clusterID,
163167
KeyspaceId: keyspaceID,
164168
KeyspaceGroupId: keyspaceGroupID,
169+
CalleeId: s.calleeID,
165170
},
166171
Count: uint32(count),
167172
DcLocation: dcLocation,

0 commit comments

Comments
 (0)