Skip to content

Commit deea24f

Browse files
tiancaiamao authored and zz-jason committed
store/tikv,executor: redesign the latch scheduler (#7711) (#7859)
1 parent 3950070 commit deea24f

File tree

6 files changed

+211
-98
lines changed

6 files changed

+211
-98
lines changed

executor/write_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1815,8 +1815,7 @@ func (s *testBypassSuite) TestBypassLatch(c *C) {
18151815

18161816
// txn1 and txn2 data range do not overlap, but using latches result in txn conflict.
18171817
fn()
1818-
_, err = tk1.Exec("commit")
1819-
c.Assert(err, NotNil)
1818+
tk1.MustExec("commit")
18201819

18211820
tk1.MustExec("truncate table t")
18221821
fn()

store/tikv/latch/latch.go

Lines changed: 128 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package latch
1515

1616
import (
17+
"bytes"
1718
"math/bits"
1819
"sort"
1920
"sync"
@@ -22,32 +23,26 @@ import (
2223
"github.com/spaolacci/murmur3"
2324
)
2425

25-
// latch stores a key's waiting transactions information.
26-
type latch struct {
27-
// Whether there is any transaction in waitingQueue except head.
28-
hasMoreWaiting bool
29-
// The startTS of the transaction which is the head of waiting transactions.
30-
waitingQueueHead uint64
31-
maxCommitTS uint64
32-
sync.Mutex
33-
}
26+
type node struct {
27+
slotID int
28+
key []byte
29+
maxCommitTS uint64
30+
value *Lock
3431

35-
func (l *latch) isEmpty() bool {
36-
return l.waitingQueueHead == 0 && !l.hasMoreWaiting
32+
next *node
3733
}
3834

39-
func (l *latch) free() {
40-
l.waitingQueueHead = 0
41-
}
42-
43-
func (l *latch) refreshCommitTS(commitTS uint64) {
44-
l.Lock()
45-
defer l.Unlock()
46-
l.maxCommitTS = mathutil.MaxUint64(commitTS, l.maxCommitTS)
35+
// latch stores a key's waiting transactions information.
36+
type latch struct {
37+
queue *node
38+
count int
39+
waiting []*Lock
40+
sync.Mutex
4741
}
4842

4943
// Lock is the locks' information required for a transaction.
5044
type Lock struct {
45+
keys [][]byte
5146
// The slot IDs of the latches(keys) that a startTS must acquire before being able to processed.
5247
requiredSlots []int
5348
// The number of latches that the transaction has acquired. For status is stale, it include the
@@ -96,9 +91,20 @@ func (l *Lock) SetCommitTS(commitTS uint64) {
9691
// but conceptually a latch is a queue, and a slot is an index to the queue
9792
type Latches struct {
9893
slots []latch
99-
// The waiting queue for each slot(slotID => slice of Lock).
100-
waitingQueues map[int][]*Lock
101-
sync.RWMutex
94+
}
95+
96+
type bytesSlice [][]byte
97+
98+
func (s bytesSlice) Len() int {
99+
return len(s)
100+
}
101+
102+
func (s bytesSlice) Swap(i, j int) {
103+
s[i], s[j] = s[j], s[i]
104+
}
105+
106+
func (s bytesSlice) Less(i, j int) bool {
107+
return bytes.Compare(s[i], s[j]) < 0
102108
}
103109

104110
// NewLatches create a Latches with fixed length,
@@ -107,14 +113,15 @@ func NewLatches(size uint) *Latches {
107113
powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1)))
108114
slots := make([]latch, powerOfTwoSize)
109115
return &Latches{
110-
slots: slots,
111-
waitingQueues: make(map[int][]*Lock),
116+
slots: slots,
112117
}
113118
}
114119

115120
// genLock generates Lock for the transaction with startTS and keys.
116121
func (latches *Latches) genLock(startTS uint64, keys [][]byte) *Lock {
122+
sort.Sort(bytesSlice(keys))
117123
return &Lock{
124+
keys: keys,
118125
requiredSlots: latches.genSlotIDs(keys),
119126
acquiredCount: 0,
120127
startTS: startTS,
@@ -126,17 +133,7 @@ func (latches *Latches) genSlotIDs(keys [][]byte) []int {
126133
for _, key := range keys {
127134
slots = append(slots, latches.slotID(key))
128135
}
129-
sort.Ints(slots)
130-
if len(slots) <= 1 {
131-
return slots
132-
}
133-
dedup := slots[:1]
134-
for i := 1; i < len(slots); i++ {
135-
if slots[i] != slots[i-1] {
136-
dedup = append(dedup, slots[i])
137-
}
138-
}
139-
return dedup
136+
return slots
140137
}
141138

142139
// slotID return slotID for current key.
@@ -150,8 +147,7 @@ func (latches *Latches) acquire(lock *Lock) acquireResult {
150147
return acquireStale
151148
}
152149
for lock.acquiredCount < len(lock.requiredSlots) {
153-
slotID := lock.requiredSlots[lock.acquiredCount]
154-
status := latches.acquireSlot(slotID, lock)
150+
status := latches.acquireSlot(lock)
155151
if status != acquireSuccess {
156152
return status
157153
}
@@ -161,75 +157,129 @@ func (latches *Latches) acquire(lock *Lock) acquireResult {
161157

162158
// release releases all latches owned by the `lock` and returns the wakeup list.
163159
// Preconditions: the caller must ensure the transaction's status is not locked.
164-
func (latches *Latches) release(lock *Lock, commitTS uint64, wakeupList []*Lock) []*Lock {
160+
func (latches *Latches) release(lock *Lock, wakeupList []*Lock) []*Lock {
165161
wakeupList = wakeupList[:0]
166-
for i := 0; i < lock.acquiredCount; i++ {
167-
slotID := lock.requiredSlots[i]
168-
if nextLock := latches.releaseSlot(slotID, commitTS); nextLock != nil {
162+
for lock.acquiredCount > 0 {
163+
if nextLock := latches.releaseSlot(lock); nextLock != nil {
169164
wakeupList = append(wakeupList, nextLock)
170165
}
171166
}
172167
return wakeupList
173168
}
174169

175-
// refreshCommitTS refreshes commitTS for keys.
176-
func (latches *Latches) refreshCommitTS(keys [][]byte, commitTS uint64) {
177-
slotIDs := latches.genSlotIDs(keys)
178-
for _, slotID := range slotIDs {
179-
latches.slots[slotID].refreshCommitTS(commitTS)
180-
}
181-
}
182-
183-
func (latches *Latches) releaseSlot(slotID int, commitTS uint64) (nextLock *Lock) {
170+
func (latches *Latches) releaseSlot(lock *Lock) (nextLock *Lock) {
171+
key := lock.keys[lock.acquiredCount-1]
172+
slotID := lock.requiredSlots[lock.acquiredCount-1]
184173
latch := &latches.slots[slotID]
174+
lock.acquiredCount--
185175
latch.Lock()
186176
defer latch.Unlock()
187-
latch.maxCommitTS = mathutil.MaxUint64(latch.maxCommitTS, commitTS)
188-
if !latch.hasMoreWaiting {
189-
latch.free()
177+
178+
find := findNode(latch.queue, key)
179+
if find.value != lock {
180+
panic("releaseSlot wrong")
181+
}
182+
find.maxCommitTS = mathutil.MaxUint64(find.maxCommitTS, lock.commitTS)
183+
find.value = nil
184+
if len(latch.waiting) == 0 {
190185
return nil
191186
}
192-
nextLock, latch.hasMoreWaiting = latches.popFromWaitingQueue(slotID)
193-
latch.waitingQueueHead = nextLock.startTS
194-
nextLock.acquiredCount++
195-
if latch.maxCommitTS > nextLock.startTS {
196-
nextLock.isStale = true
187+
188+
var idx int
189+
for idx = 0; idx < len(latch.waiting); idx++ {
190+
waiting := latch.waiting[idx]
191+
if bytes.Compare(waiting.keys[waiting.acquiredCount], key) == 0 {
192+
break
193+
}
197194
}
198-
return nextLock
199-
}
195+
// Wake up the first one in waiting queue.
196+
if idx < len(latch.waiting) {
197+
nextLock = latch.waiting[idx]
198+
// Delete element latch.waiting[idx] from the array.
199+
copy(latch.waiting[idx:], latch.waiting[idx+1:])
200+
latch.waiting[len(latch.waiting)-1] = nil
201+
latch.waiting = latch.waiting[:len(latch.waiting)-1]
200202

201-
func (latches *Latches) popFromWaitingQueue(slotID int) (front *Lock, hasMoreWaiting bool) {
202-
latches.Lock()
203-
defer latches.Unlock()
204-
waiting := latches.waitingQueues[slotID]
205-
front = waiting[0]
206-
if len(waiting) == 1 {
207-
delete(latches.waitingQueues, slotID)
208-
} else {
209-
latches.waitingQueues[slotID] = waiting[1:]
210-
hasMoreWaiting = true
203+
if find.maxCommitTS > nextLock.startTS {
204+
nextLock.isStale = true
205+
}
211206
}
207+
212208
return
213209
}
214210

215-
func (latches *Latches) acquireSlot(slotID int, lock *Lock) acquireResult {
211+
func (latches *Latches) acquireSlot(lock *Lock) acquireResult {
212+
key := lock.keys[lock.acquiredCount]
213+
slotID := lock.requiredSlots[lock.acquiredCount]
216214
latch := &latches.slots[slotID]
217215
latch.Lock()
218216
defer latch.Unlock()
219-
if latch.maxCommitTS > lock.startTS {
217+
218+
// Try to recycle to limit the memory usage.
219+
if latch.count >= latchListCount {
220+
latch.recycle(lock.startTS)
221+
}
222+
223+
find := findNode(latch.queue, key)
224+
if find == nil {
225+
tmp := &node{
226+
slotID: slotID,
227+
key: key,
228+
value: lock,
229+
}
230+
tmp.next = latch.queue
231+
latch.queue = tmp
232+
latch.count++
233+
234+
lock.acquiredCount++
235+
return acquireSuccess
236+
}
237+
238+
if find.maxCommitTS > lock.startTS {
220239
lock.isStale = true
221240
return acquireStale
222241
}
223242

224-
if latch.isEmpty() {
225-
latch.waitingQueueHead = lock.startTS
243+
if find.value == nil {
244+
find.value = lock
226245
lock.acquiredCount++
227246
return acquireSuccess
228247
}
248+
229249
// Push the current transaction into waitingQueue.
230-
latch.hasMoreWaiting = true
231-
latches.Lock()
232-
defer latches.Unlock()
233-
latches.waitingQueues[slotID] = append(latches.waitingQueues[slotID], lock)
250+
latch.waiting = append(latch.waiting, lock)
234251
return acquireLocked
235252
}
253+
254+
// recycle is not thread safe, the latch should acquire its lock before executing this function.
255+
func (l *latch) recycle(currentTS uint64) {
256+
fakeHead := node{next: l.queue}
257+
prev := &fakeHead
258+
for curr := prev.next; curr != nil; curr = curr.next {
259+
if tsoSub(currentTS, curr.maxCommitTS) >= expireDuration && curr.value == nil {
260+
l.count--
261+
prev.next = curr.next
262+
} else {
263+
prev = curr
264+
}
265+
}
266+
l.queue = fakeHead.next
267+
}
268+
269+
func (latches *Latches) recycle(currentTS uint64) {
270+
for i := 0; i < len(latches.slots); i++ {
271+
latch := &latches.slots[i]
272+
latch.Lock()
273+
latch.recycle(currentTS)
274+
latch.Unlock()
275+
}
276+
}
277+
278+
func findNode(list *node, key []byte) *node {
279+
for n := list; n != nil; n = n.next {
280+
if bytes.Compare(n.key, key) == 0 {
281+
return n
282+
}
283+
}
284+
return nil
285+
}

0 commit comments

Comments
 (0)