Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type ErrLocked struct {

// Error formats the lock to a string.
func (e *ErrLocked) Error() string {
return fmt.Sprintf("key is locked, key: %q, primary: %q, startTS: %v", e.Key, e.Primary, e.StartTS)
return fmt.Sprintf("key is locked, key: %q, primary: %q, txnStartTS: %v", e.Key, e.Primary, e.StartTS)
}

// ErrRetryable suggests that client may restart the txn. e.g. write conflict.
Expand Down
10 changes: 6 additions & 4 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/util/codec"
log "github.com/sirupsen/logrus"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"golang.org/x/net/context"
)

// MVCCLevelDB implements the MVCCStore interface.
Expand Down Expand Up @@ -372,7 +374,7 @@ func (mvcc *MVCCLevelDB) Scan(startKey, endKey []byte, limit int, startTS uint64
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
defer iter.Release()
if err != nil {
log.Error("scan new iterator fail:", errors.ErrorStack(err))
logutil.Logger(context.Background()).Error("scan new iterator fail", zap.Error(err))
return nil
}

Expand All @@ -396,7 +398,7 @@ func (mvcc *MVCCLevelDB) Scan(startKey, endKey []byte, limit int, startTS uint64
skip := skipDecoder{currKey}
ok, err = skip.Decode(iter)
if err != nil {
log.Error("seek to next key error:", errors.ErrorStack(err))
logutil.Logger(context.Background()).Error("seek to next key error", zap.Error(err))
break
}
currKey = skip.currKey
Expand Down Expand Up @@ -451,7 +453,7 @@ func (mvcc *MVCCLevelDB) ReverseScan(startKey, endKey []byte, limit int, startTS
helper.entry.values = append(helper.entry.values, value)
}
if err != nil {
log.Error("Unmarshal fail:", errors.Trace(err))
logutil.Logger(context.Background()).Error("unmarshal fail", zap.Error(err))
break
}
succ = iter.Prev()
Expand Down
103 changes: 75 additions & 28 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ import (
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/execdetails"
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -138,14 +139,23 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
const logSize = 4 * 1024 * 1024 // 4MB
if len(keys) > logEntryCount || size > logSize {
tableID := tablecodec.DecodeTableID(keys[0])
log.Infof("[BIG_TXN] con:%d table id:%d size:%d, keys:%d, puts:%d, dels:%d, locks:%d, startTS:%d",
connID, tableID, size, len(keys), putCnt, delCnt, lockCnt, txn.startTS)
logutil.Logger(context.Background()).Info("[BIG_TXN]",
zap.Uint64("conn", connID),
zap.Int64("table ID", tableID),
zap.Int("size", size),
zap.Int("keys", len(keys)),
zap.Int("puts", putCnt),
zap.Int("dels", delCnt),
zap.Int("locks", lockCnt),
zap.Uint64("txnStartTS", txn.startTS))
}

// Sanity check for startTS.
if txn.StartTS() == math.MaxUint64 {
err = errors.Errorf("try to commit with invalid startTS: %d", txn.StartTS())
log.Errorf("con:%d 2PC commit err: %v", connID, err)
err = errors.Errorf("try to commit with invalid txnStartTS: %d", txn.StartTS())
logutil.Logger(context.Background()).Error("commit failed",
zap.Uint64("conn", connID),
zap.Error(err))
return nil, errors.Trace(err)
}

Expand Down Expand Up @@ -241,7 +251,10 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
go func() {
e := c.doActionOnBatches(secondaryBo, action, batches)
if e != nil {
log.Debugf("con:%d 2PC async doActionOnBatches %s err: %v", c.connID, action, e)
logutil.Logger(context.Background()).Debug("2PC async doActionOnBatches",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e))
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit").Inc()
}
}()
Expand All @@ -268,7 +281,11 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
if len(batches) == 1 {
e := singleBatchActionFunc(bo, batches[0])
if e != nil {
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.connID, action, e, c.startTS)
logutil.Logger(context.Background()).Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e),
zap.Uint64("txnStartTS", c.startTS))
}
return errors.Trace(e)
}
Expand Down Expand Up @@ -307,10 +324,17 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
var err error
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.connID, action, e, c.startTS)
logutil.Logger(context.Background()).Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e),
zap.Uint64("txnStartTS", c.startTS))
// Cancel other requests and return the first error.
if cancel != nil {
log.Debugf("con:%d 2PC doActionOnBatches %s to cancel other actions, tid: %d", c.connID, action, c.startTS)
logutil.Logger(context.Background()).Debug("2PC doActionOnBatches to cancel other actions",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Uint64("txnStartTS", c.startTS))
cancel()
}
if err == nil {
Expand Down Expand Up @@ -383,7 +407,9 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
if err1 != nil {
return errors.Trace(err1)
}
log.Debugf("con:%d 2PC prewrite encounters lock: %v", c.connID, lock)
logutil.Logger(context.Background()).Debug("prewrite encounters lock",
zap.Uint64("conn", c.connID),
zap.Stringer("lock", lock))
locks = append(locks, lock)
}
start := time.Now()
Expand Down Expand Up @@ -493,15 +519,19 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er
if keyErr := commitResp.GetError(); keyErr != nil {
c.mu.RLock()
defer c.mu.RUnlock()
err = errors.Errorf("con:%d 2PC commit failed: %v", c.connID, keyErr.String())
err = errors.Errorf("conn%d 2PC commit failed: %v", c.connID, keyErr.String())
if c.mu.committed {
// No secondary key could be rolled back after it's primary key is committed.
// There must be a serious bug somewhere.
log.Errorf("2PC failed commit key after primary key committed: %v, tid: %d", err, c.startTS)
logutil.Logger(context.Background()).Error("2PC failed commit key after primary key committed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}
// The transaction maybe rolled back by concurrent transactions.
log.Debugf("2PC failed commit primary key: %v, retry later, tid: %d", err, c.startTS)
logutil.Logger(context.Background()).Debug("2PC failed commit primary key",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Annotate(err, txnRetryableMark)
}

Expand Down Expand Up @@ -542,8 +572,10 @@ func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) e
return errors.Trace(err)
}
if keyErr := resp.BatchRollback.GetError(); keyErr != nil {
err = errors.Errorf("con:%d 2PC cleanup failed: %s", c.connID, keyErr)
log.Debugf("2PC failed cleanup key: %v, tid: %d", err, c.startTS)
err = errors.Errorf("conn%d 2PC cleanup failed: %s", c.connID, keyErr)
logutil.Logger(context.Background()).Debug("2PC failed cleanup key",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}
return nil
Expand Down Expand Up @@ -592,9 +624,12 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
err := c.cleanupKeys(NewBackoffer(cleanupKeysCtx, cleanupMaxBackoff).WithVars(c.txn.vars), c.keys)
if err != nil {
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback").Inc()
log.Infof("con:%d 2PC cleanup err: %v, tid: %d", c.connID, err, c.startTS)
logutil.Logger(ctx).Info("2PC cleanup failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
} else {
log.Infof("con:%d 2PC clean up done, tid: %d", c.connID, c.startTS)
logutil.Logger(ctx).Info("2PC clean up done",
zap.Uint64("txnStartTS", c.startTS))
}
c.cleanWg.Done()
}()
Expand All @@ -614,23 +649,27 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
}
}
if err != nil {
log.Debugf("con:%d 2PC failed on prewrite: %v, tid: %d", c.connID, err, c.startTS)
logutil.Logger(ctx).Debug("2PC failed on prewrite",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}

start = time.Now()
commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
if err != nil {
log.Warnf("con:%d 2PC get commitTS failed: %v, tid: %d", c.connID, err, c.startTS)
logutil.Logger(ctx).Warn("2PC get commitTS failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}
c.detail.GetCommitTsTime = time.Since(start)

// check commitTS
if commitTS <= c.startTS {
err = errors.Errorf("con:%d Invalid transaction tso with start_ts=%v while commit_ts=%v",
err = errors.Errorf("conn%d Invalid transaction tso with txnStartTS=%v while txnCommitTS=%v",
c.connID, c.startTS, commitTS)
log.Error(err)
logutil.Logger(context.Background()).Error("invalid transaction", zap.Error(err))
return errors.Trace(err)
}
c.commitTS = commitTS
Expand All @@ -639,7 +678,8 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
}

if c.store.oracle.IsExpired(c.startTS, maxTxnTimeUse) {
err = errors.Errorf("con:%d txn takes too much time, start: %d, commit: %d", c.connID, c.startTS, c.commitTS)
err = errors.Errorf("conn%d txn takes too much time, txnStartTS: %d, comm: %d",
c.connID, c.startTS, c.commitTS)
return errors.Annotate(err, txnRetryableMark)
}

Expand All @@ -650,15 +690,21 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
c.detail.TotalBackoffTime += time.Duration(commitBo.totalSleep) * time.Millisecond
if err != nil {
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
log.Warnf("con:%d 2PC commit result undetermined, err: %v, rpcErr: %v, tid: %v", c.connID, err, undeterminedErr, c.startTS)
log.Error(err)
logutil.Logger(ctx).Error("2PC commit result undetermined",
zap.Error(err),
zap.NamedError("rpcErr", undeterminedErr),
zap.Uint64("txnStartTS", c.startTS))
err = errors.Trace(terror.ErrResultUndetermined)
}
if !c.mu.committed {
log.Debugf("con:%d 2PC failed on commit: %v, tid: %d", c.connID, err, c.startTS)
logutil.Logger(ctx).Debug("2PC failed on commit",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}
log.Debugf("con:%d 2PC succeed with error: %v, tid: %d", c.connID, err, c.startTS)
logutil.Logger(ctx).Debug("2PC succeed with error",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
}
return nil
}
Expand Down Expand Up @@ -707,7 +753,8 @@ func (c *twoPhaseCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int
go func() {
err := binInfo.WriteBinlog(c.store.clusterID)
if err != nil {
log.Errorf("failed to write binlog: %v", err)
logutil.Logger(context.Background()).Error("failed to write binlog",
zap.Error(err))
}
}()
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (s *testCommitterSuite) TestIllegalTso(c *C) {
txn.startTS = uint64(math.MaxUint64)
err := txn.Commit(context.Background())
c.Assert(err, NotNil)
errMsgMustContain(c, err, "invalid startTS")
errMsgMustContain(c, err, "invalid txnStartTS")
}

func errMsgMustContain(c *C, err error, msg string) {
Expand Down
22 changes: 16 additions & 6 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
log "github.com/sirupsen/logrus"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -64,7 +67,9 @@ func NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int {
case DecorrJitter:
sleep = int(math.Min(float64(cap), float64(base+rand.Intn(lastSleep*3-base))))
}
log.Debugf("backoff base %d, sleep %d", base, sleep)
logutil.Logger(context.Background()).Debug("backoff",
zap.Int("base", base),
zap.Int("sleep", sleep))
select {
case <-time.After(time.Duration(sleep) * time.Millisecond):
case <-ctx.Done():
Expand Down Expand Up @@ -206,7 +211,7 @@ func (b *Backoffer) WithVars(vars *kv.Variables) *Backoffer {
// It returns a retryable error if total sleep time exceeds maxSleep.
func (b *Backoffer) Backoff(typ backoffType, err error) error {
if strings.Contains(err.Error(), mismatchClusterID) {
log.Fatalf("critical error %v", err)
logutil.Logger(context.Background()).Fatal("critical error", zap.Error(err))
}
select {
case <-b.ctx.Done():
Expand All @@ -232,18 +237,23 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error {
if ts := b.ctx.Value(txnStartKey); ts != nil {
startTs = ts
}
log.Debugf("%v, retry later(totalsleep %dms, maxsleep %dms), type: %s, txn_start_ts: %v", err, b.totalSleep, b.maxSleep, typ.String(), startTs)
logutil.Logger(context.Background()).Debug("retry later",
zap.Error(err),
zap.Int("totalSleep", b.totalSleep),
zap.Int("maxSleep", b.maxSleep),
zap.Stringer("type", typ),
zap.Reflect("txnStartTS", startTs))

b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
if b.maxSleep > 0 && b.totalSleep >= b.maxSleep {
errMsg := fmt.Sprintf("backoffer.maxSleep %dms is exceeded, errors:", b.maxSleep)
for i, err := range b.errors {
// Print only last 3 errors for non-DEBUG log levels.
if log.GetLevel() == log.DebugLevel || i >= len(b.errors)-3 {
if log.GetLevel() == zapcore.DebugLevel || i >= len(b.errors)-3 {
errMsg += "\n" + err.Error()
}
}
log.Warn(errMsg)
logutil.Logger(context.Background()).Warn(errMsg)
// Use the first backoff type to generate a MySQL error.
return b.types[0].TError()
}
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
log "github.com/sirupsen/logrus"
"github.com/pingcap/tidb/util/logutil"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -281,7 +281,7 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
if errors.Cause(err) != io.EOF {
return nil, errors.Trace(err)
}
log.Debug("copstream returns nothing for the request.")
logutil.Logger(context.Background()).Debug("copstream returns nothing for the request.")
}
copStream.Response = first
return resp, nil
Expand Down
Loading