Skip to content

Commit facadf7

Browse files
authored
store: use uniform log format for store package (#9664)
1 parent e53b56b commit facadf7

File tree

21 files changed

+421
-185
lines changed

21 files changed

+421
-185
lines changed

store/mockstore/mocktikv/errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type ErrLocked struct {
2626

2727
// Error formats the lock to a string.
2828
func (e *ErrLocked) Error() string {
29-
return fmt.Sprintf("key is locked, key: %q, primary: %q, startTS: %v", e.Key, e.Primary, e.StartTS)
29+
return fmt.Sprintf("key is locked, key: %q, primary: %q, txnStartTS: %v", e.Key, e.Primary, e.StartTS)
3030
}
3131

3232
// ErrKeyAlreadyExist is returned when key exists but this key has a constraint that

store/mockstore/mocktikv/mvcc_leveldb.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package mocktikv
1515

1616
import (
1717
"bytes"
18+
"context"
1819
"math"
1920
"sync"
2021

@@ -27,7 +28,8 @@ import (
2728
"github.com/pingcap/kvproto/pkg/kvrpcpb"
2829
"github.com/pingcap/parser/terror"
2930
"github.com/pingcap/tidb/util/codec"
30-
log "github.com/sirupsen/logrus"
31+
"github.com/pingcap/tidb/util/logutil"
32+
"go.uber.org/zap"
3133
)
3234

3335
// MVCCLevelDB implements the MVCCStore interface.
@@ -375,7 +377,7 @@ func (mvcc *MVCCLevelDB) Scan(startKey, endKey []byte, limit int, startTS uint64
375377
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
376378
defer iter.Release()
377379
if err != nil {
378-
log.Error("scan new iterator fail:", errors.ErrorStack(err))
380+
logutil.Logger(context.Background()).Error("scan new iterator fail", zap.Error(err))
379381
return nil
380382
}
381383

@@ -399,7 +401,7 @@ func (mvcc *MVCCLevelDB) Scan(startKey, endKey []byte, limit int, startTS uint64
399401
skip := skipDecoder{currKey}
400402
ok, err = skip.Decode(iter)
401403
if err != nil {
402-
log.Error("seek to next key error:", errors.ErrorStack(err))
404+
logutil.Logger(context.Background()).Error("seek to next key error", zap.Error(err))
403405
break
404406
}
405407
currKey = skip.currKey
@@ -454,7 +456,7 @@ func (mvcc *MVCCLevelDB) ReverseScan(startKey, endKey []byte, limit int, startTS
454456
helper.entry.values = append(helper.entry.values, value)
455457
}
456458
if err != nil {
457-
log.Error("Unmarshal fail:", errors.Trace(err))
459+
logutil.Logger(context.Background()).Error("unmarshal fail", zap.Error(err))
458460
break
459461
}
460462
succ = iter.Prev()
@@ -592,7 +594,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
592594
// Check assertions.
593595
if (ok && mutation.Assertion == kvrpcpb.Assertion_NotExist) ||
594596
(!ok && mutation.Assertion == kvrpcpb.Assertion_Exist) {
595-
log.Error("ASSERTION FAIL!!!", mutation)
597+
logutil.Logger(context.Background()).Error("ASSERTION FAIL!!!", zap.Stringer("mutation", mutation))
596598
}
597599

598600
batch.Put(writeKey, writeValue)

store/store.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@
1414
package store
1515

1616
import (
17+
"context"
1718
"net/url"
1819
"strings"
1920

2021
"github.com/pingcap/errors"
2122
"github.com/pingcap/tidb/kv"
2223
"github.com/pingcap/tidb/util"
23-
log "github.com/sirupsen/logrus"
24+
"github.com/pingcap/tidb/util/logutil"
2425
)
2526

2627
var stores = make(map[string]kv.Driver)
@@ -64,7 +65,7 @@ func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) {
6465

6566
var s kv.Storage
6667
err = util.RunWithRetry(maxRetries, util.RetryInterval, func() (bool, error) {
67-
log.Infof("new store")
68+
logutil.Logger(context.Background()).Info("new store")
6869
s, err = d.Open(path)
6970
return kv.IsRetryableError(err), err
7071
})

store/tikv/2pc.go

Lines changed: 81 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ import (
3232
"github.com/pingcap/tidb/store/tikv/tikvrpc"
3333
"github.com/pingcap/tidb/tablecodec"
3434
"github.com/pingcap/tidb/util/execdetails"
35-
binlog "github.com/pingcap/tipb/go-binlog"
36-
log "github.com/sirupsen/logrus"
35+
"github.com/pingcap/tidb/util/logutil"
36+
"github.com/pingcap/tipb/go-binlog"
37+
"go.uber.org/zap"
3738
)
3839

3940
type twoPhaseCommitAction int
@@ -155,7 +156,8 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
155156
for _, pair := range txn.assertions {
156157
mutation, ok := mutations[string(pair.key)]
157158
if !ok {
158-
log.Error("ASSERTION FAIL!!! assertion exists but no mutation?", pair)
159+
logutil.Logger(context.Background()).Error("ASSERTION FAIL!!! assertion exists but no mutation?",
160+
zap.Stringer("assertion", pair))
159161
continue
160162
}
161163
// Only apply the first assertion!
@@ -181,17 +183,26 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
181183
const logSize = 4 * 1024 * 1024 // 4MB
182184
if len(keys) > logEntryCount || size > logSize {
183185
tableID := tablecodec.DecodeTableID(keys[0])
184-
log.Infof("[BIG_TXN] con:%d table id:%d size:%d, keys:%d, puts:%d, dels:%d, locks:%d, startTS:%d",
185-
connID, tableID, size, len(keys), putCnt, delCnt, lockCnt, txn.startTS)
186+
logutil.Logger(context.Background()).Info("[BIG_TXN]",
187+
zap.Uint64("con", connID),
188+
zap.Int64("table ID", tableID),
189+
zap.Int("size", size),
190+
zap.Int("keys", len(keys)),
191+
zap.Int("puts", putCnt),
192+
zap.Int("dels", delCnt),
193+
zap.Int("locks", lockCnt),
194+
zap.Uint64("txnStartTS", txn.startTS))
186195
}
187196

188197
// Convert from sec to ms
189198
maxTxnTimeUse := uint64(config.GetGlobalConfig().TiKVClient.MaxTxnTimeUse) * 1000
190199

191200
// Sanity check for startTS.
192201
if txn.StartTS() == math.MaxUint64 {
193-
err = errors.Errorf("try to commit with invalid startTS: %d", txn.StartTS())
194-
log.Errorf("con:%d 2PC commit err: %v", connID, err)
202+
err = errors.Errorf("try to commit with invalid txnStartTS: %d", txn.StartTS())
203+
logutil.Logger(context.Background()).Error("commit failed",
204+
zap.Uint64("conn", connID),
205+
zap.Error(err))
195206
return nil, errors.Trace(err)
196207
}
197208

@@ -288,7 +299,10 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
288299
go func() {
289300
e := c.doActionOnBatches(secondaryBo, action, batches)
290301
if e != nil {
291-
log.Debugf("con:%d 2PC async doActionOnBatches %s err: %v", c.connID, action, e)
302+
logutil.Logger(context.Background()).Debug("2PC async doActionOnBatches",
303+
zap.Uint64("conn", c.connID),
304+
zap.Stringer("action type", action),
305+
zap.Error(e))
292306
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit").Inc()
293307
}
294308
}()
@@ -315,7 +329,11 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
315329
if len(batches) == 1 {
316330
e := singleBatchActionFunc(bo, batches[0])
317331
if e != nil {
318-
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.connID, action, e, c.startTS)
332+
logutil.Logger(context.Background()).Debug("2PC doActionOnBatches failed",
333+
zap.Uint64("conn", c.connID),
334+
zap.Stringer("action type", action),
335+
zap.Error(e),
336+
zap.Uint64("txnStartTS", c.startTS))
319337
}
320338
return errors.Trace(e)
321339
}
@@ -354,10 +372,17 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
354372
var err error
355373
for i := 0; i < len(batches); i++ {
356374
if e := <-ch; e != nil {
357-
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.connID, action, e, c.startTS)
375+
logutil.Logger(context.Background()).Debug("2PC doActionOnBatches failed",
376+
zap.Uint64("conn", c.connID),
377+
zap.Stringer("action type", action),
378+
zap.Error(e),
379+
zap.Uint64("txnStartTS", c.startTS))
358380
// Cancel other requests and return the first error.
359381
if cancel != nil {
360-
log.Debugf("con:%d 2PC doActionOnBatches %s to cancel other actions, tid: %d", c.connID, action, c.startTS)
382+
logutil.Logger(context.Background()).Debug("2PC doActionOnBatches to cancel other actions",
383+
zap.Uint64("conn", c.connID),
384+
zap.Stringer("action type", action),
385+
zap.Uint64("txnStartTS", c.startTS))
361386
cancel()
362387
}
363388
if err == nil {
@@ -432,9 +457,11 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
432457
key := alreadyExist.GetKey()
433458
conditionPair := c.txn.us.LookupConditionPair(key)
434459
if conditionPair == nil {
435-
panic(fmt.Sprintf("con:%d, conditionPair for key:%s should not be nil", c.connID, key))
460+
panic(fmt.Sprintf("conn%d, conditionPair for key:%s should not be nil", c.connID, key))
436461
}
437-
log.Debugf("con:%d key: %s already exists", c.connID, key)
462+
logutil.Logger(context.Background()).Debug("key already exists",
463+
zap.Uint64("conn", c.connID),
464+
zap.Binary("key", key))
438465
return errors.Trace(conditionPair.Err())
439466
}
440467

@@ -443,7 +470,9 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
443470
if err1 != nil {
444471
return errors.Trace(err1)
445472
}
446-
log.Debugf("con:%d 2PC prewrite encounters lock: %v", c.connID, lock)
473+
logutil.Logger(context.Background()).Debug("prewrite encounters lock",
474+
zap.Uint64("conn", c.connID),
475+
zap.Stringer("lock", lock))
447476
locks = append(locks, lock)
448477
}
449478
start := time.Now()
@@ -553,15 +582,19 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er
553582
if keyErr := commitResp.GetError(); keyErr != nil {
554583
c.mu.RLock()
555584
defer c.mu.RUnlock()
556-
err = errors.Errorf("con:%d 2PC commit failed: %v", c.connID, keyErr.String())
585+
err = errors.Errorf("conn%d 2PC commit failed: %v", c.connID, keyErr.String())
557586
if c.mu.committed {
558587
// No secondary key could be rolled back after it's primary key is committed.
559588
// There must be a serious bug somewhere.
560-
log.Errorf("2PC failed commit key after primary key committed: %v, tid: %d", err, c.startTS)
589+
logutil.Logger(context.Background()).Error("2PC failed commit key after primary key committed",
590+
zap.Error(err),
591+
zap.Uint64("txnStartTS", c.startTS))
561592
return errors.Trace(err)
562593
}
563594
// The transaction maybe rolled back by concurrent transactions.
564-
log.Debugf("2PC failed commit primary key: %v, retry later, tid: %d", err, c.startTS)
595+
logutil.Logger(context.Background()).Debug("2PC failed commit primary key",
596+
zap.Error(err),
597+
zap.Uint64("txnStartTS", c.startTS))
565598
return errors.Annotate(err, txnRetryableMark)
566599
}
567600

@@ -602,8 +635,10 @@ func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) e
602635
return errors.Trace(err)
603636
}
604637
if keyErr := resp.BatchRollback.GetError(); keyErr != nil {
605-
err = errors.Errorf("con:%d 2PC cleanup failed: %s", c.connID, keyErr)
606-
log.Debugf("2PC failed cleanup key: %v, tid: %d", err, c.startTS)
638+
err = errors.Errorf("conn%d 2PC cleanup failed: %s", c.connID, keyErr)
639+
logutil.Logger(context.Background()).Debug("2PC failed cleanup key",
640+
zap.Error(err),
641+
zap.Uint64("txnStartTS", c.startTS))
607642
return errors.Trace(err)
608643
}
609644
return nil
@@ -647,9 +682,12 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
647682
err := c.cleanupKeys(NewBackoffer(cleanupKeysCtx, cleanupMaxBackoff).WithVars(c.txn.vars), c.keys)
648683
if err != nil {
649684
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback").Inc()
650-
log.Infof("con:%d 2PC cleanup err: %v, tid: %d", c.connID, err, c.startTS)
685+
logutil.Logger(ctx).Info("2PC cleanup failed",
686+
zap.Error(err),
687+
zap.Uint64("txnStartTS", c.startTS))
651688
} else {
652-
log.Infof("con:%d 2PC clean up done, tid: %d", c.connID, c.startTS)
689+
logutil.Logger(ctx).Info("2PC clean up done",
690+
zap.Uint64("txnStartTS", c.startTS))
653691
}
654692
c.cleanWg.Done()
655693
}()
@@ -669,23 +707,27 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
669707
}
670708
}
671709
if err != nil {
672-
log.Debugf("con:%d 2PC failed on prewrite: %v, tid: %d", c.connID, err, c.startTS)
710+
logutil.Logger(ctx).Debug("2PC failed on prewrite",
711+
zap.Error(err),
712+
zap.Uint64("txnStartTS", c.startTS))
673713
return errors.Trace(err)
674714
}
675715

676716
start = time.Now()
677717
commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
678718
if err != nil {
679-
log.Warnf("con:%d 2PC get commitTS failed: %v, tid: %d", c.connID, err, c.startTS)
719+
logutil.Logger(ctx).Warn("2PC get commitTS failed",
720+
zap.Error(err),
721+
zap.Uint64("txnStartTS", c.startTS))
680722
return errors.Trace(err)
681723
}
682724
c.detail.GetCommitTsTime = time.Since(start)
683725

684726
// check commitTS
685727
if commitTS <= c.startTS {
686-
err = errors.Errorf("con:%d Invalid transaction tso with start_ts=%v while commit_ts=%v",
728+
err = errors.Errorf("conn%d Invalid transaction tso with txnStartTS=%v while txnCommitTS=%v",
687729
c.connID, c.startTS, commitTS)
688-
log.Error(err)
730+
logutil.Logger(context.Background()).Error("invalid transaction", zap.Error(err))
689731
return errors.Trace(err)
690732
}
691733
c.commitTS = commitTS
@@ -699,7 +741,8 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
699741
// }
700742

701743
if c.store.oracle.IsExpired(c.startTS, c.maxTxnTimeUse) {
702-
err = errors.Errorf("con:%d txn takes too much time, start: %d, commit: %d", c.connID, c.startTS, c.commitTS)
744+
err = errors.Errorf("conn%d txn takes too much time, txnStartTS: %d, comm: %d",
745+
c.connID, c.startTS, c.commitTS)
703746
return errors.Annotate(err, txnRetryableMark)
704747
}
705748

@@ -710,15 +753,21 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
710753
c.detail.TotalBackoffTime += time.Duration(commitBo.totalSleep) * time.Millisecond
711754
if err != nil {
712755
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
713-
log.Warnf("con:%d 2PC commit result undetermined, err: %v, rpcErr: %v, tid: %v", c.connID, err, undeterminedErr, c.startTS)
714-
log.Error(err)
756+
logutil.Logger(ctx).Error("2PC commit result undetermined",
757+
zap.Error(err),
758+
zap.NamedError("rpcErr", undeterminedErr),
759+
zap.Uint64("txnStartTS", c.startTS))
715760
err = errors.Trace(terror.ErrResultUndetermined)
716761
}
717762
if !c.mu.committed {
718-
log.Debugf("con:%d 2PC failed on commit: %v, tid: %d", c.connID, err, c.startTS)
763+
logutil.Logger(ctx).Debug("2PC failed on commit",
764+
zap.Error(err),
765+
zap.Uint64("txnStartTS", c.startTS))
719766
return errors.Trace(err)
720767
}
721-
log.Debugf("con:%d 2PC succeed with error: %v, tid: %d", c.connID, err, c.startTS)
768+
logutil.Logger(ctx).Debug("2PC succeed with error",
769+
zap.Error(err),
770+
zap.Uint64("txnStartTS", c.startTS))
722771
}
723772
return nil
724773
}
@@ -767,7 +816,8 @@ func (c *twoPhaseCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int
767816
go func() {
768817
err := binInfo.WriteBinlog(c.store.clusterID)
769818
if err != nil {
770-
log.Errorf("failed to write binlog: %v", err)
819+
logutil.Logger(context.Background()).Error("failed to write binlog",
820+
zap.Error(err))
771821
}
772822
}()
773823
}

store/tikv/2pc_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func (s *testCommitterSuite) TestIllegalTso(c *C) {
319319
txn.startTS = uint64(math.MaxUint64)
320320
err := txn.Commit(context.Background())
321321
c.Assert(err, NotNil)
322-
errMsgMustContain(c, err, "invalid startTS")
322+
errMsgMustContain(c, err, "invalid txnStartTS")
323323
}
324324

325325
func errMsgMustContain(c *C, err error, msg string) {

0 commit comments

Comments
 (0)