Commit 39a4de6

region_scatter: fix a bug where leaders may be unbalanced after scattering regions (tikv#6054) (tikv#6060)

close tikv#6017, ref tikv#6054

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
Co-authored-by: Cabinfever_B <cabinfeveroier@gmail.com>
1 parent 0646a37 commit 39a4de6
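
Note: the commit replaces the post-hoc PlacementLeaderSafeguard filtering with a leaderCandidateStores list built while peers are being selected, and the target leader then becomes the candidate store with the fewest leaders already selected for the group. Below is a minimal, self-contained sketch of that min-count pick; the plain leaderCounts map stands in for PD's engineContext.selectedLeader counter and is an assumption for illustration, not PD's real API:

package main

import (
    "fmt"
    "math"
)

// pickLeaderStore sketches the selection loop at the end of
// selectAvailableLeaderStore: among the eligible candidate stores,
// choose the one with the fewest leaders already handed out for this
// scatter group. leaderCounts is a stand-in for PD's per-group
// selectedLeader counter.
func pickLeaderStore(candidates []uint64, leaderCounts map[uint64]uint64) uint64 {
    minLeaders := uint64(math.MaxUint64)
    var picked uint64 // 0 means "no candidate", matching the real code
    for _, storeID := range candidates {
        if c := leaderCounts[storeID]; c < minLeaders {
            minLeaders = c
            picked = storeID
        }
    }
    return picked
}

func main() {
    counts := map[uint64]uint64{2: 5, 4: 3, 6: 4}
    fmt.Println(pickLeaderStore([]uint64{2, 4, 6}, counts)) // 4: fewest leaders so far
}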

2 files changed: +68 -15 lines changed

server/schedule/region_scatterer.go

Lines changed: 32 additions & 15 deletions
@@ -31,6 +31,7 @@ import (
     "github.com/tikv/pd/server/core"
     "github.com/tikv/pd/server/schedule/filter"
     "github.com/tikv/pd/server/schedule/operator"
+    "github.com/tikv/pd/server/schedule/placement"
     "go.uber.org/zap"
 )

@@ -286,6 +287,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
     ordinaryFilter := filter.NewOrdinaryEngineFilter(r.name)
     ordinaryPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))
     specialPeers := make(map[string]map[uint64]*metapb.Peer)
+    oldFit := r.cluster.GetRuleManager().FitRegion(r.cluster, region)
     // Group peers by the engine of their stores
     for _, peer := range region.GetPeers() {
         store := r.cluster.GetStore(peer.GetStoreId())
@@ -304,10 +306,14 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
     }

     targetPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers())) // StoreID -> Peer
-    selectedStores := make(map[uint64]struct{}, len(region.GetPeers()))  // StoreID set
+    selectedStores := make(map[uint64]struct{}, len(region.GetPeers()))  // selected StoreID set
+    leaderCandidateStores := make([]uint64, 0, len(region.GetPeers()))   // StoreID allowed to become Leader
     scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
         for _, peer := range peers {
             if _, ok := selectedStores[peer.GetStoreId()]; ok {
+                if allowLeader(oldFit, peer) {
+                    leaderCandidateStores = append(leaderCandidateStores, peer.GetStoreId())
+                }
                 // It is both sourcePeer and targetPeer itself, no need to select.
                 continue
             }
@@ -321,6 +327,9 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
             // This origin peer re-selects.
             if _, ok := peers[newPeer.GetStoreId()]; !ok || peer.GetStoreId() == newPeer.GetStoreId() {
                 selectedStores[peer.GetStoreId()] = struct{}{}
+                if allowLeader(oldFit, peer) {
+                    leaderCandidateStores = append(leaderCandidateStores, newPeer.GetStoreId())
+                }
                 break
             }
         }
@@ -331,7 +340,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
     // FIXME: target leader only considers the ordinary stores, maybe we need to consider the
     // special engine stores if the engine supports to become a leader. But now there is only
     // one engine, tiflash, which does not support the leader, so don't consider it for now.
-    targetLeader := r.selectAvailableLeaderStore(group, region, targetPeers, r.ordinaryEngine)
+    targetLeader := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
     if targetLeader == 0 {
         scatterCounter.WithLabelValues("no-leader", "").Inc()
         return nil
@@ -369,6 +378,22 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
     return op
 }

+func allowLeader(fit *placement.RegionFit, peer *metapb.Peer) bool {
+    switch peer.GetRole() {
+    case metapb.PeerRole_Learner, metapb.PeerRole_DemotingVoter:
+        return false
+    }
+    peerFit := fit.GetRuleFit(peer.GetId())
+    if peerFit == nil || peerFit.Rule == nil {
+        return false
+    }
+    switch peerFit.Rule.Role {
+    case placement.Voter, placement.Leader:
+        return true
+    }
+    return false
+}
+
 func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb.Peer, targetLeader uint64) bool {
     peers := region.GetPeers()
     for _, peer := range peers {
@@ -448,27 +473,19 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto

 // selectAvailableLeaderStore selects the target leader store from the candidates. The candidates are collected from
 // the existing peers' stores based on the leader counts at the group level. Please use this func before scattering special engines.
-func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, peers map[uint64]*metapb.Peer, context engineContext) uint64 {
+func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, leaderCandidateStores []uint64, context engineContext) uint64 {
     sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId())
     if sourceStore == nil {
         log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
         return 0
     }
-    leaderCandidateStores := make([]uint64, 0)
-    // use PlacementLeaderSafeguard for filtering follower and learner in rule
-    filter := filter.NewPlacementLeaderSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, true /*allowMoveLeader*/)
-    for storeID := range peers {
-        store := r.cluster.GetStore(storeID)
-        if store == nil {
-            return 0
-        }
-        if filter == nil || filter.Target(r.cluster.GetOpts(), store) {
-            leaderCandidateStores = append(leaderCandidateStores, storeID)
-        }
-    }
     minStoreGroupLeader := uint64(math.MaxUint64)
     id := uint64(0)
     for _, storeID := range leaderCandidateStores {
+        store := r.cluster.GetStore(storeID)
+        if store == nil {
+            continue
+        }
         storeGroupLeaderCount := context.selectedLeader.Get(storeID, group)
         if minStoreGroupLeader > storeGroupLeaderCount {
             minStoreGroupLeader = storeGroupLeaderCount
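
Taken together, the eligibility check moves out of selectAvailableLeaderStore (the old PlacementLeaderSafeguard path) and into the new allowLeader gate applied during peer selection, so a store stays a leader candidate even when its peer is reused in place. Below is a self-contained sketch of that gate's shape; PeerRole, RuleRole, and canBeLeader are stand-ins for metapb.PeerRole, the placement rule role, and allowLeader, chosen for illustration rather than taken from PD:

package main

import "fmt"

// Stand-ins for metapb.PeerRole and the placement rule role; the
// names here are illustrative assumptions, not PD's real types.
type PeerRole int

const (
    PeerVoter PeerRole = iota
    PeerLearner
    PeerDemotingVoter
)

type RuleRole int

const (
    RuleVoter RuleRole = iota
    RuleLeader
    RuleFollower
)

// canBeLeader mirrors the shape of allowLeader in the diff above:
// learners and demoting voters never take leadership, and otherwise
// the peer's matched placement rule must grant a Voter or Leader role.
func canBeLeader(role PeerRole, rule *RuleRole) bool {
    switch role {
    case PeerLearner, PeerDemotingVoter:
        return false
    }
    if rule == nil { // no rule fit found for this peer
        return false
    }
    switch *rule {
    case RuleVoter, RuleLeader:
        return true
    }
    return false
}

func main() {
    voter := RuleVoter
    fmt.Println(canBeLeader(PeerVoter, &voter))   // true
    fmt.Println(canBeLeader(PeerLearner, &voter)) // false: learners are skipped
    fmt.Println(canBeLeader(PeerVoter, nil))      // false: no rule fit
}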

server/schedule/region_scatterer_test.go

Lines changed: 36 additions & 0 deletions
@@ -19,6 +19,7 @@ import (
     "fmt"
     "math"
     "math/rand"
+    "strconv"
     "testing"
     "time"

@@ -680,6 +681,41 @@ func TestSelectedStoresTooManyPeers(t *testing.T) {
     }
 }

+// TestBalanceRegion tests whether region peers and leaders are balanced after scatter.
+// ref https://github.com/tikv/pd/issues/6017
+func TestBalanceRegion(t *testing.T) {
+    re := require.New(t)
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    opt := config.NewTestOptions()
+    opt.SetLocationLabels([]string{"host"})
+    tc := mockcluster.NewCluster(ctx, opt)
+    stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
+    oc := NewOperatorController(ctx, tc, stream)
+    // Add 6 stores in 3 hosts.
+    for i := uint64(2); i <= 7; i++ {
+        tc.AddLabelsStore(i, 0, map[string]string{"host": strconv.FormatUint(i/2, 10)})
+        // prevent the store from being disconnected
+        tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute)
+    }
+    group := "group"
+    scatterer := NewRegionScatterer(ctx, tc, oc)
+    for i := uint64(1001); i <= 1300; i++ {
+        region := tc.AddLeaderRegion(i, 2, 4, 6)
+        op := scatterer.scatterRegion(region, group)
+        re.False(isPeerCountChanged(op))
+    }
+    for i := uint64(2); i <= 7; i++ {
+        re.Equal(uint64(150), scatterer.ordinaryEngine.selectedPeer.Get(i, group))
+        re.Equal(uint64(50), scatterer.ordinaryEngine.selectedLeader.Get(i, group))
+    }
+    // Test for unhealthy region
+    // ref https://github.com/tikv/pd/issues/6099
+    region := tc.AddLeaderRegion(1500, 2, 3, 4, 6)
+    op := scatterer.scatterRegion(region, group)
+    re.False(isPeerCountChanged(op))
+}
+
 func isPeerCountChanged(op *operator.Operator) bool {
     if op == nil {
         return false
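
A quick sanity check on the assertion values above: 300 regions (IDs 1001 through 1300) each contribute 3 peers, so 900 peers spread evenly across 6 stores gives 150 peers per store, and the 300 leaders spread across the same 6 stores give 50 per store. The equalities therefore hold only if the scatter is perfectly balanced, which is exactly what the fix guarantees.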
