Skip to content

Commit 9b4940b

Browse files
authored
store: use uniform log format for store package (#9664) (#10018)
1 parent b7b8acf commit 9b4940b

File tree

20 files changed

+370
-154
lines changed

20 files changed

+370
-154
lines changed

store/mockstore/mocktikv/errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type ErrLocked struct {
2626

2727
// Error formats the lock to a string.
2828
func (e *ErrLocked) Error() string {
29-
return fmt.Sprintf("key is locked, key: %q, primary: %q, startTS: %v", e.Key, e.Primary, e.StartTS)
29+
return fmt.Sprintf("key is locked, key: %q, primary: %q, txnStartTS: %v", e.Key, e.Primary, e.StartTS)
3030
}
3131

3232
// ErrRetryable suggests that client may restart the txn. e.g. write conflict.

store/mockstore/mocktikv/mvcc_leveldb.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ import (
2727
"github.com/pingcap/kvproto/pkg/kvrpcpb"
2828
"github.com/pingcap/parser/terror"
2929
"github.com/pingcap/tidb/util/codec"
30-
log "github.com/sirupsen/logrus"
30+
"github.com/pingcap/tidb/util/logutil"
31+
"go.uber.org/zap"
32+
"golang.org/x/net/context"
3133
)
3234

3335
// MVCCLevelDB implements the MVCCStore interface.
@@ -372,7 +374,7 @@ func (mvcc *MVCCLevelDB) Scan(startKey, endKey []byte, limit int, startTS uint64
372374
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
373375
defer iter.Release()
374376
if err != nil {
375-
log.Error("scan new iterator fail:", errors.ErrorStack(err))
377+
logutil.Logger(context.Background()).Error("scan new iterator fail", zap.Error(err))
376378
return nil
377379
}
378380

@@ -396,7 +398,7 @@ func (mvcc *MVCCLevelDB) Scan(startKey, endKey []byte, limit int, startTS uint64
396398
skip := skipDecoder{currKey}
397399
ok, err = skip.Decode(iter)
398400
if err != nil {
399-
log.Error("seek to next key error:", errors.ErrorStack(err))
401+
logutil.Logger(context.Background()).Error("seek to next key error", zap.Error(err))
400402
break
401403
}
402404
currKey = skip.currKey
@@ -451,7 +453,7 @@ func (mvcc *MVCCLevelDB) ReverseScan(startKey, endKey []byte, limit int, startTS
451453
helper.entry.values = append(helper.entry.values, value)
452454
}
453455
if err != nil {
454-
log.Error("Unmarshal fail:", errors.Trace(err))
456+
logutil.Logger(context.Background()).Error("unmarshal fail", zap.Error(err))
455457
break
456458
}
457459
succ = iter.Prev()

store/tikv/2pc.go

Lines changed: 75 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ import (
2929
"github.com/pingcap/tidb/store/tikv/tikvrpc"
3030
"github.com/pingcap/tidb/tablecodec"
3131
"github.com/pingcap/tidb/util/execdetails"
32-
binlog "github.com/pingcap/tipb/go-binlog"
33-
log "github.com/sirupsen/logrus"
32+
"github.com/pingcap/tidb/util/logutil"
33+
"github.com/pingcap/tipb/go-binlog"
34+
"go.uber.org/zap"
3435
"golang.org/x/net/context"
3536
)
3637

@@ -138,14 +139,23 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
138139
const logSize = 4 * 1024 * 1024 // 4MB
139140
if len(keys) > logEntryCount || size > logSize {
140141
tableID := tablecodec.DecodeTableID(keys[0])
141-
log.Infof("[BIG_TXN] con:%d table id:%d size:%d, keys:%d, puts:%d, dels:%d, locks:%d, startTS:%d",
142-
connID, tableID, size, len(keys), putCnt, delCnt, lockCnt, txn.startTS)
142+
logutil.Logger(context.Background()).Info("[BIG_TXN]",
143+
zap.Uint64("conn", connID),
144+
zap.Int64("table ID", tableID),
145+
zap.Int("size", size),
146+
zap.Int("keys", len(keys)),
147+
zap.Int("puts", putCnt),
148+
zap.Int("dels", delCnt),
149+
zap.Int("locks", lockCnt),
150+
zap.Uint64("txnStartTS", txn.startTS))
143151
}
144152

145153
// Sanity check for startTS.
146154
if txn.StartTS() == math.MaxUint64 {
147-
err = errors.Errorf("try to commit with invalid startTS: %d", txn.StartTS())
148-
log.Errorf("con:%d 2PC commit err: %v", connID, err)
155+
err = errors.Errorf("try to commit with invalid txnStartTS: %d", txn.StartTS())
156+
logutil.Logger(context.Background()).Error("commit failed",
157+
zap.Uint64("conn", connID),
158+
zap.Error(err))
149159
return nil, errors.Trace(err)
150160
}
151161

@@ -241,7 +251,10 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
241251
go func() {
242252
e := c.doActionOnBatches(secondaryBo, action, batches)
243253
if e != nil {
244-
log.Debugf("con:%d 2PC async doActionOnBatches %s err: %v", c.connID, action, e)
254+
logutil.Logger(context.Background()).Debug("2PC async doActionOnBatches",
255+
zap.Uint64("conn", c.connID),
256+
zap.Stringer("action type", action),
257+
zap.Error(e))
245258
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit").Inc()
246259
}
247260
}()
@@ -268,7 +281,11 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
268281
if len(batches) == 1 {
269282
e := singleBatchActionFunc(bo, batches[0])
270283
if e != nil {
271-
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.connID, action, e, c.startTS)
284+
logutil.Logger(context.Background()).Debug("2PC doActionOnBatches failed",
285+
zap.Uint64("conn", c.connID),
286+
zap.Stringer("action type", action),
287+
zap.Error(e),
288+
zap.Uint64("txnStartTS", c.startTS))
272289
}
273290
return errors.Trace(e)
274291
}
@@ -307,10 +324,17 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
307324
var err error
308325
for i := 0; i < len(batches); i++ {
309326
if e := <-ch; e != nil {
310-
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.connID, action, e, c.startTS)
327+
logutil.Logger(context.Background()).Debug("2PC doActionOnBatches failed",
328+
zap.Uint64("conn", c.connID),
329+
zap.Stringer("action type", action),
330+
zap.Error(e),
331+
zap.Uint64("txnStartTS", c.startTS))
311332
// Cancel other requests and return the first error.
312333
if cancel != nil {
313-
log.Debugf("con:%d 2PC doActionOnBatches %s to cancel other actions, tid: %d", c.connID, action, c.startTS)
334+
logutil.Logger(context.Background()).Debug("2PC doActionOnBatches to cancel other actions",
335+
zap.Uint64("conn", c.connID),
336+
zap.Stringer("action type", action),
337+
zap.Uint64("txnStartTS", c.startTS))
314338
cancel()
315339
}
316340
if err == nil {
@@ -383,7 +407,9 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
383407
if err1 != nil {
384408
return errors.Trace(err1)
385409
}
386-
log.Debugf("con:%d 2PC prewrite encounters lock: %v", c.connID, lock)
410+
logutil.Logger(context.Background()).Debug("prewrite encounters lock",
411+
zap.Uint64("conn", c.connID),
412+
zap.Stringer("lock", lock))
387413
locks = append(locks, lock)
388414
}
389415
start := time.Now()
@@ -493,15 +519,19 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er
493519
if keyErr := commitResp.GetError(); keyErr != nil {
494520
c.mu.RLock()
495521
defer c.mu.RUnlock()
496-
err = errors.Errorf("con:%d 2PC commit failed: %v", c.connID, keyErr.String())
522+
err = errors.Errorf("conn%d 2PC commit failed: %v", c.connID, keyErr.String())
497523
if c.mu.committed {
498524
// No secondary key could be rolled back after it's primary key is committed.
499525
// There must be a serious bug somewhere.
500-
log.Errorf("2PC failed commit key after primary key committed: %v, tid: %d", err, c.startTS)
526+
logutil.Logger(context.Background()).Error("2PC failed commit key after primary key committed",
527+
zap.Error(err),
528+
zap.Uint64("txnStartTS", c.startTS))
501529
return errors.Trace(err)
502530
}
503531
// The transaction maybe rolled back by concurrent transactions.
504-
log.Debugf("2PC failed commit primary key: %v, retry later, tid: %d", err, c.startTS)
532+
logutil.Logger(context.Background()).Debug("2PC failed commit primary key",
533+
zap.Error(err),
534+
zap.Uint64("txnStartTS", c.startTS))
505535
return errors.Annotate(err, txnRetryableMark)
506536
}
507537

@@ -542,8 +572,10 @@ func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) e
542572
return errors.Trace(err)
543573
}
544574
if keyErr := resp.BatchRollback.GetError(); keyErr != nil {
545-
err = errors.Errorf("con:%d 2PC cleanup failed: %s", c.connID, keyErr)
546-
log.Debugf("2PC failed cleanup key: %v, tid: %d", err, c.startTS)
575+
err = errors.Errorf("conn%d 2PC cleanup failed: %s", c.connID, keyErr)
576+
logutil.Logger(context.Background()).Debug("2PC failed cleanup key",
577+
zap.Error(err),
578+
zap.Uint64("txnStartTS", c.startTS))
547579
return errors.Trace(err)
548580
}
549581
return nil
@@ -592,9 +624,12 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
592624
err := c.cleanupKeys(NewBackoffer(cleanupKeysCtx, cleanupMaxBackoff).WithVars(c.txn.vars), c.keys)
593625
if err != nil {
594626
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback").Inc()
595-
log.Infof("con:%d 2PC cleanup err: %v, tid: %d", c.connID, err, c.startTS)
627+
logutil.Logger(ctx).Info("2PC cleanup failed",
628+
zap.Error(err),
629+
zap.Uint64("txnStartTS", c.startTS))
596630
} else {
597-
log.Infof("con:%d 2PC clean up done, tid: %d", c.connID, c.startTS)
631+
logutil.Logger(ctx).Info("2PC clean up done",
632+
zap.Uint64("txnStartTS", c.startTS))
598633
}
599634
c.cleanWg.Done()
600635
}()
@@ -614,23 +649,27 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
614649
}
615650
}
616651
if err != nil {
617-
log.Debugf("con:%d 2PC failed on prewrite: %v, tid: %d", c.connID, err, c.startTS)
652+
logutil.Logger(ctx).Debug("2PC failed on prewrite",
653+
zap.Error(err),
654+
zap.Uint64("txnStartTS", c.startTS))
618655
return errors.Trace(err)
619656
}
620657

621658
start = time.Now()
622659
commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
623660
if err != nil {
624-
log.Warnf("con:%d 2PC get commitTS failed: %v, tid: %d", c.connID, err, c.startTS)
661+
logutil.Logger(ctx).Warn("2PC get commitTS failed",
662+
zap.Error(err),
663+
zap.Uint64("txnStartTS", c.startTS))
625664
return errors.Trace(err)
626665
}
627666
c.detail.GetCommitTsTime = time.Since(start)
628667

629668
// check commitTS
630669
if commitTS <= c.startTS {
631-
err = errors.Errorf("con:%d Invalid transaction tso with start_ts=%v while commit_ts=%v",
670+
err = errors.Errorf("conn%d Invalid transaction tso with txnStartTS=%v while txnCommitTS=%v",
632671
c.connID, c.startTS, commitTS)
633-
log.Error(err)
672+
logutil.Logger(context.Background()).Error("invalid transaction", zap.Error(err))
634673
return errors.Trace(err)
635674
}
636675
c.commitTS = commitTS
@@ -639,7 +678,8 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
639678
}
640679

641680
if c.store.oracle.IsExpired(c.startTS, maxTxnTimeUse) {
642-
err = errors.Errorf("con:%d txn takes too much time, start: %d, commit: %d", c.connID, c.startTS, c.commitTS)
681+
err = errors.Errorf("conn%d txn takes too much time, txnStartTS: %d, comm: %d",
682+
c.connID, c.startTS, c.commitTS)
643683
return errors.Annotate(err, txnRetryableMark)
644684
}
645685

@@ -650,15 +690,21 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
650690
c.detail.TotalBackoffTime += time.Duration(commitBo.totalSleep) * time.Millisecond
651691
if err != nil {
652692
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
653-
log.Warnf("con:%d 2PC commit result undetermined, err: %v, rpcErr: %v, tid: %v", c.connID, err, undeterminedErr, c.startTS)
654-
log.Error(err)
693+
logutil.Logger(ctx).Error("2PC commit result undetermined",
694+
zap.Error(err),
695+
zap.NamedError("rpcErr", undeterminedErr),
696+
zap.Uint64("txnStartTS", c.startTS))
655697
err = errors.Trace(terror.ErrResultUndetermined)
656698
}
657699
if !c.mu.committed {
658-
log.Debugf("con:%d 2PC failed on commit: %v, tid: %d", c.connID, err, c.startTS)
700+
logutil.Logger(ctx).Debug("2PC failed on commit",
701+
zap.Error(err),
702+
zap.Uint64("txnStartTS", c.startTS))
659703
return errors.Trace(err)
660704
}
661-
log.Debugf("con:%d 2PC succeed with error: %v, tid: %d", c.connID, err, c.startTS)
705+
logutil.Logger(ctx).Debug("2PC succeed with error",
706+
zap.Error(err),
707+
zap.Uint64("txnStartTS", c.startTS))
662708
}
663709
return nil
664710
}
@@ -707,7 +753,8 @@ func (c *twoPhaseCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int
707753
go func() {
708754
err := binInfo.WriteBinlog(c.store.clusterID)
709755
if err != nil {
710-
log.Errorf("failed to write binlog: %v", err)
756+
logutil.Logger(context.Background()).Error("failed to write binlog",
757+
zap.Error(err))
711758
}
712759
}()
713760
}

store/tikv/2pc_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func (s *testCommitterSuite) TestIllegalTso(c *C) {
319319
txn.startTS = uint64(math.MaxUint64)
320320
err := txn.Commit(context.Background())
321321
c.Assert(err, NotNil)
322-
errMsgMustContain(c, err, "invalid startTS")
322+
errMsgMustContain(c, err, "invalid txnStartTS")
323323
}
324324

325325
func errMsgMustContain(c *C, err error, msg string) {

store/tikv/backoff.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@ import (
2121
"time"
2222

2323
"github.com/pingcap/errors"
24+
"github.com/pingcap/log"
2425
"github.com/pingcap/parser/mysql"
2526
"github.com/pingcap/parser/terror"
2627
"github.com/pingcap/tidb/kv"
2728
"github.com/pingcap/tidb/metrics"
28-
log "github.com/sirupsen/logrus"
29+
"github.com/pingcap/tidb/util/logutil"
30+
"go.uber.org/zap"
31+
"go.uber.org/zap/zapcore"
2932
"golang.org/x/net/context"
3033
)
3134

@@ -64,7 +67,9 @@ func NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int {
6467
case DecorrJitter:
6568
sleep = int(math.Min(float64(cap), float64(base+rand.Intn(lastSleep*3-base))))
6669
}
67-
log.Debugf("backoff base %d, sleep %d", base, sleep)
70+
logutil.Logger(context.Background()).Debug("backoff",
71+
zap.Int("base", base),
72+
zap.Int("sleep", sleep))
6873
select {
6974
case <-time.After(time.Duration(sleep) * time.Millisecond):
7075
case <-ctx.Done():
@@ -206,7 +211,7 @@ func (b *Backoffer) WithVars(vars *kv.Variables) *Backoffer {
206211
// It returns a retryable error if total sleep time exceeds maxSleep.
207212
func (b *Backoffer) Backoff(typ backoffType, err error) error {
208213
if strings.Contains(err.Error(), mismatchClusterID) {
209-
log.Fatalf("critical error %v", err)
214+
logutil.Logger(context.Background()).Fatal("critical error", zap.Error(err))
210215
}
211216
select {
212217
case <-b.ctx.Done():
@@ -232,18 +237,23 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error {
232237
if ts := b.ctx.Value(txnStartKey); ts != nil {
233238
startTs = ts
234239
}
235-
log.Debugf("%v, retry later(totalsleep %dms, maxsleep %dms), type: %s, txn_start_ts: %v", err, b.totalSleep, b.maxSleep, typ.String(), startTs)
240+
logutil.Logger(context.Background()).Debug("retry later",
241+
zap.Error(err),
242+
zap.Int("totalSleep", b.totalSleep),
243+
zap.Int("maxSleep", b.maxSleep),
244+
zap.Stringer("type", typ),
245+
zap.Reflect("txnStartTS", startTs))
236246

237247
b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
238248
if b.maxSleep > 0 && b.totalSleep >= b.maxSleep {
239249
errMsg := fmt.Sprintf("backoffer.maxSleep %dms is exceeded, errors:", b.maxSleep)
240250
for i, err := range b.errors {
241251
// Print only last 3 errors for non-DEBUG log levels.
242-
if log.GetLevel() == log.DebugLevel || i >= len(b.errors)-3 {
252+
if log.GetLevel() == zapcore.DebugLevel || i >= len(b.errors)-3 {
243253
errMsg += "\n" + err.Error()
244254
}
245255
}
246-
log.Warn(errMsg)
256+
logutil.Logger(context.Background()).Warn(errMsg)
247257
// Use the first backoff type to generate a MySQL error.
248258
return b.types[0].TError()
249259
}

store/tikv/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
"github.com/pingcap/tidb/config"
3232
"github.com/pingcap/tidb/metrics"
3333
"github.com/pingcap/tidb/store/tikv/tikvrpc"
34-
log "github.com/sirupsen/logrus"
34+
"github.com/pingcap/tidb/util/logutil"
3535
"golang.org/x/net/context"
3636
"google.golang.org/grpc"
3737
"google.golang.org/grpc/credentials"
@@ -281,7 +281,7 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
281281
if errors.Cause(err) != io.EOF {
282282
return nil, errors.Trace(err)
283283
}
284-
log.Debug("copstream returns nothing for the request.")
284+
logutil.Logger(context.Background()).Debug("copstream returns nothing for the request.")
285285
}
286286
copStream.Response = first
287287
return resp, nil

0 commit comments

Comments
 (0)