Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d h1:rQ
github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d/go.mod h1:VKt7CNAQxpFpSDz3sXyj9hY/GbVsQCr0sB3w59nE7lU=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type ErrLocked struct {

// Error formats the lock to a string.
func (e *ErrLocked) Error() string {
return fmt.Sprintf("key is locked, key: %q, primary: %q, startTS: %v", e.Key, e.Primary, e.StartTS)
return fmt.Sprintf("key is locked, key: %q, primary: %q, txnStartTS: %v", e.Key, e.Primary, e.StartTS)
}

// ErrKeyAlreadyExist is returned when key exists but this key has a constraint that
Expand Down
12 changes: 7 additions & 5 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package mocktikv

import (
"bytes"
"context"
"math"
"sync"

Expand All @@ -27,7 +28,8 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/util/codec"
log "github.com/sirupsen/logrus"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// MVCCLevelDB implements the MVCCStore interface.
Expand Down Expand Up @@ -375,7 +377,7 @@ func (mvcc *MVCCLevelDB) Scan(startKey, endKey []byte, limit int, startTS uint64
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
defer iter.Release()
if err != nil {
log.Error("scan new iterator fail:", errors.ErrorStack(err))
logutil.Logger(context.Background()).Error("scan new iterator fail", zap.Error(err))
return nil
}

Expand All @@ -399,7 +401,7 @@ func (mvcc *MVCCLevelDB) Scan(startKey, endKey []byte, limit int, startTS uint64
skip := skipDecoder{currKey}
ok, err = skip.Decode(iter)
if err != nil {
log.Error("seek to next key error:", errors.ErrorStack(err))
logutil.Logger(context.Background()).Error("seek to next key error", zap.Error(err))
break
}
currKey = skip.currKey
Expand Down Expand Up @@ -454,7 +456,7 @@ func (mvcc *MVCCLevelDB) ReverseScan(startKey, endKey []byte, limit int, startTS
helper.entry.values = append(helper.entry.values, value)
}
if err != nil {
log.Error("Unmarshal fail:", errors.Trace(err))
logutil.Logger(context.Background()).Error("Unmarshal fail", zap.Error(err))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/ Un../ un..

break
}
succ = iter.Prev()
Expand Down Expand Up @@ -592,7 +594,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
// Check assertions.
if (ok && mutation.Assertion == kvrpcpb.Assertion_NotExist) ||
(!ok && mutation.Assertion == kvrpcpb.Assertion_Exist) {
log.Error("ASSERTION FAIL!!!", mutation)
logutil.Logger(context.Background()).Error("ASSERTION FAIL!!!", zap.Stringer("mutation", mutation))
}

batch.Put(writeKey, writeValue)
Expand Down
5 changes: 3 additions & 2 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
package store

import (
"context"
"net/url"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util"
log "github.com/sirupsen/logrus"
"github.com/pingcap/tidb/util/logutil"
)

var stores = make(map[string]kv.Driver)
Expand Down Expand Up @@ -64,7 +65,7 @@ func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) {

var s kv.Storage
err = util.RunWithRetry(maxRetries, util.RetryInterval, func() (bool, error) {
log.Infof("new store")
logutil.Logger(context.Background()).Info("new store")
s, err = d.Open(path)
return kv.IsRetryableError(err), err
})
Expand Down
112 changes: 81 additions & 31 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import (
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/execdetails"
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"
)

type twoPhaseCommitAction int
Expand Down Expand Up @@ -155,7 +156,8 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
for _, pair := range txn.assertions {
mutation, ok := mutations[string(pair.key)]
if !ok {
log.Error("ASSERTION FAIL!!! assertion exists but no mutation?", pair)
logutil.Logger(context.Background()).Error("ASSERTION FAIL!!! assertion exists but no mutation?",
zap.Stringer("assertion", pair))
continue
}
// Only apply the first assertion!
Expand All @@ -181,17 +183,26 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
const logSize = 4 * 1024 * 1024 // 4MB
if len(keys) > logEntryCount || size > logSize {
tableID := tablecodec.DecodeTableID(keys[0])
log.Infof("[BIG_TXN] con:%d table id:%d size:%d, keys:%d, puts:%d, dels:%d, locks:%d, startTS:%d",
connID, tableID, size, len(keys), putCnt, delCnt, lockCnt, txn.startTS)
logutil.Logger(context.Background()).Info("[BIG_TXN]",
zap.Uint64("con", connID),
zap.Int64("table ID", tableID),
zap.Int("size", size),
zap.Int("keys", len(keys)),
zap.Int("puts", putCnt),
zap.Int("dels", delCnt),
zap.Int("locks", lockCnt),
zap.Uint64("txnStartTS", txn.startTS))
}

// Convert from sec to ms
maxTxnTimeUse := uint64(config.GetGlobalConfig().TiKVClient.MaxTxnTimeUse) * 1000

// Sanity check for startTS.
if txn.StartTS() == math.MaxUint64 {
err = errors.Errorf("try to commit with invalid startTS: %d", txn.StartTS())
log.Errorf("con:%d 2PC commit err: %v", connID, err)
err = errors.Errorf("try to commit with invalid txnStartTS: %d", txn.StartTS())
logutil.Logger(context.Background()).Error("commit failed",
zap.Uint64("conn", connID),
zap.Error(err))
return nil, errors.Trace(err)
}

Expand Down Expand Up @@ -288,7 +299,10 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
go func() {
e := c.doActionOnBatches(secondaryBo, action, batches)
if e != nil {
log.Debugf("con:%d 2PC async doActionOnBatches %s err: %v", c.connID, action, e)
logutil.Logger(context.Background()).Debug("2PC async doActionOnBatches",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e))
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit").Inc()
}
}()
Expand All @@ -315,7 +329,11 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
if len(batches) == 1 {
e := singleBatchActionFunc(bo, batches[0])
if e != nil {
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.connID, action, e, c.startTS)
logutil.Logger(context.Background()).Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e),
zap.Uint64("txnStartTS", c.startTS))
}
return errors.Trace(e)
}
Expand Down Expand Up @@ -354,10 +372,17 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
var err error
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.connID, action, e, c.startTS)
logutil.Logger(context.Background()).Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e),
zap.Uint64("txnStartTS", c.startTS))
// Cancel other requests and return the first error.
if cancel != nil {
log.Debugf("con:%d 2PC doActionOnBatches %s to cancel other actions, tid: %d", c.connID, action, c.startTS)
logutil.Logger(context.Background()).Debug("2PC doActionOnBatches to cancel other actions",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Uint64("txnStartTS", c.startTS))
cancel()
}
if err == nil {
Expand Down Expand Up @@ -432,9 +457,11 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
key := alreadyExist.GetKey()
conditionPair := c.txn.us.LookupConditionPair(key)
if conditionPair == nil {
panic(fmt.Sprintf("con:%d, conditionPair for key:%s should not be nil", c.connID, key))
panic(fmt.Sprintf("conn%d, conditionPair for key:%s should not be nil", c.connID, key))
}
log.Debugf("con:%d key: %s already exists", c.connID, key)
logutil.Logger(context.Background()).Debug("key already exists",
zap.Uint64("conn", c.connID),
zap.Binary("key", key))
return errors.Trace(conditionPair.Err())
}

Expand All @@ -443,7 +470,9 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
if err1 != nil {
return errors.Trace(err1)
}
log.Debugf("con:%d 2PC prewrite encounters lock: %v", c.connID, lock)
logutil.Logger(context.Background()).Debug("prewrite encounters lock",
zap.Uint64("conn", c.connID),
zap.Stringer("lock", lock))
locks = append(locks, lock)
}
start := time.Now()
Expand Down Expand Up @@ -553,15 +582,19 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er
if keyErr := commitResp.GetError(); keyErr != nil {
c.mu.RLock()
defer c.mu.RUnlock()
err = errors.Errorf("con:%d 2PC commit failed: %v", c.connID, keyErr.String())
err = errors.Errorf("conn%d 2PC commit failed: %v", c.connID, keyErr.String())
if c.mu.committed {
// No secondary key could be rolled back after it's primary key is committed.
// There must be a serious bug somewhere.
log.Errorf("2PC failed commit key after primary key committed: %v, tid: %d", err, c.startTS)
logutil.Logger(context.Background()).Error("2PC failed commit key after primary key committed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}
// The transaction maybe rolled back by concurrent transactions.
log.Debugf("2PC failed commit primary key: %v, retry later, tid: %d", err, c.startTS)
logutil.Logger(context.Background()).Debug("2PC failed commit primary key",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Annotate(err, txnRetryableMark)
}

Expand Down Expand Up @@ -602,8 +635,10 @@ func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) e
return errors.Trace(err)
}
if keyErr := resp.BatchRollback.GetError(); keyErr != nil {
err = errors.Errorf("con:%d 2PC cleanup failed: %s", c.connID, keyErr)
log.Debugf("2PC failed cleanup key: %v, tid: %d", err, c.startTS)
err = errors.Errorf("conn%d 2PC cleanup failed: %s", c.connID, keyErr)
logutil.Logger(context.Background()).Debug("2PC failed cleanup key",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}
return nil
Expand Down Expand Up @@ -647,9 +682,12 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
err := c.cleanupKeys(NewBackoffer(cleanupKeysCtx, cleanupMaxBackoff).WithVars(c.txn.vars), c.keys)
if err != nil {
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback").Inc()
log.Infof("con:%d 2PC cleanup err: %v, tid: %d", c.connID, err, c.startTS)
logutil.Logger(ctx).Info("2PC cleanup failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
} else {
log.Infof("con:%d 2PC clean up done, tid: %d", c.connID, c.startTS)
logutil.Logger(ctx).Info("2PC clean up done",
zap.Uint64("txnStartTS", c.startTS))
}
c.cleanWg.Done()
}()
Expand All @@ -669,23 +707,27 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
}
}
if err != nil {
log.Debugf("con:%d 2PC failed on prewrite: %v, tid: %d", c.connID, err, c.startTS)
logutil.Logger(ctx).Debug("2PC failed on prewrite",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}

start = time.Now()
commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
if err != nil {
log.Warnf("con:%d 2PC get commitTS failed: %v, tid: %d", c.connID, err, c.startTS)
logutil.Logger(ctx).Warn("2PC get commitTS failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}
c.detail.GetCommitTsTime = time.Since(start)

// check commitTS
if commitTS <= c.startTS {
err = errors.Errorf("con:%d Invalid transaction tso with start_ts=%v while commit_ts=%v",
err = errors.Errorf("conn%d Invalid transaction tso with txnStartTS=%v while txnCommitTS=%v",
c.connID, c.startTS, commitTS)
log.Error(err)
logutil.Logger(context.Background()).Error("invalid transaction", zap.Error(err))
return errors.Trace(err)
}
c.commitTS = commitTS
Expand All @@ -699,7 +741,8 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
// }

if c.store.oracle.IsExpired(c.startTS, c.maxTxnTimeUse) {
err = errors.Errorf("con:%d txn takes too much time, start: %d, commit: %d", c.connID, c.startTS, c.commitTS)
err = errors.Errorf("conn%d txn takes too much time, txnStartTS: %d, comm: %d",
c.connID, c.startTS, c.commitTS)
return errors.Annotate(err, txnRetryableMark)
}

Expand All @@ -710,15 +753,21 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
c.detail.TotalBackoffTime += time.Duration(commitBo.totalSleep) * time.Millisecond
if err != nil {
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
log.Warnf("con:%d 2PC commit result undetermined, err: %v, rpcErr: %v, tid: %v", c.connID, err, undeterminedErr, c.startTS)
log.Error(err)
logutil.Logger(ctx).Error("2PC commit result undetermined",
zap.Error(err),
zap.NamedError("rpcErr", undeterminedErr),
zap.Uint64("txnStartTS", c.startTS))
err = errors.Trace(terror.ErrResultUndetermined)
}
if !c.mu.committed {
log.Debugf("con:%d 2PC failed on commit: %v, tid: %d", c.connID, err, c.startTS)
logutil.Logger(ctx).Debug("2PC failed on commit",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}
log.Debugf("con:%d 2PC succeed with error: %v, tid: %d", c.connID, err, c.startTS)
logutil.Logger(ctx).Debug("2PC succeed with error",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
}
return nil
}
Expand Down Expand Up @@ -767,7 +816,8 @@ func (c *twoPhaseCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int
go func() {
err := binInfo.WriteBinlog(c.store.clusterID)
if err != nil {
log.Errorf("failed to write binlog: %v", err)
logutil.Logger(context.Background()).Error("failed to write binlog",
zap.Error(err))
}
}()
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (s *testCommitterSuite) TestIllegalTso(c *C) {
txn.startTS = uint64(math.MaxUint64)
err := txn.Commit(context.Background())
c.Assert(err, NotNil)
errMsgMustContain(c, err, "invalid startTS")
errMsgMustContain(c, err, "invalid txnStartTS")
}

func errMsgMustContain(c *C, err error, msg string) {
Expand Down
Loading