Skip to content

Commit 60df228

Browse files
ti-chi-botdisksing
andcommitted
replication_mode: use placement to determin canSync and hasMajority (tikv#7202) (tikv#7209)
close tikv#7201 Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io> Signed-off-by: disksing <i@disksing.com> Co-authored-by: disksing <i@disksing.com>
1 parent 75bb796 commit 60df228

File tree

2 files changed

+239
-45
lines changed

2 files changed

+239
-45
lines changed

server/replication/replication_mode.go

Lines changed: 76 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/tikv/pd/server/config"
3434
"github.com/tikv/pd/server/core"
3535
"github.com/tikv/pd/server/schedule"
36+
"github.com/tikv/pd/server/schedule/placement"
3637
"github.com/tikv/pd/server/storage/endpoint"
3738
"go.uber.org/zap"
3839
)
@@ -363,37 +364,80 @@ func (m *ModeManager) Run(ctx context.Context) {
363364
wg.Wait()
364365
}
365366

367+
func storeIDs(stores []*core.StoreInfo) []uint64 {
368+
ids := make([]uint64, len(stores))
369+
for i, s := range stores {
370+
ids[i] = s.GetID()
371+
}
372+
return ids
373+
}
374+
375+
func minimalUpVoters(rule *placement.Rule, upStores, downStores []*core.StoreInfo) int {
376+
if rule.Role == placement.Learner {
377+
return 0
378+
}
379+
var up, down int
380+
for _, s := range upStores {
381+
if placement.MatchLabelConstraints(s, rule.LabelConstraints) {
382+
up++
383+
}
384+
}
385+
for _, s := range downStores {
386+
if placement.MatchLabelConstraints(s, rule.LabelConstraints) {
387+
down++
388+
}
389+
}
390+
minimalUp := rule.Count - down
391+
if minimalUp < 0 {
392+
minimalUp = 0
393+
}
394+
if minimalUp > up {
395+
minimalUp = up
396+
}
397+
return minimalUp
398+
}
399+
366400
func (m *ModeManager) tickUpdateState() {
367401
if m.getModeName() != modeDRAutoSync {
368402
return
369403
}
370404

371405
drTickCounter.Inc()
372406

373-
totalPrimaryPeers, totalDrPeers := m.config.DRAutoSync.PrimaryReplicas, m.config.DRAutoSync.DRReplicas
374407
stores := m.checkStoreStatus()
375408

376-
// canSync is true when every region has at least 1 replica in each DC.
377-
canSync := len(stores[primaryDown]) < totalPrimaryPeers && len(stores[drDown]) < totalDrPeers &&
378-
len(stores[primaryUp]) > 0 && len(stores[drUp]) > 0
409+
var primaryHasVoter, drHasVoter bool
410+
var totalVoter, totalUpVoter int
411+
for _, r := range m.cluster.GetRuleManager().GetAllRules() {
412+
if len(r.StartKey) > 0 || len(r.EndKey) > 0 {
413+
// All rules should be global rules. If not, skip it.
414+
continue
415+
}
416+
if r.Role != placement.Learner {
417+
totalVoter += r.Count
418+
}
419+
minimalUpPrimary := minimalUpVoters(r, stores[primaryUp], stores[primaryDown])
420+
minimalUpDr := minimalUpVoters(r, stores[drUp], stores[drDown])
421+
primaryHasVoter = primaryHasVoter || minimalUpPrimary > 0
422+
drHasVoter = drHasVoter || minimalUpDr > 0
423+
upVoters := minimalUpPrimary + minimalUpDr
424+
if upVoters > r.Count {
425+
upVoters = r.Count
426+
}
427+
totalUpVoter += upVoters
428+
}
379429

430+
// canSync is true when every region has at least 1 voter replica in each DC.
380431
// hasMajority is true when every region has majority peer online.
381-
var upPeers int
382-
if len(stores[primaryDown]) < totalPrimaryPeers {
383-
upPeers += totalPrimaryPeers - len(stores[primaryDown])
384-
}
385-
if len(stores[drDown]) < totalDrPeers {
386-
upPeers += totalDrPeers - len(stores[drDown])
387-
}
388-
hasMajority := upPeers*2 > totalPrimaryPeers+totalDrPeers
432+
canSync := primaryHasVoter && drHasVoter
433+
hasMajority := totalUpVoter*2 > totalVoter
389434

390435
log.Debug("replication store status",
391-
zap.Uint64s("up-primary", stores[primaryUp]),
392-
zap.Uint64s("up-dr", stores[drUp]),
393-
zap.Uint64s("down-primary", stores[primaryDown]),
394-
zap.Uint64s("down-dr", stores[drDown]),
436+
zap.Uint64s("up-primary", storeIDs(stores[primaryUp])),
437+
zap.Uint64s("up-dr", storeIDs(stores[drUp])),
438+
zap.Uint64s("down-primary", storeIDs(stores[primaryDown])),
439+
zap.Uint64s("down-dr", storeIDs(stores[drDown])),
395440
zap.Bool("can-sync", canSync),
396-
zap.Int("up-peers", upPeers),
397441
zap.Bool("has-majority", hasMajority),
398442
)
399443

@@ -419,31 +463,31 @@ func (m *ModeManager) tickUpdateState() {
419463
case drStateSync:
420464
// If hasMajority is false, the cluster is always unavailable. Switch to async won't help.
421465
if !canSync && hasMajority {
422-
m.drSwitchToAsyncWait(stores[primaryUp])
466+
m.drSwitchToAsyncWait(storeIDs(stores[primaryUp]))
423467
}
424468
case drStateAsyncWait:
425469
if canSync {
426470
m.drSwitchToSync()
427471
break
428472
}
429-
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, stores[primaryUp]) {
430-
m.drSwitchToAsyncWait(stores[primaryUp])
473+
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs(stores[primaryUp])) {
474+
m.drSwitchToAsyncWait(storeIDs(stores[primaryUp]))
431475
break
432476
}
433-
if m.drCheckStoreStateUpdated(stores[primaryUp]) {
434-
m.drSwitchToAsync(stores[primaryUp])
477+
if m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
478+
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
435479
}
436480
case drStateAsync:
437481
if canSync {
438482
m.drSwitchToSyncRecover()
439483
break
440484
}
441-
if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(stores[primaryUp]) {
442-
m.drSwitchToAsync(stores[primaryUp])
485+
if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
486+
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
443487
}
444488
case drStateSyncRecover:
445489
if !canSync && hasMajority {
446-
m.drSwitchToAsync(stores[primaryUp])
490+
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
447491
} else {
448492
m.updateProgress()
449493
progress := m.estimateProgress()
@@ -502,10 +546,10 @@ const (
502546
storeStatusTypeCount
503547
)
504548

505-
func (m *ModeManager) checkStoreStatus() [][]uint64 {
549+
func (m *ModeManager) checkStoreStatus() [][]*core.StoreInfo {
506550
m.RLock()
507551
defer m.RUnlock()
508-
stores := make([][]uint64, storeStatusTypeCount)
552+
stores := make([][]*core.StoreInfo, storeStatusTypeCount)
509553
for _, s := range m.cluster.GetStores() {
510554
if s.IsRemoved() {
511555
continue
@@ -518,21 +562,21 @@ func (m *ModeManager) checkStoreStatus() [][]uint64 {
518562
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
519563
if labelValue == m.config.DRAutoSync.Primary {
520564
if down {
521-
stores[primaryDown] = append(stores[primaryDown], s.GetID())
565+
stores[primaryDown] = append(stores[primaryDown], s)
522566
} else {
523-
stores[primaryUp] = append(stores[primaryUp], s.GetID())
567+
stores[primaryUp] = append(stores[primaryUp], s)
524568
}
525569
}
526570
if labelValue == m.config.DRAutoSync.DR {
527571
if down {
528-
stores[drDown] = append(stores[drDown], s.GetID())
572+
stores[drDown] = append(stores[drDown], s)
529573
} else {
530-
stores[drUp] = append(stores[drUp], s.GetID())
574+
stores[drUp] = append(stores[drUp], s)
531575
}
532576
}
533577
}
534578
for i := range stores {
535-
sort.Slice(stores[i], func(a, b int) bool { return stores[i][a] < stores[i][b] })
579+
sort.Slice(stores[i], func(a, b int) bool { return stores[i][a].GetID() < stores[i][b].GetID() })
536580
}
537581
return stores
538582
}

0 commit comments

Comments
 (0)