Skip to content

Commit 8dd4a27

Browse files
tiancaiamaozz-jason
authored andcommitted
store/tikv: refine streaming client re-create log and use a sm… (#11370)
1 parent 2348c75 commit 8dd4a27

File tree

1 file changed

+18
-12
lines changed

1 file changed

+18
-12
lines changed

store/tikv/client_batch.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ package tikv
1616

1717
import (
1818
"context"
19+
"math"
1920
"sync"
2021
"sync/atomic"
2122
"time"
2223

2324
"github.com/pingcap/errors"
2425
"github.com/pingcap/failpoint"
2526
"github.com/pingcap/kvproto/pkg/tikvpb"
27+
"github.com/pingcap/parser/terror"
2628
"github.com/pingcap/tidb/config"
2729
"github.com/pingcap/tidb/metrics"
2830
"github.com/pingcap/tidb/store/tikv/tikvrpc"
@@ -209,7 +211,7 @@ func (c *batchCommandsClient) failPendingRequests(err error) {
209211
})
210212
}
211213

212-
func (c *batchCommandsClient) reCreateStreamingClient(err error) bool {
214+
func (c *batchCommandsClient) reCreateStreamingClient(err error) error {
213215
// Hold the lock to forbid batchSendLoop using the old client.
214216
c.clientLock.Lock()
215217
defer c.clientLock.Unlock()
@@ -224,14 +226,14 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) bool {
224226
zap.String("target", c.target),
225227
)
226228
c.client = streamClient
227-
return true
229+
return nil
228230
}
229231
logutil.Logger(context.Background()).Error(
230232
"batchRecvLoop re-create streaming fail",
231233
zap.String("target", c.target),
232234
zap.Error(err),
233235
)
234-
return false
236+
return err
235237
}
236238

237239
func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
@@ -249,23 +251,27 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
249251
for {
250252
resp, err := c.recv()
251253
if err != nil {
254+
logutil.Logger(context.Background()).Error(
255+
"batchRecvLoop error when receive",
256+
zap.String("target", c.target),
257+
zap.Error(err),
258+
)
259+
260+
b := NewBackoffer(context.Background(), math.MaxInt32)
252261
now := time.Now()
253262
for { // try to re-create the streaming in the loop.
254263
if c.isStopped() {
255264
return
256265
}
257-
logutil.Logger(context.Background()).Error(
258-
"batchRecvLoop error when receive",
259-
zap.String("target", c.target),
260-
zap.Error(err),
261-
)
262266

263-
if c.reCreateStreamingClient(err) {
267+
err1 := c.reCreateStreamingClient(err)
268+
if err1 == nil {
264269
break
265270
}
266-
267-
// TODO: Use a more smart backoff strategy.
268-
time.Sleep(time.Second)
271+
err2 := b.Backoff(boTiKVRPC, err1)
272+
// As timeout is set to math.MaxUint32, err2 should always be nil.
273+
// This line is added to make the 'make errcheck' pass.
274+
terror.Log(err2)
269275
}
270276
metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds())
271277
continue

0 commit comments

Comments
 (0)