Skip to content

Commit 6b7dc03

Browse files
authored
server/tso: fix the panic issue when getTS in the follower (tikv#1930) (tikv#1931)
Signed-off-by: nolouch <nolouch@gmail.com>
1 parent d14bd3d commit 6b7dc03

File tree

3 files changed

+69
-41
lines changed

3 files changed

+69
-41
lines changed

server/server_test.go

Lines changed: 1 addition & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -49,45 +49,6 @@ func mustWaitLeader(c *C, svrs []*Server) *Server {
4949
return leader
5050
}
5151

52-
var _ = Suite(&testLeaderServerSuite{})
53-
54-
type testLeaderServerSuite struct {
55-
svrs map[string]*Server
56-
leaderPath string
57-
}
58-
59-
func (s *testLeaderServerSuite) SetUpSuite(c *C) {
60-
s.svrs = make(map[string]*Server)
61-
62-
cfgs := NewTestMultiConfig(c, 3)
63-
64-
ch := make(chan *Server, 3)
65-
for i := 0; i < 3; i++ {
66-
cfg := cfgs[i]
67-
68-
go func() {
69-
svr, err := CreateServer(cfg, nil)
70-
c.Assert(err, IsNil)
71-
err = svr.Run(context.TODO())
72-
c.Assert(err, IsNil)
73-
ch <- svr
74-
}()
75-
}
76-
77-
for i := 0; i < 3; i++ {
78-
svr := <-ch
79-
s.svrs[svr.GetAddr()] = svr
80-
s.leaderPath = svr.getLeaderPath()
81-
}
82-
}
83-
84-
func (s *testLeaderServerSuite) TearDownSuite(c *C) {
85-
for _, svr := range s.svrs {
86-
svr.Close()
87-
cleanServer(svr.cfg)
88-
}
89-
}
90-
9152
var _ = Suite(&testServerSuite{})
9253

9354
type testServerSuite struct{}
@@ -100,6 +61,7 @@ func newTestServersWithCfgs(c *C, cfgs []*Config) ([]*Server, CleanupFunc) {
10061
go func(cfg *Config) {
10162
svr, err := CreateServer(cfg, nil)
10263
c.Assert(err, IsNil)
64+
c.Assert(svr, NotNil)
10365
err = svr.Run(context.TODO())
10466
c.Assert(err, IsNil)
10567
ch <- svr

server/tso.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func (s *Server) updateTimestamp() error {
214214
return nil
215215
}
216216

217-
const maxRetryCount = 100
217+
var maxRetryCount = 100
218218

219219
func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {
220220
var resp pdpb.Timestamp
@@ -225,7 +225,7 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {
225225

226226
for i := 0; i < maxRetryCount; i++ {
227227
current := (*atomicObject)(atomic.LoadPointer(&s.ts))
228-
if current.physical == zeroTime {
228+
if current == nil || current.physical == zeroTime {
229229
log.Error("we haven't synced timestamp ok, wait and retry", zap.Int("retry-count", i))
230230
time.Sleep(200 * time.Millisecond)
231231
continue

server/tso_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package server
1515

1616
import (
1717
"context"
18+
"strings"
1819
"sync"
1920
"time"
2021

@@ -193,3 +194,68 @@ func mustGetLeader(c *C, client *clientv3.Client, leaderPath string) *pdpb.Membe
193194
c.Fatal("get leader error")
194195
return nil
195196
}
197+
198+
var _ = Suite(&testFollowerTsoSuite{})
199+
200+
type testFollowerTsoSuite struct {
201+
ctx context.Context
202+
cancel context.CancelFunc
203+
svrs []*Server
204+
}
205+
206+
func (s *testFollowerTsoSuite) SetUpSuite(c *C) {
207+
s.svrs = make([]*Server, 0, 2)
208+
209+
cfgs := NewTestMultiConfig(c, 2)
210+
ch := make(chan *Server, 2)
211+
for i := 0; i < 2; i++ {
212+
cfg := cfgs[i]
213+
go func() {
214+
svr, err := CreateServer(cfg, nil)
215+
c.Assert(err, IsNil)
216+
c.Assert(svr, NotNil)
217+
err = svr.Run(context.TODO())
218+
c.Assert(err, IsNil)
219+
ch <- svr
220+
}()
221+
}
222+
223+
for i := 0; i < 2; i++ {
224+
svr := <-ch
225+
s.svrs = append(s.svrs, svr)
226+
}
227+
mustWaitLeader(c, s.svrs)
228+
}
229+
230+
func (s *testFollowerTsoSuite) TearDownSuite(c *C) {
231+
for _, svr := range s.svrs {
232+
svr.Close()
233+
cleanServer(svr.cfg)
234+
}
235+
}
236+
237+
func (s *testFollowerTsoSuite) TestRequest(c *C) {
238+
var err error
239+
240+
var followerServer *Server
241+
for _, s := range s.svrs {
242+
if !s.IsLeader() {
243+
followerServer = s
244+
}
245+
}
246+
c.Assert(followerServer, NotNil)
247+
grpcPDClient := mustNewGrpcClient(c, followerServer.GetAddr())
248+
clusterID := followerServer.ClusterID()
249+
250+
req := &pdpb.TsoRequest{Header: newRequestHeader(clusterID), Count: 1}
251+
ctx, cancel := context.WithCancel(context.Background())
252+
defer cancel()
253+
tsoClient, err := grpcPDClient.Tso(ctx)
254+
c.Assert(err, IsNil)
255+
defer tsoClient.CloseSend()
256+
err = tsoClient.Send(req)
257+
c.Assert(err, IsNil)
258+
_, err = tsoClient.Recv()
259+
c.Assert(err, NotNil)
260+
c.Assert(strings.Contains(err.Error(), "not leader"), IsTrue)
261+
}

0 commit comments

Comments
 (0)