Skip to content

Commit 77183d5

Browse files
ti-srebot, Yisaer, and ti-chi-bot
authored
schedule: revise region_scatter distribution for multi groups (#3422) (#3621)
Signed-off-by: ti-srebot <ti-srebot@pingcap.com> Co-authored-by: Song Gao <disxiaofei@163.com> Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
1 parent 04ed1ff commit 77183d5

File tree

6 files changed

+123
-32
lines changed

6 files changed

+123
-32
lines changed

pkg/cache/ttl.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,3 +256,16 @@ func (c *TTLString) Pop() (string, interface{}, bool) {
256256
func (c *TTLString) Get(id string) (interface{}, bool) {
257257
return c.ttlCache.get(id)
258258
}
259+
260+
// GetAllID returns all key ids
261+
func (c *TTLString) GetAllID() []string {
262+
keys := c.ttlCache.getKeys()
263+
var ids []string
264+
for _, key := range keys {
265+
id, ok := key.(string)
266+
if ok {
267+
ids = append(ids, id)
268+
}
269+
}
270+
return ids
271+
}

server/api/region_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,10 @@ func (s *testRegionSuite) TestScatterRegions(c *C) {
294294
err := postJSON(testDialClient, fmt.Sprintf("%s/regions/scatter", s.urlPrefix), []byte(body))
295295
c.Assert(err, IsNil)
296296
op1 := s.svr.GetRaftCluster().GetOperatorController().GetOperator(601)
297-
c.Assert(op1 != nil, Equals, true)
298297
op2 := s.svr.GetRaftCluster().GetOperatorController().GetOperator(602)
299-
c.Assert(op2 != nil, Equals, true)
298+
op3 := s.svr.GetRaftCluster().GetOperatorController().GetOperator(603)
299+
// At least one operator used to scatter region
300+
c.Assert(op1 != nil || op2 != nil || op3 != nil, IsTrue)
300301
}
301302

302303
func (s *testRegionSuite) TestSplitRegions(c *C) {

server/schedule/operator/builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ func (b *Builder) EnableLightWeight() *Builder {
304304
return b
305305
}
306306

307-
// EnableForceTargetLeader marks the step of transferring leader to target is forcible. It is used for grant leader.
307+
// EnableForceTargetLeader marks the step of transferring leader to target is forcible.
308308
func (b *Builder) EnableForceTargetLeader() *Builder {
309309
b.forceTargetLeader = true
310310
return b

server/schedule/operator/create_operator.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ func CreateScatterRegionOperator(desc string, cluster opt.Cluster, origin *core.
195195
SetPeers(targetPeers).
196196
SetLeader(leader).
197197
EnableLightWeight().
198+
// EnableForceTargetLeader in order to ignore the leader schedule limit
199+
EnableForceTargetLeader().
198200
Build(0)
199201
}
200202

server/schedule/region_scatterer.go

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,23 @@ func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64]
9494
return nil, false
9595
}
9696

97+
func (s *selectedStores) totalCountByStore(storeID uint64) uint64 {
98+
groups := s.groupDistribution.GetAllID()
99+
totalCount := uint64(0)
100+
for _, group := range groups {
101+
storeDistribution, ok := s.getDistributionByGroupLocked(group)
102+
if !ok {
103+
continue
104+
}
105+
count, ok := storeDistribution[storeID]
106+
if !ok {
107+
continue
108+
}
109+
totalCount += count
110+
}
111+
return totalCount
112+
}
113+
97114
// RegionScatterer scatters regions.
98115
type RegionScatterer struct {
99116
ctx context.Context
@@ -328,9 +345,26 @@ func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, sourceStoreI
328345
filters = append(filters, scoreGuard)
329346
stores := r.cluster.GetStores()
330347
candidates := make([]uint64, 0)
348+
maxStoreTotalCount := uint64(0)
349+
minStoreTotalCount := uint64(math.MaxUint64)
350+
for _, store := range r.cluster.GetStores() {
351+
count := context.selectedPeer.totalCountByStore(store.GetID())
352+
if count > maxStoreTotalCount {
353+
maxStoreTotalCount = count
354+
}
355+
if count < minStoreTotalCount {
356+
minStoreTotalCount = count
357+
}
358+
}
331359
for _, store := range stores {
332-
if filter.Target(r.cluster.GetOpts(), store, filters) {
333-
candidates = append(candidates, store.GetID())
360+
storeCount := context.selectedPeer.totalCountByStore(store.GetID())
361+
// If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate.
362+
// If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store
363+
// could be selected as candidate.
364+
if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount {
365+
if filter.Target(r.cluster.GetOpts(), store, filters) {
366+
candidates = append(candidates, store.GetID())
367+
}
334368
}
335369
}
336370
return candidates
@@ -367,12 +401,17 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto
367401
// selectAvailableLeaderStores select the target leader store from the candidates. The candidates would be collected by
368402
// the existed peers store depended on the leader counts in the group level.
369403
func (r *RegionScatterer) selectAvailableLeaderStores(group string, peers map[uint64]*metapb.Peer, context engineContext) uint64 {
370-
minStoreGroupLeader := uint64(math.MaxUint64)
371-
id := uint64(0)
404+
leaderCandidateStores := make([]uint64, 0)
372405
for storeID := range peers {
373-
if id == 0 {
374-
id = storeID
406+
store := r.cluster.GetStore(storeID)
407+
engine := store.GetLabelValue(filter.EngineKey)
408+
if len(engine) < 1 {
409+
leaderCandidateStores = append(leaderCandidateStores, storeID)
375410
}
411+
}
412+
minStoreGroupLeader := uint64(math.MaxUint64)
413+
id := uint64(0)
414+
for _, storeID := range leaderCandidateStores {
376415
storeGroupLeaderCount := context.selectedLeader.Get(storeID, group)
377416
if minStoreGroupLeader > storeGroupLeaderCount {
378417
minStoreGroupLeader = storeGroupLeaderCount

server/schedule/region_scatterer_test.go

Lines changed: 59 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"math"
7+
"math/rand"
78
"time"
89

910
. "github.com/pingcap/check"
@@ -292,7 +293,7 @@ func (s *testScatterRegionSuite) TestScatterCheck(c *C) {
292293
}
293294
}
294295

295-
func (s *testScatterRegionSuite) TestScatterGroup(c *C) {
296+
func (s *testScatterRegionSuite) TestScatterGroupInConcurrency(c *C) {
296297
opt := config.NewTestOptions()
297298
tc := mockcluster.NewCluster(opt)
298299
// Add 5 stores.
@@ -318,43 +319,43 @@ func (s *testScatterRegionSuite) TestScatterGroup(c *C) {
318319
},
319320
}
320321

322+
// We send scatter interweave request for each group to simulate scattering multiple region groups in concurrency.
321323
for _, testcase := range testcases {
322324
c.Logf(testcase.name)
323325
ctx, cancel := context.WithCancel(context.Background())
324326
scatterer := NewRegionScatterer(ctx, tc)
325327
regionID := 1
326328
for i := 0; i < 100; i++ {
327329
for j := 0; j < testcase.groupCount; j++ {
328-
_, err := scatterer.Scatter(tc.AddLeaderRegion(uint64(regionID), 1, 2, 3),
330+
scatterer.scatterRegion(tc.AddLeaderRegion(uint64(regionID), 1, 2, 3),
329331
fmt.Sprintf("group-%v", j))
330-
c.Assert(err, IsNil)
331332
regionID++
332333
}
333-
// insert region with no group
334-
_, err := scatterer.Scatter(tc.AddLeaderRegion(uint64(regionID), 1, 2, 3), "")
335-
c.Assert(err, IsNil)
336-
regionID++
337334
}
338335

339-
for i := 0; i < testcase.groupCount; i++ {
340-
// comparing the leader distribution
341-
group := fmt.Sprintf("group-%v", i)
342-
max := uint64(0)
343-
min := uint64(math.MaxUint64)
344-
groupDistribution, _ := scatterer.ordinaryEngine.selectedLeader.groupDistribution.Get(group)
345-
for _, count := range groupDistribution.(map[uint64]uint64) {
346-
if count > max {
347-
max = count
348-
}
349-
if count < min {
350-
min = count
336+
checker := func(ss *selectedStores, expected uint64, delta float64) {
337+
for i := 0; i < testcase.groupCount; i++ {
338+
// comparing the leader distribution
339+
group := fmt.Sprintf("group-%v", i)
340+
max := uint64(0)
341+
min := uint64(math.MaxUint64)
342+
groupDistribution, _ := ss.groupDistribution.Get(group)
343+
for _, count := range groupDistribution.(map[uint64]uint64) {
344+
if count > max {
345+
max = count
346+
}
347+
if count < min {
348+
min = count
349+
}
351350
}
351+
c.Assert(math.Abs(float64(max)-float64(expected)), LessEqual, delta)
352+
c.Assert(math.Abs(float64(min)-float64(expected)), LessEqual, delta)
352353
}
353-
// 100 regions divided 5 stores, each store expected to have about 20 regions.
354-
c.Assert(min, LessEqual, uint64(20))
355-
c.Assert(max, GreaterEqual, uint64(20))
356-
c.Assert(max-min, LessEqual, uint64(5))
357354
}
355+
// For leader, we expect each store have about 20 leader for each group
356+
checker(scatterer.ordinaryEngine.selectedLeader, 20, 5)
357+
// For peer, we expect each store have about 50 peers for each group
358+
checker(scatterer.ordinaryEngine.selectedPeer, 50, 15)
358359
cancel()
359360
}
360361
}
@@ -440,3 +441,38 @@ func (s *testScatterRegionSuite) TestSelectedStoreGC(c *C) {
440441
_, ok = stores.GetGroupDistribution("testgroup")
441442
c.Assert(ok, Equals, false)
442443
}
444+
445+
// TestRegionFromDifferentGroups test the multi regions. each region have its own group.
446+
// After scatter, the distribution for the whole cluster should be well.
447+
func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) {
448+
opt := config.NewTestOptions()
449+
tc := mockcluster.NewCluster(opt)
450+
// Add 6 stores.
451+
storeCount := 6
452+
for i := uint64(1); i <= uint64(storeCount); i++ {
453+
tc.AddRegionStore(i, 0)
454+
}
455+
ctx, cancel := context.WithCancel(context.Background())
456+
defer cancel()
457+
scatterer := NewRegionScatterer(ctx, tc)
458+
regionCount := 50
459+
for i := 1; i <= regionCount; i++ {
460+
p := rand.Perm(storeCount)
461+
scatterer.scatterRegion(tc.AddLeaderRegion(uint64(i), uint64(p[0])+1, uint64(p[1])+1, uint64(p[2])+1), fmt.Sprintf("t%d", i))
462+
}
463+
check := func(ss *selectedStores) {
464+
max := uint64(0)
465+
min := uint64(math.MaxUint64)
466+
for i := uint64(1); i <= uint64(storeCount); i++ {
467+
count := ss.totalCountByStore(i)
468+
if count > max {
469+
max = count
470+
}
471+
if count < min {
472+
min = count
473+
}
474+
}
475+
c.Assert(max-min, LessEqual, uint64(2))
476+
}
477+
check(scatterer.ordinaryEngine.selectedPeer)
478+
}

0 commit comments

Comments
 (0)