Skip to content

Commit 618f1cf

Browse files
committed
region_scatterer: fix the bug that could generate schedule with too few peers (tikv#4570)
close tikv#4565 Signed-off-by: HunDunDM <hundundm@gmail.com>
1 parent 0f07368 commit 618f1cf

File tree

2 files changed

+74
-10
lines changed

2 files changed

+74
-10
lines changed

server/schedule/region_scatterer.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string) (*opera
273273

274274
func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *operator.Operator {
275275
ordinaryFilter := filter.NewOrdinaryEngineFilter(r.name)
276-
ordinaryPeers := make(map[uint64]*metapb.Peer)
276+
ordinaryPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))
277277
specialPeers := make(map[string]map[uint64]*metapb.Peer)
278278
// Group peers by the engine of their stores
279279
for _, peer := range region.GetPeers() {
@@ -282,24 +282,36 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
282282
return nil
283283
}
284284
if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
285-
ordinaryPeers[peer.GetId()] = peer
285+
ordinaryPeers[peer.GetStoreId()] = peer
286286
} else {
287287
engine := store.GetLabelValue(filter.EngineKey)
288288
if _, ok := specialPeers[engine]; !ok {
289289
specialPeers[engine] = make(map[uint64]*metapb.Peer)
290290
}
291-
specialPeers[engine][peer.GetId()] = peer
291+
specialPeers[engine][peer.GetStoreId()] = peer
292292
}
293293
}
294294

295-
targetPeers := make(map[uint64]*metapb.Peer)
296-
selectedStores := make(map[uint64]struct{})
297-
scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) {
295+
targetPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers())) // StoreID -> Peer
296+
selectedStores := make(map[uint64]struct{}, len(region.GetPeers())) // StoreID set
297+
scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
298298
for _, peer := range peers {
299-
candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
300-
newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
301-
targetPeers[newPeer.GetStoreId()] = newPeer
302-
selectedStores[newPeer.GetStoreId()] = struct{}{}
299+
if _, ok := selectedStores[peer.GetStoreId()]; ok {
300+
// It is both sourcePeer and targetPeer itself, no need to select.
301+
continue
302+
}
303+
for {
304+
candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
305+
newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
306+
targetPeers[newPeer.GetStoreId()] = newPeer
307+
selectedStores[newPeer.GetStoreId()] = struct{}{}
308+
// If the selected peer is a peer other than origin peer in this region,
309+
// it is considered that the selected peer selects itself.
310+
// This origin peer re-selects.
311+
if _, ok := peers[newPeer.GetStoreId()]; !ok || peer.GetStoreId() == newPeer.GetStoreId() {
312+
break
313+
}
314+
}
303315
}
304316
}
305317

server/schedule/region_scatterer_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
. "github.com/pingcap/check"
1111
"github.com/pingcap/failpoint"
12+
"github.com/pingcap/kvproto/pkg/metapb"
1213
"github.com/tikv/pd/pkg/mock/mockcluster"
1314
"github.com/tikv/pd/server/config"
1415
"github.com/tikv/pd/server/core"
@@ -474,3 +475,54 @@ func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) {
474475
}
475476
check(scatterer.ordinaryEngine.selectedPeer)
476477
}
478+
479+
// TestSelectedStores tests that the peer count does not change due to the picking strategy.
480+
// Ref https://github.com/tikv/pd/issues/4565
481+
func (s *testScatterRegionSuite) TestSelectedStores(c *C) {
482+
ctx, cancel := context.WithCancel(context.Background())
483+
defer cancel()
484+
opt := config.NewTestOptions()
485+
tc := mockcluster.NewCluster(ctx, opt)
486+
// Add 4 stores.
487+
for i := uint64(1); i <= 4; i++ {
488+
tc.AddRegionStore(i, 0)
489+
// prevent store from being disconnected
490+
tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute)
491+
}
492+
group := "group"
493+
scatterer := NewRegionScatterer(ctx, tc)
494+
495+
// Put a lot of regions in Store 1/2/3.
496+
for i := uint64(1); i < 100; i++ {
497+
region := tc.AddLeaderRegion(i+10, i%3+1, (i+1)%3+1, (i+2)%3+1)
498+
peers := make(map[uint64]*metapb.Peer, 3)
499+
for _, peer := range region.GetPeers() {
500+
peers[peer.GetStoreId()] = peer
501+
}
502+
scatterer.Put(peers, i%3+1, group)
503+
}
504+
505+
// Try to scatter a region with peer store id 2/3/4
506+
for i := uint64(1); i < 20; i++ {
507+
region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2)
508+
op := scatterer.scatterRegion(region, group)
509+
c.Assert(isPeerCountChanged(op), IsFalse)
510+
}
511+
}
512+
513+
func isPeerCountChanged(op *operator.Operator) bool {
514+
if op == nil {
515+
return false
516+
}
517+
add, remove := 0, 0
518+
for i := 0; i < op.Len(); i++ {
519+
step := op.Step(i)
520+
switch step.(type) {
521+
case operator.AddPeer, operator.AddLearner, operator.AddLightPeer, operator.AddLightLearner:
522+
add++
523+
case operator.RemovePeer:
524+
remove++
525+
}
526+
}
527+
return add != remove
528+
}

0 commit comments

Comments
 (0)