Skip to content

Commit ba09621

Browse files
ti-chi-bot and disksing authored
replication_mode: use placement to determine canSync and hasMajority (#7202) (#7209)
close #7201 Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io> Signed-off-by: disksing <i@disksing.com> Co-authored-by: disksing <i@disksing.com>
1 parent 77d6f5b commit ba09621

File tree

2 files changed

+239
-45
lines changed

2 files changed

+239
-45
lines changed

server/replication/replication_mode.go

Lines changed: 76 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/tikv/pd/server/config"
3434
"github.com/tikv/pd/server/core"
3535
"github.com/tikv/pd/server/schedule"
36+
"github.com/tikv/pd/server/schedule/placement"
3637
"github.com/tikv/pd/server/storage/endpoint"
3738
"go.uber.org/zap"
3839
)
@@ -364,37 +365,80 @@ func (m *ModeManager) Run(ctx context.Context) {
364365
wg.Wait()
365366
}
366367

368+
func storeIDs(stores []*core.StoreInfo) []uint64 {
369+
ids := make([]uint64, len(stores))
370+
for i, s := range stores {
371+
ids[i] = s.GetID()
372+
}
373+
return ids
374+
}
375+
376+
func minimalUpVoters(rule *placement.Rule, upStores, downStores []*core.StoreInfo) int {
377+
if rule.Role == placement.Learner {
378+
return 0
379+
}
380+
var up, down int
381+
for _, s := range upStores {
382+
if placement.MatchLabelConstraints(s, rule.LabelConstraints) {
383+
up++
384+
}
385+
}
386+
for _, s := range downStores {
387+
if placement.MatchLabelConstraints(s, rule.LabelConstraints) {
388+
down++
389+
}
390+
}
391+
minimalUp := rule.Count - down
392+
if minimalUp < 0 {
393+
minimalUp = 0
394+
}
395+
if minimalUp > up {
396+
minimalUp = up
397+
}
398+
return minimalUp
399+
}
400+
367401
func (m *ModeManager) tickUpdateState() {
368402
if m.getModeName() != modeDRAutoSync {
369403
return
370404
}
371405

372406
drTickCounter.Inc()
373407

374-
totalPrimaryPeers, totalDrPeers := m.config.DRAutoSync.PrimaryReplicas, m.config.DRAutoSync.DRReplicas
375408
stores := m.checkStoreStatus()
376409

377-
// canSync is true when every region has at least 1 replica in each DC.
378-
canSync := len(stores[primaryDown]) < totalPrimaryPeers && len(stores[drDown]) < totalDrPeers &&
379-
len(stores[primaryUp]) > 0 && len(stores[drUp]) > 0
410+
var primaryHasVoter, drHasVoter bool
411+
var totalVoter, totalUpVoter int
412+
for _, r := range m.cluster.GetRuleManager().GetAllRules() {
413+
if len(r.StartKey) > 0 || len(r.EndKey) > 0 {
414+
// All rules should be global rules. If not, skip it.
415+
continue
416+
}
417+
if r.Role != placement.Learner {
418+
totalVoter += r.Count
419+
}
420+
minimalUpPrimary := minimalUpVoters(r, stores[primaryUp], stores[primaryDown])
421+
minimalUpDr := minimalUpVoters(r, stores[drUp], stores[drDown])
422+
primaryHasVoter = primaryHasVoter || minimalUpPrimary > 0
423+
drHasVoter = drHasVoter || minimalUpDr > 0
424+
upVoters := minimalUpPrimary + minimalUpDr
425+
if upVoters > r.Count {
426+
upVoters = r.Count
427+
}
428+
totalUpVoter += upVoters
429+
}
380430

431+
// canSync is true when every region has at least 1 voter replica in each DC.
381432
// hasMajority is true when every region has majority peer online.
382-
var upPeers int
383-
if len(stores[primaryDown]) < totalPrimaryPeers {
384-
upPeers += totalPrimaryPeers - len(stores[primaryDown])
385-
}
386-
if len(stores[drDown]) < totalDrPeers {
387-
upPeers += totalDrPeers - len(stores[drDown])
388-
}
389-
hasMajority := upPeers*2 > totalPrimaryPeers+totalDrPeers
433+
canSync := primaryHasVoter && drHasVoter
434+
hasMajority := totalUpVoter*2 > totalVoter
390435

391436
log.Debug("replication store status",
392-
zap.Uint64s("up-primary", stores[primaryUp]),
393-
zap.Uint64s("up-dr", stores[drUp]),
394-
zap.Uint64s("down-primary", stores[primaryDown]),
395-
zap.Uint64s("down-dr", stores[drDown]),
437+
zap.Uint64s("up-primary", storeIDs(stores[primaryUp])),
438+
zap.Uint64s("up-dr", storeIDs(stores[drUp])),
439+
zap.Uint64s("down-primary", storeIDs(stores[primaryDown])),
440+
zap.Uint64s("down-dr", storeIDs(stores[drDown])),
396441
zap.Bool("can-sync", canSync),
397-
zap.Int("up-peers", upPeers),
398442
zap.Bool("has-majority", hasMajority),
399443
)
400444

@@ -420,31 +464,31 @@ func (m *ModeManager) tickUpdateState() {
420464
case drStateSync:
421465
// If hasMajority is false, the cluster is always unavailable. Switch to async won't help.
422466
if !canSync && hasMajority {
423-
m.drSwitchToAsyncWait(stores[primaryUp])
467+
m.drSwitchToAsyncWait(storeIDs(stores[primaryUp]))
424468
}
425469
case drStateAsyncWait:
426470
if canSync {
427471
m.drSwitchToSync()
428472
break
429473
}
430-
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, stores[primaryUp]) {
431-
m.drSwitchToAsyncWait(stores[primaryUp])
474+
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs(stores[primaryUp])) {
475+
m.drSwitchToAsyncWait(storeIDs(stores[primaryUp]))
432476
break
433477
}
434-
if m.drCheckStoreStateUpdated(stores[primaryUp]) {
435-
m.drSwitchToAsync(stores[primaryUp])
478+
if m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
479+
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
436480
}
437481
case drStateAsync:
438482
if canSync {
439483
m.drSwitchToSyncRecover()
440484
break
441485
}
442-
if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(stores[primaryUp]) {
443-
m.drSwitchToAsync(stores[primaryUp])
486+
if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
487+
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
444488
}
445489
case drStateSyncRecover:
446490
if !canSync && hasMajority {
447-
m.drSwitchToAsync(stores[primaryUp])
491+
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
448492
} else {
449493
m.updateProgress()
450494
progress := m.estimateProgress()
@@ -519,10 +563,10 @@ const (
519563
storeStatusTypeCount
520564
)
521565

522-
func (m *ModeManager) checkStoreStatus() [][]uint64 {
566+
func (m *ModeManager) checkStoreStatus() [][]*core.StoreInfo {
523567
m.RLock()
524568
defer m.RUnlock()
525-
stores := make([][]uint64, storeStatusTypeCount)
569+
stores := make([][]*core.StoreInfo, storeStatusTypeCount)
526570
for _, s := range m.cluster.GetStores() {
527571
if s.IsRemoved() {
528572
continue
@@ -535,21 +579,21 @@ func (m *ModeManager) checkStoreStatus() [][]uint64 {
535579
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
536580
if labelValue == m.config.DRAutoSync.Primary {
537581
if down {
538-
stores[primaryDown] = append(stores[primaryDown], s.GetID())
582+
stores[primaryDown] = append(stores[primaryDown], s)
539583
} else {
540-
stores[primaryUp] = append(stores[primaryUp], s.GetID())
584+
stores[primaryUp] = append(stores[primaryUp], s)
541585
}
542586
}
543587
if labelValue == m.config.DRAutoSync.DR {
544588
if down {
545-
stores[drDown] = append(stores[drDown], s.GetID())
589+
stores[drDown] = append(stores[drDown], s)
546590
} else {
547-
stores[drUp] = append(stores[drUp], s.GetID())
591+
stores[drUp] = append(stores[drUp], s)
548592
}
549593
}
550594
}
551595
for i := range stores {
552-
sort.Slice(stores[i], func(a, b int) bool { return stores[i][a] < stores[i][b] })
596+
sort.Slice(stores[i], func(a, b int) bool { return stores[i][a].GetID() < stores[i][b].GetID() })
553597
}
554598
return stores
555599
}

0 commit comments

Comments
 (0)