Skip to content

Commit 1540767

Browse files
committed
tso: address callee id follow-up comments
Signed-off-by: bufferflies <1045931706@qq.com>
1 parent eeaa902 commit 1540767

File tree

5 files changed

+81
-12
lines changed

5 files changed

+81
-12
lines changed

client/clients/tso/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ func (c *Cli) tryCreateTSODispatcher() {
539539
if len(url) == 0 {
540540
return
541541
}
542-
dispatcher := newTSODispatcher(c.ctx, defaultMaxTSOBatchSize, c)
542+
dispatcher := newTSODispatcher(c.ctx, c)
543543
c.wg.Add(1)
544544
go dispatcher.handleDispatcher(&c.wg)
545545
// Try to set the dispatcher atomically.

client/clients/tso/dispatcher.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,10 @@ type tsoDispatcher struct {
8484

8585
func newTSODispatcher(
8686
ctx context.Context,
87-
maxBatchSize int,
8887
provider tsoServiceProvider,
8988
) *tsoDispatcher {
9089
dispatcherCtx, dispatcherCancel := context.WithCancel(ctx)
91-
tsoRequestCh := make(chan *Request, maxBatchSize*2)
90+
tsoRequestCh := make(chan *Request, defaultMaxTSOBatchSize*2)
9291
failpoint.Inject("shortDispatcherChannel", func() {
9392
tsoRequestCh = make(chan *Request, 1)
9493
})
@@ -106,7 +105,7 @@ func newTSODispatcher(
106105
batchBufferPool: &sync.Pool{
107106
New: func() any {
108107
return batch.NewController[*Request](
109-
maxBatchSize*2,
108+
defaultMaxTSOBatchSize*2,
110109
tsoRequestFinisher(0, 0, invalidStreamID),
111110
metrics.TSOBestBatchSize,
112111
)

client/clients/tso/dispatcher_test.go

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,15 @@ import (
2626
"github.com/stretchr/testify/suite"
2727
"go.uber.org/goleak"
2828
"go.uber.org/zap/zapcore"
29+
"google.golang.org/grpc"
2930

3031
"github.com/pingcap/failpoint"
3132
"github.com/pingcap/log"
3233

34+
"github.com/tikv/pd/client/errs"
3335
"github.com/tikv/pd/client/opt"
3436
cctx "github.com/tikv/pd/client/pkg/connectionctx"
37+
"github.com/tikv/pd/client/pkg/retry"
3538
"github.com/tikv/pd/client/pkg/utils/testutil"
3639
sd "github.com/tikv/pd/client/servicediscovery"
3740
)
@@ -44,6 +47,7 @@ type mockTSOServiceProvider struct {
4447
option *opt.Option
4548
createStream func(ctx context.Context) *tsoStream
4649
conCtxMgr *cctx.Manager[*tsoStream]
50+
svcDiscovery sd.ServiceDiscovery
4751
}
4852

4953
func newMockTSOServiceProvider(option *opt.Option, createStream func(ctx context.Context) *tsoStream) *mockTSOServiceProvider {
@@ -58,7 +62,10 @@ func (m *mockTSOServiceProvider) getOption() *opt.Option {
5862
return m.option
5963
}
6064

61-
func (*mockTSOServiceProvider) getServiceDiscovery() sd.ServiceDiscovery {
65+
func (m *mockTSOServiceProvider) getServiceDiscovery() sd.ServiceDiscovery {
66+
if m.svcDiscovery != nil {
67+
return m.svcDiscovery
68+
}
6269
return sd.NewMockServiceDiscovery([]string{mockStreamURL}, nil)
6370
}
6471

@@ -108,7 +115,7 @@ func (s *testTSODispatcherSuite) SetupTest() {
108115
created.Store(true)
109116
return s.stream
110117
}
111-
s.dispatcher = newTSODispatcher(context.Background(), defaultMaxTSOBatchSize, newMockTSOServiceProvider(s.option, createStream))
118+
s.dispatcher = newTSODispatcher(context.Background(), newMockTSOServiceProvider(s.option, createStream))
112119
s.reqPool = &sync.Pool{
113120
New: func() any {
114121
return &Request{
@@ -183,6 +190,37 @@ func TestTSODispatcherTestSuite(t *testing.T) {
183190
suite.Run(t, new(testTSODispatcherSuite))
184191
}
185192

193+
type countingServiceDiscovery struct {
194+
removeCount atomic.Int32
195+
scheduleCount atomic.Int32
196+
}
197+
198+
func (*countingServiceDiscovery) Init() error { return nil }
199+
func (*countingServiceDiscovery) Close() {}
200+
func (*countingServiceDiscovery) GetClusterID() uint64 { return 0 }
201+
func (*countingServiceDiscovery) GetKeyspaceID() uint32 { return 0 }
202+
func (*countingServiceDiscovery) SetKeyspaceID(uint32) {}
203+
func (*countingServiceDiscovery) GetKeyspaceGroupID() uint32 { return 0 }
204+
func (*countingServiceDiscovery) GetServiceURLs() []string { return nil }
205+
func (*countingServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { return nil }
206+
func (*countingServiceDiscovery) GetClientConns() *sync.Map { return &sync.Map{} }
207+
func (*countingServiceDiscovery) GetServingURL() string { return "" }
208+
func (*countingServiceDiscovery) GetBackupURLs() []string { return nil }
209+
func (*countingServiceDiscovery) GetServiceClient() sd.ServiceClient { return nil }
210+
func (*countingServiceDiscovery) GetServiceClientByKind(sd.APIKind) sd.ServiceClient { return nil }
211+
func (*countingServiceDiscovery) GetAllServiceClients() []sd.ServiceClient { return nil }
212+
func (*countingServiceDiscovery) GetOrCreateGRPCConn(string) (*grpc.ClientConn, error) {
213+
return nil, nil
214+
}
215+
func (s *countingServiceDiscovery) RemoveClientConn(string) { s.removeCount.Add(1) }
216+
func (s *countingServiceDiscovery) ScheduleCheckMemberChanged() {
217+
s.scheduleCount.Add(1)
218+
}
219+
func (*countingServiceDiscovery) CheckMemberChanged() error { return nil }
220+
func (*countingServiceDiscovery) ExecAndAddLeaderSwitchedCallback(sd.LeaderSwitchedCallbackFunc) {}
221+
func (*countingServiceDiscovery) AddLeaderSwitchedCallback(sd.LeaderSwitchedCallbackFunc) {}
222+
func (*countingServiceDiscovery) AddMembersChangedCallback(func()) {}
223+
186224
func (s *testTSODispatcherSuite) TestBasic() {
187225
ctx := context.Background()
188226
req := s.sendReq(ctx)
@@ -196,6 +234,29 @@ func (s *testTSODispatcherSuite) TestBasic() {
196234
s.reqMustNotReady(req)
197235
}
198236

237+
func (s *testTSODispatcherSuite) TestHandleProcessRequestErrorRemoveConnOnCalleeMismatch() {
238+
svcDiscovery := &countingServiceDiscovery{}
239+
td := newTSODispatcher(context.Background(), &mockTSOServiceProvider{
240+
option: s.option,
241+
conCtxMgr: cctx.NewManager[*tsoStream](),
242+
svcDiscovery: svcDiscovery,
243+
})
244+
defer td.close()
245+
bo := retry.InitialBackoffer(time.Millisecond, time.Millisecond, time.Millisecond)
246+
247+
ok := td.handleProcessRequestError(
248+
context.Background(),
249+
bo,
250+
cctx.NewManager[*tsoStream](),
251+
mockStreamURL,
252+
fmt.Errorf("%s", errs.MismatchCalleeIDErr),
253+
)
254+
255+
s.re.True(ok)
256+
s.re.Equal(int32(1), svcDiscovery.removeCount.Load())
257+
s.re.Zero(svcDiscovery.scheduleCount.Load())
258+
}
259+
199260
func (s *testTSODispatcherSuite) checkIdleTokenCount(expectedTotal int) {
200261
// When the tsoDispatcher is idle, the dispatcher loop will acquire a token and wait for requests. Therefore
201262
// there should be N-1 free tokens remaining.
@@ -358,7 +419,7 @@ func BenchmarkTSODispatcherHandleRequests(b *testing.B) {
358419
return req
359420
}
360421

361-
dispatcher := newTSODispatcher(ctx, defaultMaxTSOBatchSize, newMockTSOServiceProvider(opt.NewOption(), nil))
422+
dispatcher := newTSODispatcher(ctx, newMockTSOServiceProvider(opt.NewOption(), nil))
362423
var wg sync.WaitGroup
363424
wg.Add(1)
364425

pkg/mcs/tso/server/grpc_service.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"context"
1919
"io"
2020
"net/http"
21-
"net/url"
2221
"strconv"
2322
"time"
2423

@@ -54,6 +53,7 @@ func (dummyRestService) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
5453
// Service is the TSO grpc service.
5554
type Service struct {
5655
*Server
56+
advertiseListenHost string
5757
}
5858

5959
// NewService creates a new TSO service.
@@ -63,7 +63,8 @@ func NewService(svr bs.Server) registry.RegistrableService {
6363
log.Fatal("create tso server failed")
6464
}
6565
return &Service{
66-
Server: server,
66+
Server: server,
67+
advertiseListenHost: getAdvertiseListenHost(server.GetAdvertiseListenAddr()),
6768
}
6869
}
6970

@@ -102,9 +103,8 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
102103
if clusterID != keypath.ClusterID() {
103104
return errs.ErrMismatchClusterID(keypath.ClusterID(), clusterID)
104105
}
105-
if calleeID := header.GetCalleeId(); calleeID != "" {
106-
advertiseURL, _ := url.Parse(s.GetAdvertiseListenAddr())
107-
if advertiseURL != nil && advertiseURL.Host != "" && calleeID != advertiseURL.Host {
106+
if calleeID := header.GetCalleeId(); calleeID != "" && s.advertiseListenHost != "" {
107+
if calleeID != s.advertiseListenHost {
108108
return status.Errorf(
109109
codes.FailedPrecondition, "mismatch callee id, need %s but got %s",
110110
s.GetAdvertiseListenAddr(), calleeID,

pkg/mcs/tso/server/server.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"fmt"
2020
"net/http"
21+
"net/url"
2122
"os"
2223
"os/signal"
2324
"runtime"
@@ -92,6 +93,14 @@ type Server struct {
9293
cgMonitor cgroup.Monitor
9394
}
9495

96+
func getAdvertiseListenHost(advertiseListenAddr string) string {
97+
parsed, err := url.Parse(advertiseListenAddr)
98+
if err != nil || parsed.Host == "" {
99+
return ""
100+
}
101+
return parsed.Host
102+
}
103+
95104
// Implement the following methods defined in bs.Server
96105

97106
// Name returns the unique name for this server in the TSO cluster.

0 commit comments

Comments
 (0)