@@ -33,6 +33,7 @@ import (
3333 "github.com/tikv/pd/server/config"
3434 "github.com/tikv/pd/server/core"
3535 "github.com/tikv/pd/server/schedule"
36+ "github.com/tikv/pd/server/schedule/placement"
3637 "github.com/tikv/pd/server/storage/endpoint"
3738 "go.uber.org/zap"
3839)
@@ -363,37 +364,72 @@ func (m *ModeManager) Run(ctx context.Context) {
363364 wg .Wait ()
364365}
365366
367+ func minimalUpVoters (rule * placement.Rule , upStores , downStores []* core.StoreInfo ) int {
368+ if rule .Role == placement .Learner {
369+ return 0
370+ }
371+ var up , down int
372+ for _ , s := range upStores {
373+ if placement .MatchLabelConstraints (s , rule .LabelConstraints ) {
374+ up ++
375+ }
376+ }
377+ for _ , s := range downStores {
378+ if placement .MatchLabelConstraints (s , rule .LabelConstraints ) {
379+ down ++
380+ }
381+ }
382+ minimalUp := rule .Count - down
383+ if minimalUp < 0 {
384+ minimalUp = 0
385+ }
386+ if minimalUp > up {
387+ minimalUp = up
388+ }
389+ return minimalUp
390+ }
391+
366392func (m * ModeManager ) tickUpdateState () {
367393 if m .getModeName () != modeDRAutoSync {
368394 return
369395 }
370396
371397 drTickCounter .Inc ()
372398
373- totalPrimaryPeers , totalDrPeers := m .config .DRAutoSync .PrimaryReplicas , m .config .DRAutoSync .DRReplicas
374- stores := m .checkStoreStatus ()
399+ stores , storeIDs := m .checkStoreStatus ()
375400
376- // canSync is true when every region has at least 1 replica in each DC.
377- canSync := len (stores [primaryDown ]) < totalPrimaryPeers && len (stores [drDown ]) < totalDrPeers &&
378- len (stores [primaryUp ]) > 0 && len (stores [drUp ]) > 0
401+ var primaryHasVoter , drHasVoter bool
402+ var totalVoter , totalUpVoter int
403+ for _ , r := range m .cluster .GetRuleManager ().GetAllRules () {
404+ if len (r .StartKey ) > 0 || len (r .EndKey ) > 0 {
405+ // All rules should be global rules. If not, skip it.
406+ continue
407+ }
408+ if r .Role != placement .Learner {
409+ totalVoter += r .Count
410+ }
411+ minimalUpPrimary := minimalUpVoters (r , stores [primaryUp ], stores [primaryDown ])
412+ minimalUpDr := minimalUpVoters (r , stores [drUp ], stores [drDown ])
413+ primaryHasVoter = primaryHasVoter || minimalUpPrimary > 0
414+ drHasVoter = drHasVoter || minimalUpDr > 0
415+ upVoters := minimalUpPrimary + minimalUpDr
416+ if upVoters > r .Count {
417+ upVoters = r .Count
418+ }
419+ totalUpVoter += upVoters
420+ }
379421
422+ // canSync is true when every region has at least 1 voter replica in each DC.
380423 // hasMajority is true when every region has majority peer online.
381- var upPeers int
382- if len (stores [primaryDown ]) < totalPrimaryPeers {
383- upPeers += totalPrimaryPeers - len (stores [primaryDown ])
384- }
385- if len (stores [drDown ]) < totalDrPeers {
386- upPeers += totalDrPeers - len (stores [drDown ])
387- }
388- hasMajority := upPeers * 2 > totalPrimaryPeers + totalDrPeers
424+ canSync := primaryHasVoter && drHasVoter
425+ hasMajority := totalUpVoter * 2 > totalVoter
389426
390427 log .Debug ("replication store status" ,
391- zap .Uint64s ("up-primary" , stores [primaryUp ]),
392- zap .Uint64s ("up-dr" , stores [drUp ]),
393- zap .Uint64s ("down-primary" , stores [primaryDown ]),
394- zap .Uint64s ("down-dr" , stores [drDown ]),
428+ zap .Uint64s ("up-primary" , storeIDs [primaryUp ]),
429+ zap .Uint64s ("up-dr" , storeIDs [drUp ]),
430+ zap .Uint64s ("down-primary" , storeIDs [primaryDown ]),
431+ zap .Uint64s ("down-dr" , storeIDs [drDown ]),
395432 zap .Bool ("can-sync" , canSync ),
396- zap .Int ("up-peers" , upPeers ),
397433 zap .Bool ("has-majority" , hasMajority ),
398434 )
399435
@@ -419,31 +455,31 @@ func (m *ModeManager) tickUpdateState() {
419455 case drStateSync :
420456 // If hasMajority is false, the cluster is always unavailable. Switch to async won't help.
421457 if ! canSync && hasMajority {
422- m .drSwitchToAsyncWait (stores [primaryUp ])
458+ m .drSwitchToAsyncWait (storeIDs [primaryUp ])
423459 }
424460 case drStateAsyncWait :
425461 if canSync {
426462 m .drSwitchToSync ()
427463 break
428464 }
429- if oldAvailableStores := m .drGetAvailableStores (); ! reflect .DeepEqual (oldAvailableStores , stores [primaryUp ]) {
430- m .drSwitchToAsyncWait (stores [primaryUp ])
465+ if oldAvailableStores := m .drGetAvailableStores (); ! reflect .DeepEqual (oldAvailableStores , storeIDs [primaryUp ]) {
466+ m .drSwitchToAsyncWait (storeIDs [primaryUp ])
431467 break
432468 }
433- if m .drCheckStoreStateUpdated (stores [primaryUp ]) {
434- m .drSwitchToAsync (stores [primaryUp ])
469+ if m .drCheckStoreStateUpdated (storeIDs [primaryUp ]) {
470+ m .drSwitchToAsync (storeIDs [primaryUp ])
435471 }
436472 case drStateAsync :
437473 if canSync {
438474 m .drSwitchToSyncRecover ()
439475 break
440476 }
441- if ! reflect .DeepEqual (m .drGetAvailableStores (), stores [primaryUp ]) && m .drCheckStoreStateUpdated (stores [primaryUp ]) {
442- m .drSwitchToAsync (stores [primaryUp ])
477+ if ! reflect .DeepEqual (m .drGetAvailableStores (), storeIDs [primaryUp ]) && m .drCheckStoreStateUpdated (storeIDs [primaryUp ]) {
478+ m .drSwitchToAsync (storeIDs [primaryUp ])
443479 }
444480 case drStateSyncRecover :
445481 if ! canSync && hasMajority {
446- m .drSwitchToAsync (stores [primaryUp ])
482+ m .drSwitchToAsync (storeIDs [primaryUp ])
447483 } else {
448484 m .updateProgress ()
449485 progress := m .estimateProgress ()
@@ -518,39 +554,40 @@ const (
518554 storeStatusTypeCount
519555)
520556
521- func (m * ModeManager ) checkStoreStatus () [][]uint64 {
557+ func (m * ModeManager ) checkStoreStatus () ( [][]* core. StoreInfo , [][] uint64 ) {
522558 m .RLock ()
523559 defer m .RUnlock ()
524- stores := make ([][]uint64 , storeStatusTypeCount )
560+ stores , storeIDs := make ([][] * core. StoreInfo , storeStatusTypeCount ), make ([][]uint64 , storeStatusTypeCount )
525561 for _ , s := range m .cluster .GetStores () {
526562 if s .IsRemoved () {
527563 continue
528564 }
529- // learner peers do not participate in major commit or vote, so it should not count in primary/dr as a normal store.
530- if s .GetRegionCount () == s .GetLearnerCount () {
531- continue
532- }
533565 down := s .DownTime () >= m .config .DRAutoSync .WaitStoreTimeout .Duration
534566 labelValue := s .GetLabelValue (m .config .DRAutoSync .LabelKey )
535567 if labelValue == m .config .DRAutoSync .Primary {
536568 if down {
537- stores [primaryDown ] = append (stores [primaryDown ], s .GetID ())
569+ stores [primaryDown ] = append (stores [primaryDown ], s )
570+ storeIDs [primaryDown ] = append (storeIDs [primaryDown ], s .GetID ())
538571 } else {
539- stores [primaryUp ] = append (stores [primaryUp ], s .GetID ())
572+ stores [primaryUp ] = append (stores [primaryUp ], s )
573+ storeIDs [primaryUp ] = append (storeIDs [primaryUp ], s .GetID ())
540574 }
541575 }
542576 if labelValue == m .config .DRAutoSync .DR {
543577 if down {
544- stores [drDown ] = append (stores [drDown ], s .GetID ())
578+ stores [drDown ] = append (stores [drDown ], s )
579+ storeIDs [drDown ] = append (storeIDs [drDown ], s .GetID ())
545580 } else {
546- stores [drUp ] = append (stores [drUp ], s .GetID ())
581+ stores [drUp ] = append (stores [drUp ], s )
582+ storeIDs [drUp ] = append (storeIDs [drUp ], s .GetID ())
547583 }
548584 }
549585 }
550586 for i := range stores {
551- sort .Slice (stores [i ], func (a , b int ) bool { return stores [i ][a ] < stores [i ][b ] })
587+ sort .Slice (stores [i ], func (a , b int ) bool { return stores [i ][a ].GetID () < stores [i ][b ].GetID () })
588+ sort .Slice (storeIDs [i ], func (a , b int ) bool { return storeIDs [i ][a ] < storeIDs [i ][b ] })
552589 }
553- return stores
590+ return stores , storeIDs
554591}
555592
556593// UpdateStoreDRStatus saves the dr-autosync status of a store.
0 commit comments