Skip to content

Commit 11f71ee

Browse files
ti-chi-bot, bufferflies, and ti-chi-bot[bot]
authored
Scatter: make peer scatter logic same with the leader (#6965) (#7027)
close #6962 Previously, PD considered the peer distribution across different groups by using `TotalCountByStore`, but it did not include the leader distribution. The most common affected scenario is the partitioned table. After this PR, TiDB's calls to the scatter API will use the same group rather than different ones. ref: #3422 pingcap/tidb#46156 Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io> Signed-off-by: bufferflies <1045931706@qq.com> Co-authored-by: buffer <1045931706@qq.com> Co-authored-by: bufferflies <1045931706@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
1 parent 094ce3c commit 11f71ee

File tree

2 files changed

+81
-110
lines changed

2 files changed

+81
-110
lines changed

server/schedule/region_scatterer.go

Lines changed: 45 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"fmt"
2020
"math"
21+
"strconv"
2122
"sync"
2223
"time"
2324

@@ -87,26 +88,6 @@ func (s *selectedStores) GetGroupDistribution(group string) (map[uint64]uint64,
8788
return s.getDistributionByGroupLocked(group)
8889
}
8990

90-
// TotalCountByStore counts the total count by store
91-
func (s *selectedStores) TotalCountByStore(storeID uint64) uint64 {
92-
s.mu.RLock()
93-
defer s.mu.RUnlock()
94-
groups := s.groupDistribution.GetAllID()
95-
totalCount := uint64(0)
96-
for _, group := range groups {
97-
storeDistribution, ok := s.getDistributionByGroupLocked(group)
98-
if !ok {
99-
continue
100-
}
101-
count, ok := storeDistribution[storeID]
102-
if !ok {
103-
continue
104-
}
105-
totalCount += count
106-
}
107-
return totalCount
108-
}
109-
11091
// getDistributionByGroupLocked should be called with lock
11192
func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64]uint64, bool) {
11293
if result, ok := s.groupDistribution.Get(group); ok {
@@ -315,6 +296,12 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
315296
selectedStores := make(map[uint64]struct{}, len(region.GetPeers())) // selected StoreID set
316297
leaderCandidateStores := make([]uint64, 0, len(region.GetPeers())) // StoreID allowed to become Leader
317298
scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
299+
filterLen := len(context.filterFuncs) + 2
300+
filters := make([]filter.Filter, filterLen)
301+
for i, filterFunc := range context.filterFuncs {
302+
filters[i] = filterFunc()
303+
}
304+
filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores)
318305
for _, peer := range peers {
319306
if _, ok := selectedStores[peer.GetStoreId()]; ok {
320307
if allowLeader(oldFit, peer) {
@@ -323,9 +310,14 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
323310
// It is both sourcePeer and targetPeer itself, no need to select.
324311
continue
325312
}
313+
sourceStore := r.cluster.GetStore(peer.GetStoreId())
314+
if sourceStore == nil {
315+
log.Error("failed to get the store", zap.Uint64("store-id", peer.GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
316+
continue
317+
}
318+
filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
326319
for {
327-
candidates := r.selectCandidates(region, oldFit, peer.GetStoreId(), selectedStores, context)
328-
newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
320+
newPeer := r.selectNewPeer(context, group, peer, filters)
329321
targetPeers[newPeer.GetStoreId()] = newPeer
330322
selectedStores[newPeer.GetStoreId()] = struct{}{}
331323
// If the selected peer is a peer other than origin peer in this region,
@@ -346,7 +338,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
346338
// FIXME: target leader only considers the ordinary stores, maybe we need to consider the
347339
// special engine stores if the engine supports to become a leader. But now there is only
348340
// one engine, tiflash, which does not support the leader, so don't consider it for now.
349-
targetLeader := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
341+
targetLeader, leaderStorePickedCount := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
350342
if targetLeader == 0 {
351343
scatterCounter.WithLabelValues("no-leader", "").Inc()
352344
return nil
@@ -381,6 +373,8 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
381373
if op != nil {
382374
scatterCounter.WithLabelValues("success", "").Inc()
383375
r.Put(targetPeers, targetLeader, group)
376+
op.AdditionalInfos["group"] = group
377+
op.AdditionalInfos["leader-picked-count"] = strconv.FormatUint(leaderStorePickedCount, 10)
384378
op.SetPriorityLevel(core.High)
385379
}
386380
return op
@@ -418,69 +412,52 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb.
418412
return region.GetLeader().GetStoreId() == targetLeader
419413
}
420414

421-
func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, oldFit *placement.RegionFit, sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 {
422-
sourceStore := r.cluster.GetStore(sourceStoreID)
423-
if sourceStore == nil {
424-
log.Error("failed to get the store", zap.Uint64("store-id", sourceStoreID), errs.ZapError(errs.ErrGetSourceStore))
425-
return nil
426-
}
427-
filters := []filter.Filter{
428-
filter.NewExcludedFilter(r.name, nil, selectedStores),
429-
}
430-
scoreGuard := filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
431-
for _, filterFunc := range context.filterFuncs {
432-
filters = append(filters, filterFunc())
433-
}
434-
filters = append(filters, scoreGuard)
415+
// selectNewPeer returns the new peer on the store with the fewest picked count.
416+
// It keeps the origin peer if the origin store's picked count equals the fewest picked count.
417+
// It can be divided into three steps:
418+
// 1. find the max picked count and the min picked count.
419+
// 2. if the max picked count equals the min picked count, all stores' picked counts are the same, so return the origin peer.
420+
// 3. otherwise, select the store whose picked count is the min picked count and which passes all filters.
421+
func (r *RegionScatterer) selectNewPeer(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer {
435422
stores := r.cluster.GetStores()
436-
candidates := make([]uint64, 0)
437423
maxStoreTotalCount := uint64(0)
438424
minStoreTotalCount := uint64(math.MaxUint64)
439425
for _, store := range stores {
440-
count := context.selectedPeer.TotalCountByStore(store.GetID())
426+
count := context.selectedPeer.Get(store.GetID(), group)
441427
if count > maxStoreTotalCount {
442428
maxStoreTotalCount = count
443429
}
444430
if count < minStoreTotalCount {
445431
minStoreTotalCount = count
446432
}
447433
}
434+
435+
var newPeer *metapb.Peer
436+
minCount := uint64(math.MaxUint64)
437+
originStorePickedCount := uint64(math.MaxUint64)
448438
for _, store := range stores {
449-
storeCount := context.selectedPeer.TotalCountByStore(store.GetID())
439+
storeCount := context.selectedPeer.Get(store.GetID(), group)
440+
if store.GetID() == peer.GetId() {
441+
originStorePickedCount = storeCount
442+
}
450443
// If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate.
451444
// If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store
452445
// could be selected as candidate.
453446
if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount {
454447
if filter.Target(r.cluster.GetOpts(), store, filters) {
455-
candidates = append(candidates, store.GetID())
448+
if storeCount < minCount {
449+
minCount = storeCount
450+
newPeer = &metapb.Peer{
451+
StoreId: store.GetID(),
452+
Role: peer.GetRole(),
453+
}
454+
}
456455
}
457456
}
458457
}
459-
return candidates
460-
}
461-
462-
func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceStoreID uint64, candidates []uint64, context engineContext) *metapb.Peer {
463-
if len(candidates) < 1 {
458+
if originStorePickedCount <= minCount {
464459
return peer
465460
}
466-
var newPeer *metapb.Peer
467-
minCount := uint64(math.MaxUint64)
468-
for _, storeID := range candidates {
469-
count := context.selectedPeer.Get(storeID, group)
470-
if count < minCount {
471-
minCount = count
472-
newPeer = &metapb.Peer{
473-
StoreId: storeID,
474-
Role: peer.GetRole(),
475-
}
476-
}
477-
}
478-
// if the source store have the least count, we don't need to scatter this peer
479-
for _, storeID := range candidates {
480-
if storeID == sourceStoreID && context.selectedPeer.Get(sourceStoreID, group) <= minCount {
481-
return peer
482-
}
483-
}
484461
if newPeer == nil {
485462
return peer
486463
}
@@ -489,11 +466,12 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto
489466

490467
// selectAvailableLeaderStore select the target leader store from the candidates. The candidates would be collected by
491468
// the existing peers' stores depending on the leader counts at the group level. Please use this func before scattering special engines.
492-
func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, leaderCandidateStores []uint64, context engineContext) uint64 {
469+
func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo,
470+
leaderCandidateStores []uint64, context engineContext) (leaderID uint64, leaderStorePickedCount uint64) {
493471
sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId())
494472
if sourceStore == nil {
495473
log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
496-
return 0
474+
return 0, 0
497475
}
498476
minStoreGroupLeader := uint64(math.MaxUint64)
499477
id := uint64(0)
@@ -508,7 +486,7 @@ func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.
508486
id = storeID
509487
}
510488
}
511-
return id
489+
return id, minStoreGroupLeader
512490
}
513491

514492
// Put put the final distribution in the context no matter the operator was created

server/schedule/region_scatterer_test.go

Lines changed: 36 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"context"
1919
"fmt"
2020
"math"
21-
"math/rand"
2221
"strconv"
2322
"sync"
2423
"testing"
@@ -532,48 +531,11 @@ func TestSelectedStoreGC(t *testing.T) {
532531
re.False(ok)
533532
}
534533

535-
// TestRegionFromDifferentGroups test the multi regions. each region have its own group.
536-
// After scatter, the distribution for the whole cluster should be well.
537-
func TestRegionFromDifferentGroups(t *testing.T) {
538-
re := require.New(t)
539-
ctx, cancel := context.WithCancel(context.Background())
540-
defer cancel()
541-
opt := config.NewTestOptions()
542-
tc := mockcluster.NewCluster(ctx, opt)
543-
stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
544-
oc := NewOperatorController(ctx, tc, stream)
545-
// Add 6 stores.
546-
storeCount := 6
547-
for i := uint64(1); i <= uint64(storeCount); i++ {
548-
tc.AddRegionStore(i, 0)
549-
}
550-
scatterer := NewRegionScatterer(ctx, tc, oc)
551-
regionCount := 50
552-
for i := 1; i <= regionCount; i++ {
553-
p := rand.Perm(storeCount)
554-
scatterer.scatterRegion(tc.AddLeaderRegion(uint64(i), uint64(p[0])+1, uint64(p[1])+1, uint64(p[2])+1), fmt.Sprintf("t%d", i))
555-
}
556-
check := func(ss *selectedStores) {
557-
max := uint64(0)
558-
min := uint64(math.MaxUint64)
559-
for i := uint64(1); i <= uint64(storeCount); i++ {
560-
count := ss.TotalCountByStore(i)
561-
if count > max {
562-
max = count
563-
}
564-
if count < min {
565-
min = count
566-
}
567-
}
568-
re.LessOrEqual(max-min, uint64(2))
569-
}
570-
check(scatterer.ordinaryEngine.selectedPeer)
571-
}
572-
573534
func TestRegionHasLearner(t *testing.T) {
574535
re := require.New(t)
575536
ctx, cancel := context.WithCancel(context.Background())
576537
defer cancel()
538+
group := "group"
577539
opt := config.NewTestOptions()
578540
tc := mockcluster.NewCluster(ctx, opt)
579541
stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
@@ -616,14 +578,14 @@ func TestRegionHasLearner(t *testing.T) {
616578
scatterer := NewRegionScatterer(ctx, tc, oc)
617579
regionCount := 50
618580
for i := 1; i <= regionCount; i++ {
619-
_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), "group")
581+
_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), group)
620582
re.NoError(err)
621583
}
622584
check := func(ss *selectedStores) {
623585
max := uint64(0)
624586
min := uint64(math.MaxUint64)
625587
for i := uint64(1); i <= max; i++ {
626-
count := ss.TotalCountByStore(i)
588+
count := ss.Get(i, group)
627589
if count > max {
628590
max = count
629591
}
@@ -638,7 +600,7 @@ func TestRegionHasLearner(t *testing.T) {
638600
max := uint64(0)
639601
min := uint64(math.MaxUint64)
640602
for i := uint64(1); i <= voterCount; i++ {
641-
count := ss.TotalCountByStore(i)
603+
count := ss.Get(i, group)
642604
if count > max {
643605
max = count
644606
}
@@ -649,7 +611,7 @@ func TestRegionHasLearner(t *testing.T) {
649611
re.LessOrEqual(max-2, uint64(regionCount)/voterCount)
650612
re.LessOrEqual(min-1, uint64(regionCount)/voterCount)
651613
for i := voterCount + 1; i <= storeCount; i++ {
652-
count := ss.TotalCountByStore(i)
614+
count := ss.Get(i, group)
653615
re.LessOrEqual(count, uint64(0))
654616
}
655617
}
@@ -690,6 +652,9 @@ func TestSelectedStoresTooFewPeers(t *testing.T) {
690652
region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2)
691653
op := scatterer.scatterRegion(region, group)
692654
re.False(isPeerCountChanged(op))
655+
if op != nil {
656+
re.Equal(group, op.AdditionalInfos["group"])
657+
}
693658
}
694659
}
695660

@@ -730,6 +695,34 @@ func TestSelectedStoresTooManyPeers(t *testing.T) {
730695
}
731696
}
732697

698+
// TestBalanceLeader only tests whether region leaders are balanced after scatter.
699+
func TestBalanceLeader(t *testing.T) {
700+
re := require.New(t)
701+
ctx, cancel := context.WithCancel(context.Background())
702+
defer cancel()
703+
opt := config.NewTestOptions()
704+
tc := mockcluster.NewCluster(ctx, opt)
705+
stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
706+
oc := NewOperatorController(ctx, tc, stream)
707+
// Add 3 stores
708+
for i := uint64(2); i <= 4; i++ {
709+
tc.AddLabelsStore(i, 0, nil)
710+
// prevent store from being disconnected
711+
tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute)
712+
}
713+
group := "group"
714+
scatterer := NewRegionScatterer(ctx, tc, oc)
715+
for i := uint64(1001); i <= 1300; i++ {
716+
region := tc.AddLeaderRegion(i, 2, 3, 4)
717+
op := scatterer.scatterRegion(region, group)
718+
re.False(isPeerCountChanged(op))
719+
}
720+
// all leader will be balanced in three stores.
721+
for i := uint64(2); i <= 4; i++ {
722+
re.Equal(uint64(100), scatterer.ordinaryEngine.selectedLeader.Get(i, group))
723+
}
724+
}
725+
733726
// TestBalanceRegion tests whether region peers and leaders are balanced after scatter.
734727
// ref https://github.com/tikv/pd/issues/6017
735728
func TestBalanceRegion(t *testing.T) {

0 commit comments

Comments
 (0)