@@ -25,6 +25,7 @@ import (
2525
2626 "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
2727 "github.com/pingcap/errors"
28+ "github.com/pingcap/failpoint"
2829 "github.com/pingcap/kvproto/pkg/coprocessor"
2930 "github.com/pingcap/kvproto/pkg/debugpb"
3031 "github.com/pingcap/kvproto/pkg/tikvpb"
@@ -33,6 +34,7 @@ import (
3334 "github.com/pingcap/tidb/metrics"
3435 "github.com/pingcap/tidb/store/tikv/tikvrpc"
3536 "github.com/pingcap/tidb/util/logutil"
37+ "github.com/prometheus/client_golang/prometheus"
3638 "go.uber.org/zap"
3739 "google.golang.org/grpc"
3840 "google.golang.org/grpc/credentials"
@@ -78,11 +80,14 @@ type connArray struct {
7880 batchCommandsCh chan * batchCommandsEntry
7981 batchCommandsClients []* batchCommandsClient
8082 tikvTransportLayerLoad uint64
83+ closed chan struct {}
8184
8285 // Notify rpcClient to check the idle flag
8386 idleNotify * uint32
8487 idle bool
8588 idleDetect * time.Timer
89+
90+ pendingRequests prometheus.Gauge
8691}
8792
8893type batchCommandsClient struct {
@@ -105,17 +110,70 @@ func (c *batchCommandsClient) isStopped() bool {
105110 return atomic .LoadInt32 (& c .closed ) != 0
106111}
107112
113+ func (c * batchCommandsClient ) send (request * tikvpb.BatchCommandsRequest , entries []* batchCommandsEntry ) {
114+ // Use the lock to protect the stream client won't be replaced by RecvLoop,
115+ // and new added request won't be removed by `failPendingRequests`.
116+ c .clientLock .Lock ()
117+ defer c .clientLock .Unlock ()
118+ for i , requestID := range request .RequestIds {
119+ c .batched .Store (requestID , entries [i ])
120+ }
121+ if err := c .client .Send (request ); err != nil {
122+ logutil .BgLogger ().Error (
123+ "batch commands send error" ,
124+ zap .String ("target" , c .target ),
125+ zap .Error (err ),
126+ )
127+ c .failPendingRequests (err )
128+ }
129+ }
130+
131+ func (c * batchCommandsClient ) recv () (* tikvpb.BatchCommandsResponse , error ) {
132+ failpoint .Inject ("gotErrorInRecvLoop" , func (_ failpoint.Value ) (* tikvpb.BatchCommandsResponse , error ) {
133+ return nil , errors .New ("injected error in batchRecvLoop" )
134+ })
135+ // When `conn.Close()` is called, `client.Recv()` will return an error.
136+ return c .client .Recv ()
137+ }
138+
139+ // `failPendingRequests` must be called in locked contexts in order to avoid double closing channels.
108140func (c * batchCommandsClient ) failPendingRequests (err error ) {
141+ failpoint .Inject ("panicInFailPendingRequests" , nil )
109142 c .batched .Range (func (key , value interface {}) bool {
110143 id , _ := key .(uint64 )
111144 entry , _ := value .(* batchCommandsEntry )
112145 entry .err = err
113- close (entry .res )
114146 c .batched .Delete (id )
147+ close (entry .res )
115148 return true
116149 })
117150}
118151
152+ func (c * batchCommandsClient ) reCreateStreamingClient (err error ) bool {
153+ // Hold the lock to forbid batchSendLoop using the old client.
154+ c .clientLock .Lock ()
155+ defer c .clientLock .Unlock ()
156+ c .failPendingRequests (err ) // fail all pending requests.
157+
158+ // Re-establish a application layer stream. TCP layer is handled by gRPC.
159+ tikvClient := tikvpb .NewTikvClient (c .conn )
160+ streamClient , err := tikvClient .BatchCommands (context .TODO ())
161+ if err == nil {
162+ logutil .BgLogger ().Info (
163+ "batchRecvLoop re-create streaming success" ,
164+ zap .String ("target" , c .target ),
165+ )
166+ c .client = streamClient
167+ return true
168+ }
169+ logutil .BgLogger ().Error (
170+ "batchRecvLoop re-create streaming fail" ,
171+ zap .String ("target" , c .target ),
172+ zap .Error (err ),
173+ )
174+ return false
175+ }
176+
119177func (c * batchCommandsClient ) batchRecvLoop (cfg config.TiKVClient ) {
120178 defer func () {
121179 if r := recover (); r != nil {
@@ -129,8 +187,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
129187 }()
130188
131189 for {
132- // When `conn.Close()` is called, `client.Recv()` will return an error.
133- resp , err := c .client .Recv ()
190+ resp , err := c .recv ()
134191 if err != nil {
135192 now := time .Now ()
136193 for { // try to re-create the streaming in the loop.
@@ -143,28 +200,10 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
143200 zap .Error (err ),
144201 )
145202
146- // Hold the lock to forbid batchSendLoop using the old client.
147- c .clientLock .Lock ()
148- c .failPendingRequests (err ) // fail all pending requests.
149-
150- // Re-establish a application layer stream. TCP layer is handled by gRPC.
151- tikvClient := tikvpb .NewTikvClient (c .conn )
152- streamClient , err := tikvClient .BatchCommands (context .TODO ())
153- c .clientLock .Unlock ()
154-
155- if err == nil {
156- logutil .BgLogger ().Info (
157- "batchRecvLoop re-create streaming success" ,
158- zap .String ("target" , c .target ),
159- )
160- c .client = streamClient
203+ if c .reCreateStreamingClient (err ) {
161204 break
162205 }
163- logutil .BgLogger ().Error (
164- "batchRecvLoop re-create streaming fail" ,
165- zap .String ("target" , c .target ),
166- zap .Error (err ),
167- )
206+
168207 // TODO: Use a more smart backoff strategy.
169208 time .Sleep (time .Second )
170209 }
@@ -208,6 +247,7 @@ func newConnArray(maxSize uint, addr string, security config.Security, idleNotif
208247 batchCommandsCh : make (chan * batchCommandsEntry , cfg .TiKVClient .MaxBatchSize ),
209248 batchCommandsClients : make ([]* batchCommandsClient , 0 , maxSize ),
210249 tikvTransportLayerLoad : 0 ,
250+ closed : make (chan struct {}),
211251
212252 idleNotify : idleNotify ,
213253 idleDetect : time .NewTimer (idleTimeout ),
@@ -220,6 +260,7 @@ func newConnArray(maxSize uint, addr string, security config.Security, idleNotif
220260
221261func (a * connArray ) Init (addr string , security config.Security ) error {
222262 a .target = addr
263+ a .pendingRequests = metrics .TiKVPendingBatchRequests .WithLabelValues (a .target )
223264
224265 opt := grpc .WithInsecure ()
225266 if len (security .ClusterSSLCA ) != 0 {
@@ -309,7 +350,10 @@ func (a *connArray) Close() {
309350 // After connections are closed, `batchRecvLoop`s will check the flag.
310351 atomic .StoreInt32 (& c .closed , 1 )
311352 }
312- close (a .batchCommandsCh )
353+ // Don't close(batchCommandsCh): when Close() is called, someone may still be
354+ // calling SendRequest and writing to batchCommandsCh, and closing the channel
355+ // here would make the writing goroutine panic.
356+ close (a .closed )
313357
314358 for i , c := range a .v {
315359 if c != nil {
@@ -356,6 +400,8 @@ func (a *connArray) fetchAllPendingRequests(
356400 atomic .CompareAndSwapUint32 (a .idleNotify , 0 , 1 )
357401 // This connArray to be recycled
358402 return
403+ case <- a .closed :
404+ return
359405 }
360406 if headEntry == nil {
361407 return
@@ -450,7 +496,7 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
450496 requests = requests [:0 ]
451497 requestIDs = requestIDs [:0 ]
452498
453- metrics . TiKVPendingBatchRequests .Set (float64 (len (a .batchCommandsCh )))
499+ a . pendingRequests .Set (float64 (len (a .batchCommandsCh )))
454500 a .fetchAllPendingRequests (int (cfg .MaxBatchSize ), & entries , & requests )
455501
456502 if len (entries ) < int (cfg .MaxBatchSize ) && cfg .MaxBatchWaitTime > 0 {
@@ -484,27 +530,12 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
484530 requestIDs = append (requestIDs , requestID )
485531 }
486532
487- request := & tikvpb.BatchCommandsRequest {
533+ req := & tikvpb.BatchCommandsRequest {
488534 Requests : requests ,
489535 RequestIds : requestIDs ,
490536 }
491537
492- // Use the lock to protect the stream client won't be replaced by RecvLoop,
493- // and new added request won't be removed by `failPendingRequests`.
494- batchCommandsClient .clientLock .Lock ()
495- for i , requestID := range request .RequestIds {
496- batchCommandsClient .batched .Store (requestID , entries [i ])
497- }
498- err := batchCommandsClient .client .Send (request )
499- batchCommandsClient .clientLock .Unlock ()
500- if err != nil {
501- logutil .BgLogger ().Error (
502- "batch commands send error" ,
503- zap .String ("target" , a .target ),
504- zap .Error (err ),
505- )
506- batchCommandsClient .failPendingRequests (err )
507- }
538+ batchCommandsClient .send (req , entries )
508539 }
509540}
510541
@@ -547,6 +578,11 @@ func newRPCClient(security config.Security) *rpcClient {
547578 }
548579}
549580
581+ // NewTestRPCClient is for some external tests.
582+ func NewTestRPCClient () Client {
583+ return newRPCClient (config.Security {})
584+ }
585+
550586func (c * rpcClient ) getConnArray (addr string ) (* connArray , error ) {
551587 c .RLock ()
552588 if c .isClosed {
0 commit comments