Commit b71905d

ti-chi-bot and HunDunDM authored
region_scatterer: fix the bug that could generate schedule with too few peers (#4570) (#4581)
Signed-off-by: HunDunDM <hundundm@gmail.com>
Signed-off-by: nolouch <nolouch@gmail.com>
Co-authored-by: 混沌DM <hundundm@gmail.com>
1 parent 9413840 commit b71905d
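
In short: scatterRegion keys its peer maps by store ID, and the old selection loop made exactly one pick per origin peer with no collision check. As the retry condition added below suggests, selectStore can fall back to the origin peer itself when no candidate is available; if that peer's store was already claimed as another peer's target, two entries collapse into one in the store-keyed targetPeers map, and the resulting operator removes more peers than it adds. Here is that failure mode in a minimal, self-contained sketch; the store IDs and the stub selectStore are invented for illustration, while the real code delegates to selectCandidates and selectStore:

package main

import "fmt"

func main() {
	origins := []uint64{2, 3, 4} // store IDs of the region's peers

	// Hypothetical stub: store 4 is the only attractive candidate, and with
	// nothing left to pick the peer falls back to its own store, as the real
	// selectStore can when candidates run out.
	selectStore := func(origin uint64, selected map[uint64]struct{}) uint64 {
		if _, taken := selected[4]; !taken {
			return 4
		}
		return origin
	}

	// Pre-fix loop: one pick per origin peer, no collision check.
	targetPeers := make(map[uint64]struct{})
	selectedStores := make(map[uint64]struct{})
	for _, origin := range origins {
		target := selectStore(origin, selectedStores)
		targetPeers[target] = struct{}{}
		selectedStores[target] = struct{}{}
	}

	// The peer on store 2 picks 4, the peer on 3 falls back to 3, and the
	// peer on 4 falls back to 4, colliding with the first pick: three origin
	// peers collapse into two target stores, so the resulting operator would
	// remove one peer more than it adds.
	fmt.Printf("%d origin peers -> %d target stores: %v\n",
		len(origins), len(targetPeers), targetPeers)
}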

File tree

2 files changed: +74 -10 lines

server/schedule/region_scatterer.go

Lines changed: 22 additions & 10 deletions
@@ -274,7 +274,7 @@ func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string) (*opera
 
 func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *operator.Operator {
 	ordinaryFilter := filter.NewOrdinaryEngineFilter(r.name)
-	ordinaryPeers := make(map[uint64]*metapb.Peer)
+	ordinaryPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))
 	specialPeers := make(map[string]map[uint64]*metapb.Peer)
 	// Group peers by the engine of their stores
 	for _, peer := range region.GetPeers() {
@@ -283,24 +283,36 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
 			return nil
 		}
 		if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
-			ordinaryPeers[peer.GetId()] = peer
+			ordinaryPeers[peer.GetStoreId()] = peer
 		} else {
 			engine := store.GetLabelValue(core.EngineKey)
 			if _, ok := specialPeers[engine]; !ok {
 				specialPeers[engine] = make(map[uint64]*metapb.Peer)
 			}
-			specialPeers[engine][peer.GetId()] = peer
+			specialPeers[engine][peer.GetStoreId()] = peer
 		}
 	}
 
-	targetPeers := make(map[uint64]*metapb.Peer)
-	selectedStores := make(map[uint64]struct{})
-	scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) {
+	targetPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))  // StoreID -> Peer
+	selectedStores := make(map[uint64]struct{}, len(region.GetPeers()))   // StoreID set
+	scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
 		for _, peer := range peers {
-			candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
-			newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
-			targetPeers[newPeer.GetStoreId()] = newPeer
-			selectedStores[newPeer.GetStoreId()] = struct{}{}
+			if _, ok := selectedStores[peer.GetStoreId()]; ok {
+				// The peer's store has already been picked as a target, so the
+				// peer is both source and target; there is no need to select.
+				continue
+			}
+			for {
+				candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
+				newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
+				targetPeers[newPeer.GetStoreId()] = newPeer
+				selectedStores[newPeer.GetStoreId()] = struct{}{}
+				// If the selected store holds a different origin peer of this
+				// region, treat that peer as having selected itself and let
+				// this origin peer re-select.
+				if _, ok := peers[newPeer.GetStoreId()]; !ok || peer.GetStoreId() == newPeer.GetStoreId() {
+					break
+				}
+			}
 		}
 	}
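
Restating the fixed loop over the same toy setup shows how the two guards cooperate; only the continue guard and the re-selection loop mirror the real code, and the stub selector is again hypothetical. Note that a rejected pick stays in targetPeers: the colliding store belongs to another origin peer, which will later hit the continue guard and stay put, so the entry is correct either way.

package main

import "fmt"

func main() {
	// Origin peers keyed by store ID, as in the fixed scatterRegion.
	peers := map[uint64]struct{}{2: {}, 3: {}, 4: {}}

	// Hypothetical stub: prefer store 4, then store 1, else fall back to
	// the origin store.
	selectStore := func(origin uint64, selected map[uint64]struct{}) uint64 {
		for _, s := range []uint64{4, 1} {
			if _, taken := selected[s]; !taken {
				return s
			}
		}
		return origin
	}

	targetPeers := make(map[uint64]struct{})
	selectedStores := make(map[uint64]struct{})
	for origin := range peers {
		if _, ok := selectedStores[origin]; ok {
			// Already claimed as another peer's target: this peer stays put.
			continue
		}
		for {
			target := selectStore(origin, selectedStores)
			targetPeers[target] = struct{}{}
			selectedStores[target] = struct{}{}
			// Accept the pick unless it collides with a different origin peer.
			if _, isOrigin := peers[target]; !isOrigin || target == origin {
				break
			}
		}
	}

	// Whatever the map iteration order, the loop ends with exactly three
	// target stores, preserving the replica count the operator must keep.
	fmt.Printf("%d origin peers -> %d target stores: %v\n",
		len(peers), len(targetPeers), targetPeers)
}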

server/schedule/region_scatterer_test.go

Lines changed: 52 additions & 0 deletions
@@ -23,6 +23,7 @@ import (
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/tikv/pd/pkg/mock/mockcluster"
 	"github.com/tikv/pd/server/config"
 	"github.com/tikv/pd/server/core"
@@ -490,3 +491,54 @@ func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) {
 	}
 	check(scatterer.ordinaryEngine.selectedPeer)
 }
+
+// TestSelectedStores tests whether the peer count changes due to the picking strategy.
+// Ref https://github.com/tikv/pd/issues/4565
+func (s *testScatterRegionSuite) TestSelectedStores(c *C) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	opt := config.NewTestOptions()
+	tc := mockcluster.NewCluster(ctx, opt)
+	// Add 4 stores.
+	for i := uint64(1); i <= 4; i++ {
+		tc.AddRegionStore(i, 0)
+		// Prevent the store from being disconnected.
+		tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute)
+	}
+	group := "group"
+	scatterer := NewRegionScatterer(ctx, tc)
+
+	// Put a lot of regions in stores 1/2/3.
+	for i := uint64(1); i < 100; i++ {
+		region := tc.AddLeaderRegion(i+10, i%3+1, (i+1)%3+1, (i+2)%3+1)
+		peers := make(map[uint64]*metapb.Peer, 3)
+		for _, peer := range region.GetPeers() {
+			peers[peer.GetStoreId()] = peer
+		}
+		scatterer.Put(peers, i%3+1, group)
+	}
+
+	// Try to scatter regions whose peers sit on stores 2/3/4.
+	for i := uint64(1); i < 20; i++ {
+		region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2)
+		op := scatterer.scatterRegion(region, group)
+		c.Assert(isPeerCountChanged(op), IsFalse)
+	}
+}
+
+// isPeerCountChanged reports whether op adds a different number of peers
+// than it removes.
+func isPeerCountChanged(op *operator.Operator) bool {
+	if op == nil {
+		return false
+	}
+	add, remove := 0, 0
+	for i := 0; i < op.Len(); i++ {
+		step := op.Step(i)
+		switch step.(type) {
+		case operator.AddPeer, operator.AddLearner:
+			add++
+		case operator.RemovePeer:
+			remove++
+		}
+	}
+	return add != remove
+}
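
The helper counts AddPeer and AddLearner steps against RemovePeer steps: a scatter operator should only relocate peers, never change the replica count. The test therefore floods stores 1/2/3 with prior selections, scatters regions whose peers sit on stores 2/3/4, and asserts that no generated operator removes more peers than it adds, which is exactly the kind of schedule the pre-fix code could emit.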
