Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,13 @@ func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[st
mc.PutStore(store)
}

// AddLabersStoreWithLearnerCount adds store with specified count of region, learner and labels.
func (mc *Cluster) AddLabersStoreWithLearnerCount(storeID uint64, regionCount int, learnerCount int, labels map[string]string) {
mc.AddLabelsStore(storeID, regionCount, labels)
store := mc.GetStore(storeID).Clone(core.SetLearnerCount(learnerCount))
mc.PutStore(store)
}

// AddLeaderRegion adds region with specified leader and followers.
func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderStoreID uint64, otherPeerStoreIDs ...uint64) *core.RegionInfo {
origin := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
Expand Down
6 changes: 4 additions & 2 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ type StoreStatus struct {
RegionWeight float64 `json:"region_weight"`
RegionScore float64 `json:"region_score"`
RegionSize int64 `json:"region_size"`
WitnessCount int `json:"witness_count"`
SlowScore uint64 `json:"slow_score"`
LearnerCount int `json:"learner_count,omitempty"`
WitnessCount int `json:"witness_count,omitempty"`
SlowScore uint64 `json:"slow_score,omitempty"`
SendingSnapCount uint32 `json:"sending_snap_count,omitempty"`
ReceivingSnapCount uint32 `json:"receiving_snap_count,omitempty"`
IsBusy bool `json:"is_busy,omitempty"`
Expand Down Expand Up @@ -94,6 +95,7 @@ func newStoreInfo(opt *config.ScheduleConfig, store *core.StoreInfo) *StoreInfo
RegionWeight: store.GetRegionWeight(),
RegionScore: store.RegionScore(opt.RegionScoreFormulaVersion, opt.HighSpaceRatio, opt.LowSpaceRatio, 0),
RegionSize: store.GetRegionSize(),
LearnerCount: store.GetLearnerCount(),
WitnessCount: store.GetWitnessCount(),
SlowScore: store.GetSlowScore(),
SendingSnapCount: store.GetSendingSnapCount(),
Expand Down
4 changes: 2 additions & 2 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Typ

// UpdateStoreStatus updates the information of the store.
func (bc *BasicCluster) UpdateStoreStatus(storeID uint64) {
leaderCount, regionCount, witnessCount, pendingPeerCount, leaderRegionSize, regionSize := bc.RegionsInfo.GetStoreStats(storeID)
leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize := bc.RegionsInfo.GetStoreStats(storeID)
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize, witnessCount)
bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize)
}

// PutStore put a store.
Expand Down
4 changes: 2 additions & 2 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,11 +1171,11 @@ func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
}

// GetStoreStats returns the store stats.
func (r *RegionsInfo) GetStoreStats(storeID uint64) (leader, region, witness, pending int, leaderSize, regionSize int64) {
func (r *RegionsInfo) GetStoreStats(storeID uint64) (leader, region, witness, learner, pending int, leaderSize, regionSize int64) {
r.st.RLock()
defer r.st.RUnlock()
return r.leaders[storeID].length(), r.getStoreRegionCountLocked(storeID), r.witnesses[storeID].length(),
r.pendingPeers[storeID].length(), r.leaders[storeID].TotalSize(), r.getStoreRegionSizeLocked(storeID)
r.learners[storeID].length(), r.pendingPeers[storeID].length(), r.leaders[storeID].TotalSize(), r.getStoreRegionSizeLocked(storeID)
}

// GetRegionCount gets the total count of RegionInfo of regionMap
Expand Down
9 changes: 8 additions & 1 deletion server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type StoreInfo struct {
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
Expand Down Expand Up @@ -221,6 +222,11 @@ func (s *StoreInfo) GetRegionCount() int {
return s.regionCount
}

// GetLearnerCount returns the learner count of the store.
func (s *StoreInfo) GetLearnerCount() int {
return s.learnerCount
}

// GetWitnessCount returns the witness count of the store.
func (s *StoreInfo) GetWitnessCount() int {
return s.witnessCount
Expand Down Expand Up @@ -709,11 +715,12 @@ func (s *StoresInfo) SetRegionSize(storeID uint64, regionSize int64) {
}

// UpdateStoreStatus updates the information of the store.
func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount int, regionCount int, pendingPeerCount int, leaderSize int64, regionSize int64, witnessCount int) {
func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount int, leaderSize int64, regionSize int64) {
if store, ok := s.stores[storeID]; ok {
newStore := store.ShallowClone(SetLeaderCount(leaderCount),
SetRegionCount(regionCount),
SetWitnessCount(witnessCount),
SetLearnerCount(learnerCount),
SetPendingPeerCount(pendingPeerCount),
SetLeaderSize(leaderSize),
SetRegionSize(regionSize))
Expand Down
7 changes: 7 additions & 0 deletions server/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ func SetRegionCount(regionCount int) StoreCreateOption {
}
}

// SetLearnerCount sets the learner count for the store.
func SetLearnerCount(learnerCount int) StoreCreateOption {
return func(store *StoreInfo) {
store.learnerCount = learnerCount
}
}

// SetWitnessCount sets the witness count for the store.
func SetWitnessCount(witnessCount int) StoreCreateOption {
return func(store *StoreInfo) {
Expand Down
4 changes: 4 additions & 0 deletions server/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,10 @@ func (m *ModeManager) checkStoreStatus() [][]uint64 {
if s.IsRemoved() {
continue
}
// learner peers do not participate in major commit or vote, so it should not count in primary/dr as a normal store.
if s.GetRegionCount() == s.GetLearnerCount() {
continue
}
down := s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
if labelValue == m.config.DRAutoSync.Primary {
Expand Down
10 changes: 7 additions & 3 deletions server/replication/replication_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestStateSwitch(t *testing.T) {
Primary: "zone1",
DR: "zone2",
PrimaryReplicas: 4,
DRReplicas: 1,
DRReplicas: 2,
WaitStoreTimeout: typeutil.Duration{Duration: time.Minute},
}}
cluster := mockcluster.NewCluster(ctx, config.NewTestOptions())
Expand Down Expand Up @@ -214,7 +214,7 @@ func TestStateSwitch(t *testing.T) {

// add new store in dr zone.
cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"})
cluster.AddLabelsStore(6, 1, map[string]string{"zone": "zone2"})
cluster.AddLabersStoreWithLearnerCount(6, 1, 1, map[string]string{"zone": "zone2"})
// async -> sync
rep.tickDR()
re.Equal(drStateSyncRecover, rep.drGetState())
Expand All @@ -233,10 +233,14 @@ func TestStateSwitch(t *testing.T) {
rep.tickDR()
re.Equal(drStateSync, rep.drGetState()) // cannot guarantee majority, keep sync.

setStoreState(cluster, "up", "up", "up", "up", "up", "down")
rep.tickDR()
re.Equal(drStateSync, rep.drGetState())

// once the voter node down, even learner node up, swith to async state.
setStoreState(cluster, "up", "up", "up", "up", "down", "up")
rep.tickDR()
re.Equal(drStateAsyncWait, rep.drGetState())
assertStateIDUpdate()

rep.drSwitchToSync()
replicator.errors[2] = errors.New("fail to replicate")
Expand Down
4 changes: 4 additions & 0 deletions server/statistics/store_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type storeStatistics struct {
StorageCapacity uint64
RegionCount int
LeaderCount int
LearnerCount int
WitnessCount int
LabelCounter map[string]int
Preparing int
Expand Down Expand Up @@ -119,6 +120,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
storeStatusGauge.WithLabelValues(storeAddress, id, "leader_size").Set(float64(store.GetLeaderSize()))
storeStatusGauge.WithLabelValues(storeAddress, id, "leader_count").Set(float64(store.GetLeaderCount()))
storeStatusGauge.WithLabelValues(storeAddress, id, "witness_count").Set(float64(store.GetWitnessCount()))
storeStatusGauge.WithLabelValues(storeAddress, id, "learner_count").Set(float64(store.GetLearnerCount()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_available").Set(float64(store.GetAvailable()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_used").Set(float64(store.GetUsedSize()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_capacity").Set(float64(store.GetCapacity()))
Expand Down Expand Up @@ -170,6 +172,7 @@ func (s *storeStatistics) Collect() {
metrics["region_count"] = float64(s.RegionCount)
metrics["leader_count"] = float64(s.LeaderCount)
metrics["witness_count"] = float64(s.WitnessCount)
metrics["learner_count"] = float64(s.LearnerCount)
metrics["storage_size"] = float64(s.StorageSize)
metrics["storage_capacity"] = float64(s.StorageCapacity)

Expand Down Expand Up @@ -241,6 +244,7 @@ func (s *storeStatistics) resetStoreStatistics(storeAddress string, id string) {
"leader_size",
"leader_count",
"witness_count",
"learner_count",
"store_available",
"store_used",
"store_capacity",
Expand Down