Skip to content

Commit 87ffadc

Browse files
committed
region_scatterer: fix the bug that could generate schedule with too few peers (tikv#4570)
close tikv#4565 Signed-off-by: HunDunDM <hundundm@gmail.com>
1 parent 552c53e commit 87ffadc

File tree

2 files changed

+74
-10
lines changed

2 files changed

+74
-10
lines changed

server/schedule/region_scatterer.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string) (*opera
274274

275275
func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *operator.Operator {
276276
ordinaryFilter := filter.NewOrdinaryEngineFilter(r.name)
277-
ordinaryPeers := make(map[uint64]*metapb.Peer)
277+
ordinaryPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))
278278
specialPeers := make(map[string]map[uint64]*metapb.Peer)
279279
// Group peers by the engine of their stores
280280
for _, peer := range region.GetPeers() {
@@ -283,24 +283,36 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
283283
return nil
284284
}
285285
if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
286-
ordinaryPeers[peer.GetId()] = peer
286+
ordinaryPeers[peer.GetStoreId()] = peer
287287
} else {
288288
engine := store.GetLabelValue(filter.EngineKey)
289289
if _, ok := specialPeers[engine]; !ok {
290290
specialPeers[engine] = make(map[uint64]*metapb.Peer)
291291
}
292-
specialPeers[engine][peer.GetId()] = peer
292+
specialPeers[engine][peer.GetStoreId()] = peer
293293
}
294294
}
295295

296-
targetPeers := make(map[uint64]*metapb.Peer)
297-
selectedStores := make(map[uint64]struct{})
298-
scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) {
296+
targetPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers())) // StoreID -> Peer
297+
selectedStores := make(map[uint64]struct{}, len(region.GetPeers())) // StoreID set
298+
scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
299299
for _, peer := range peers {
300-
candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
301-
newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
302-
targetPeers[newPeer.GetStoreId()] = newPeer
303-
selectedStores[newPeer.GetStoreId()] = struct{}{}
300+
if _, ok := selectedStores[peer.GetStoreId()]; ok {
301+
// This peer's store is already selected as a target, so the peer is both source and target; no need to select.
302+
continue
303+
}
304+
for {
305+
candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
306+
newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
307+
targetPeers[newPeer.GetStoreId()] = newPeer
308+
selectedStores[newPeer.GetStoreId()] = struct{}{}
309+
// If the selected store holds another origin peer of this region (not the current one),
310+
// treat that other peer as having selected its own store,
311+
// and let the current origin peer select again.
312+
if _, ok := peers[newPeer.GetStoreId()]; !ok || peer.GetStoreId() == newPeer.GetStoreId() {
313+
break
314+
}
315+
}
304316
}
305317
}
306318

server/schedule/region_scatterer_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
. "github.com/pingcap/check"
1111
"github.com/pingcap/failpoint"
12+
"github.com/pingcap/kvproto/pkg/metapb"
1213
"github.com/tikv/pd/pkg/mock/mockcluster"
1314
"github.com/tikv/pd/server/config"
1415
"github.com/tikv/pd/server/core"
@@ -476,3 +477,54 @@ func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) {
476477
}
477478
check(scatterer.ordinaryEngine.selectedPeer)
478479
}
480+
481+
// TestSelectedStores verifies that the picking strategy does not change the peer count of a scattered region.
482+
// Ref https://github.com/tikv/pd/issues/4565
483+
func (s *testScatterRegionSuite) TestSelectedStores(c *C) {
484+
ctx, cancel := context.WithCancel(context.Background())
485+
defer cancel()
486+
opt := config.NewTestOptions()
487+
tc := mockcluster.NewCluster(opt)
488+
// Add 4 stores (IDs 1 through 4).
489+
for i := uint64(1); i <= 4; i++ {
490+
tc.AddRegionStore(i, 0)
491+
// Prevent the store from being considered disconnected.
492+
tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute)
493+
}
494+
group := "group"
495+
scatterer := NewRegionScatterer(ctx, tc)
496+
497+
// Create 99 regions on stores 1/2/3 only and pass their peers to scatterer.Put for this group.
498+
for i := uint64(1); i < 100; i++ {
499+
region := tc.AddLeaderRegion(i+10, i%3+1, (i+1)%3+1, (i+2)%3+1)
500+
peers := make(map[uint64]*metapb.Peer, 3)
501+
for _, peer := range region.GetPeers() {
502+
peers[peer.GetStoreId()] = peer
503+
}
504+
scatterer.Put(peers, i%3+1, group)
505+
}
506+
507+
// Scatter regions whose peers are on stores 2/3/4; each resulting operator must add as many peers as it removes.
508+
for i := uint64(1); i < 20; i++ {
509+
region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2)
510+
op := scatterer.scatterRegion(region, group)
511+
c.Assert(isPeerCountChanged(op), IsFalse)
512+
}
513+
}
514+
515+
// isPeerCountChanged reports whether op adds a different number of peers than it removes.
// A nil operator is reported as unchanged. AddPeer, AddLearner, AddLightPeer and
// AddLightLearner steps count as additions; RemovePeer steps count as removals.
func isPeerCountChanged(op *operator.Operator) bool {
516+
if op == nil {
517+
return false
518+
}
519+
add, remove := 0, 0
520+
for i := 0; i < op.Len(); i++ {
521+
step := op.Step(i)
522+
switch step.(type) {
523+
case operator.AddPeer, operator.AddLearner, operator.AddLightPeer, operator.AddLightLearner:
524+
add++
525+
case operator.RemovePeer:
526+
remove++
527+
}
528+
}
529+
return add != remove
530+
}

0 commit comments

Comments
 (0)