@@ -29,8 +29,13 @@ import (
2929 "github.com/pingcap/log"
3030 "github.com/tikv/pd/pkg/core"
3131 "github.com/tikv/pd/pkg/errs"
	sche "github.com/tikv/pd/pkg/schedule/core"
	"github.com/tikv/pd/pkg/schedule/placement"
3439 "github.com/tikv/pd/pkg/storage/endpoint"
3540 "github.com/tikv/pd/pkg/utils/logutil"
3641 "github.com/tikv/pd/pkg/utils/syncutil"
@@ -391,37 +396,84 @@ func (m *ModeManager) Run(ctx context.Context) {
391396 }
392397}
393398
402+ func storeIDs (stores []* core.StoreInfo ) []uint64 {
403+ ids := make ([]uint64 , len (stores ))
404+ for i , s := range stores {
405+ ids [i ] = s .GetID ()
406+ }
407+ return ids
408+ }
409+
410+ func minimalUpVoters (rule * placement.Rule , upStores , downStores []* core.StoreInfo ) int {
411+ if rule .Role == placement .Learner {
412+ return 0
413+ }
414+ var up , down int
415+ for _ , s := range upStores {
416+ if placement .MatchLabelConstraints (s , rule .LabelConstraints ) {
417+ up ++
418+ }
419+ }
420+ for _ , s := range downStores {
421+ if placement .MatchLabelConstraints (s , rule .LabelConstraints ) {
422+ down ++
423+ }
424+ }
425+ minimalUp := rule .Count - down
426+ if minimalUp < 0 {
427+ minimalUp = 0
428+ }
429+ if minimalUp > up {
430+ minimalUp = up
431+ }
432+ return minimalUp
433+ }
434+
435+ func (m * ModeManager ) tickUpdateState () {
395437 if m .getModeName () != modeDRAutoSync {
396438 return
397439 }
398440
399441 drTickCounter .Inc ()
400442
401- totalPrimaryPeers , totalDrPeers := m .config .DRAutoSync .PrimaryReplicas , m .config .DRAutoSync .DRReplicas
402443 stores := m .checkStoreStatus ()
403444
404- // canSync is true when every region has at least 1 replica in each DC.
405- canSync := len (stores [primaryDown ]) < totalPrimaryPeers && len (stores [drDown ]) < totalDrPeers &&
406- len (stores [primaryUp ]) > 0 && len (stores [drUp ]) > 0
445+ var primaryHasVoter , drHasVoter bool
446+ var totalVoter , totalUpVoter int
447+ for _ , r := range m .cluster .GetRuleManager ().GetAllRules () {
448+ if len (r .StartKey ) > 0 || len (r .EndKey ) > 0 {
449+ // All rules should be global rules. If not, skip it.
450+ continue
451+ }
452+ if r .Role != placement .Learner {
453+ totalVoter += r .Count
454+ }
455+ minimalUpPrimary := minimalUpVoters (r , stores [primaryUp ], stores [primaryDown ])
456+ minimalUpDr := minimalUpVoters (r , stores [drUp ], stores [drDown ])
457+ primaryHasVoter = primaryHasVoter || minimalUpPrimary > 0
458+ drHasVoter = drHasVoter || minimalUpDr > 0
459+ upVoters := minimalUpPrimary + minimalUpDr
460+ if upVoters > r .Count {
461+ upVoters = r .Count
462+ }
463+ totalUpVoter += upVoters
464+ }
407465
466+ // canSync is true when every region has at least 1 voter replica in each DC.
408467 // hasMajority is true when every region has majority peer online.
409- var upPeers int
410- if len (stores [primaryDown ]) < totalPrimaryPeers {
411- upPeers += totalPrimaryPeers - len (stores [primaryDown ])
412- }
413- if len (stores [drDown ]) < totalDrPeers {
414- upPeers += totalDrPeers - len (stores [drDown ])
415- }
416- hasMajority := upPeers * 2 > totalPrimaryPeers + totalDrPeers
468+ canSync := primaryHasVoter && drHasVoter
469+ hasMajority := totalUpVoter * 2 > totalVoter
417470
418471 log .Debug ("replication store status" ,
419- zap .Uint64s ("up-primary" , stores [primaryUp ]),
420- zap .Uint64s ("up-dr" , stores [drUp ]),
421- zap .Uint64s ("down-primary" , stores [primaryDown ]),
422- zap .Uint64s ("down-dr" , stores [drDown ]),
472+ zap .Uint64s ("up-primary" , storeIDs ( stores [primaryUp ]) ),
473+ zap .Uint64s ("up-dr" , storeIDs ( stores [drUp ]) ),
474+ zap .Uint64s ("down-primary" , storeIDs ( stores [primaryDown ]) ),
475+ zap .Uint64s ("down-dr" , storeIDs ( stores [drDown ]) ),
423476 zap .Bool ("can-sync" , canSync ),
424- zap .Int ("up-peers" , upPeers ),
425477 zap .Bool ("has-majority" , hasMajority ),
426478 )
427479
@@ -447,31 +499,31 @@ func (m *ModeManager) tickDR() {
447499 case drStateSync :
448500 // If hasMajority is false, the cluster is always unavailable. Switch to async won't help.
449501 if ! canSync && hasMajority {
450- m .drSwitchToAsyncWait (stores [primaryUp ])
502+ m .drSwitchToAsyncWait (storeIDs ( stores [primaryUp ]) )
451503 }
452504 case drStateAsyncWait :
453505 if canSync {
454506 m .drSwitchToSync ()
455507 break
456508 }
457- if oldAvailableStores := m .drGetAvailableStores (); ! reflect .DeepEqual (oldAvailableStores , stores [primaryUp ]) {
458- m .drSwitchToAsyncWait (stores [primaryUp ])
509+ if oldAvailableStores := m .drGetAvailableStores (); ! reflect .DeepEqual (oldAvailableStores , storeIDs ( stores [primaryUp ]) ) {
510+ m .drSwitchToAsyncWait (storeIDs ( stores [primaryUp ]) )
459511 break
460512 }
461- if m .drCheckStoreStateUpdated (stores [primaryUp ]) {
462- m .drSwitchToAsync (stores [primaryUp ])
513+ if m .drCheckStoreStateUpdated (storeIDs ( stores [primaryUp ]) ) {
514+ m .drSwitchToAsync (storeIDs ( stores [primaryUp ]) )
463515 }
464516 case drStateAsync :
465517 if canSync {
466518 m .drSwitchToSyncRecover ()
467519 break
468520 }
469- if ! reflect .DeepEqual (m .drGetAvailableStores (), stores [primaryUp ]) && m .drCheckStoreStateUpdated (stores [primaryUp ]) {
470- m .drSwitchToAsync (stores [primaryUp ])
521+ if ! reflect .DeepEqual (m .drGetAvailableStores (), stores [primaryUp ]) && m .drCheckStoreStateUpdated (storeIDs ( stores [primaryUp ]) ) {
522+ m .drSwitchToAsync (storeIDs ( stores [primaryUp ]) )
471523 }
472524 case drStateSyncRecover :
473525 if ! canSync && hasMajority {
474- m .drSwitchToAsync (stores [primaryUp ])
526+ m .drSwitchToAsync (storeIDs ( stores [primaryUp ]) )
475527 } else {
476528 m .updateProgress ()
477529 progress := m .estimateProgress ()
@@ -496,10 +548,10 @@ const (
496548 storeStatusTypeCount
497549)
498550
499- func (m * ModeManager ) checkStoreStatus () [][]uint64 {
551+ func (m * ModeManager ) checkStoreStatus () [][]* core. StoreInfo {
500552 m .RLock ()
501553 defer m .RUnlock ()
502- stores := make ([][]uint64 , storeStatusTypeCount )
554+ stores := make ([][]* core. StoreInfo , storeStatusTypeCount )
503555 for _ , s := range m .cluster .GetStores () {
504556 if s .IsRemoved () {
505557 continue
@@ -512,21 +564,21 @@ func (m *ModeManager) checkStoreStatus() [][]uint64 {
512564 labelValue := s .GetLabelValue (m .config .DRAutoSync .LabelKey )
513565 if labelValue == m .config .DRAutoSync .Primary {
514566 if down {
515- stores [primaryDown ] = append (stores [primaryDown ], s . GetID () )
567+ stores [primaryDown ] = append (stores [primaryDown ], s )
516568 } else {
517- stores [primaryUp ] = append (stores [primaryUp ], s . GetID () )
569+ stores [primaryUp ] = append (stores [primaryUp ], s )
518570 }
519571 }
520572 if labelValue == m .config .DRAutoSync .DR {
521573 if down {
522- stores [drDown ] = append (stores [drDown ], s . GetID () )
574+ stores [drDown ] = append (stores [drDown ], s )
523575 } else {
524- stores [drUp ] = append (stores [drUp ], s . GetID () )
576+ stores [drUp ] = append (stores [drUp ], s )
525577 }
526578 }
527579 }
528580 for i := range stores {
529- sort .Slice (stores [i ], func (a , b int ) bool { return stores [i ][a ] < stores [i ][b ] })
581+ sort .Slice (stores [i ], func (a , b int ) bool { return stores [i ][a ]. GetID () < stores [i ][b ]. GetID () })
530582 }
531583 return stores
532584}