Skip to content

Commit b06e23e

Browse files
ti-srebot and Yisaer authored
schedule: revise region_scatter distribution for multi groups (#3422) (#3620)
* cherry pick #3422 to release-4.0
  Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
* fix conflict
  Signed-off-by: yisaer <disxiaofei@163.com>
* remove test
  Signed-off-by: yisaer <disxiaofei@163.com>
* remove test
  Signed-off-by: yisaer <disxiaofei@163.com>

Co-authored-by: Song Gao <disxiaofei@163.com>
1 parent 6b1f8ab commit b06e23e

File tree

3 files changed

+108
-35
lines changed

3 files changed

+108
-35
lines changed

pkg/cache/ttl.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,3 +256,16 @@ func (c *TTLString) Pop() (string, interface{}, bool) {
256256
func (c *TTLString) Get(id string) (interface{}, bool) {
257257
return c.ttlCache.get(id)
258258
}
259+
260+
// GetAllID returns all key ids
261+
func (c *TTLString) GetAllID() []string {
262+
keys := c.ttlCache.getKeys()
263+
var ids []string
264+
for _, key := range keys {
265+
id, ok := key.(string)
266+
if ok {
267+
ids = append(ids, id)
268+
}
269+
}
270+
return ids
271+
}

server/schedule/region_scatterer.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,23 @@ func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64]
9393
return nil, false
9494
}
9595

96+
func (s *selectedStores) totalCountByStore(storeID uint64) uint64 {
97+
groups := s.groupDistribution.GetAllID()
98+
totalCount := uint64(0)
99+
for _, group := range groups {
100+
storeDistribution, ok := s.getDistributionByGroupLocked(group)
101+
if !ok {
102+
continue
103+
}
104+
count, ok := storeDistribution[storeID]
105+
if !ok {
106+
continue
107+
}
108+
totalCount += count
109+
}
110+
return totalCount
111+
}
112+
96113
// RegionScatterer scatters regions.
97114
type RegionScatterer struct {
98115
ctx context.Context
@@ -336,9 +353,26 @@ func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, sourceStoreI
336353
filters = append(filters, scoreGuard)
337354
stores := r.cluster.GetStores()
338355
candidates := make([]uint64, 0)
356+
maxStoreTotalCount := uint64(0)
357+
minStoreTotalCount := uint64(math.MaxUint64)
358+
for _, store := range r.cluster.GetStores() {
359+
count := context.selectedPeer.totalCountByStore(store.GetID())
360+
if count > maxStoreTotalCount {
361+
maxStoreTotalCount = count
362+
}
363+
if count < minStoreTotalCount {
364+
minStoreTotalCount = count
365+
}
366+
}
339367
for _, store := range stores {
340-
if filter.Target(r.cluster, store, filters) && !store.IsBusy() {
341-
candidates = append(candidates, store.GetID())
368+
storeCount := context.selectedPeer.totalCountByStore(store.GetID())
369+
// If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate.
370+
// If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store
371+
// could be selected as candidate.
372+
if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount {
373+
if filter.Target(r.cluster, store, filters) && !store.IsBusy() {
374+
candidates = append(candidates, store.GetID())
375+
}
342376
}
343377
}
344378
return candidates

server/schedule/region_scatterer_test.go

Lines changed: 59 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"math"
7+
"math/rand"
78
"time"
89

910
. "github.com/pingcap/check"
@@ -14,7 +15,6 @@ import (
1415
"github.com/tikv/pd/server/core"
1516
"github.com/tikv/pd/server/schedule/operator"
1617
"github.com/tikv/pd/server/schedule/placement"
17-
"github.com/tikv/pd/server/schedule/storelimit"
1818
)
1919

2020
type sequencer struct {
@@ -86,8 +86,6 @@ func (s *testScatterRegionSuite) checkOperator(op *operator.Operator, c *C) {
8686

8787
func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64, useRules bool) {
8888
opt := mockoption.NewScheduleOptions()
89-
c.Assert(opt.SetAllStoresLimit(storelimit.AddPeer, 99999), IsNil)
90-
c.Assert(opt.SetAllStoresLimit(storelimit.RemovePeer, 99999), IsNil)
9189
tc := mockcluster.NewCluster(opt)
9290

9391
// Add ordinary stores.
@@ -128,8 +126,6 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64, use
128126

129127
func (s *testScatterRegionSuite) scatterSpecial(c *C, numOrdinaryStores, numSpecialStores, numRegions uint64) {
130128
opt := mockoption.NewScheduleOptions()
131-
c.Assert(opt.SetAllStoresLimit(storelimit.AddPeer, 99999), IsNil)
132-
c.Assert(opt.SetAllStoresLimit(storelimit.RemovePeer, 99999), IsNil)
133129
tc := mockcluster.NewCluster(opt)
134130

135131
// Add ordinary stores.
@@ -217,10 +213,8 @@ func (s *testScatterRegionSuite) TestStoreLimit(c *C) {
217213
}
218214
}
219215

220-
func (s *testScatterRegionSuite) TestScatterGroup(c *C) {
216+
func (s *testScatterRegionSuite) TestScatterGroupInConcurrency(c *C) {
221217
opt := mockoption.NewScheduleOptions()
222-
c.Assert(opt.SetAllStoresLimit(storelimit.AddPeer, 99999), IsNil)
223-
c.Assert(opt.SetAllStoresLimit(storelimit.RemovePeer, 99999), IsNil)
224218
tc := mockcluster.NewCluster(opt)
225219
// Add 5 stores.
226220
for i := uint64(1); i <= 5; i++ {
@@ -245,52 +239,49 @@ func (s *testScatterRegionSuite) TestScatterGroup(c *C) {
245239
},
246240
}
247241

242+
// We send scatter interweave request for each group to simulate scattering multiple region groups in concurrency.
248243
for _, testcase := range testcases {
249244
c.Logf(testcase.name)
250245
ctx, cancel := context.WithCancel(context.Background())
251246
scatterer := NewRegionScatterer(ctx, tc)
252247
regionID := 1
253248
for i := 0; i < 100; i++ {
254249
for j := 0; j < testcase.groupCount; j++ {
255-
_, err := scatterer.Scatter(tc.AddLeaderRegion(uint64(regionID), 1, 2, 3),
250+
scatterer.scatterRegion(tc.AddLeaderRegion(uint64(regionID), 1, 2, 3),
256251
fmt.Sprintf("group-%v", j))
257-
c.Assert(err, IsNil)
258252
regionID++
259253
}
260-
// insert region with no group
261-
_, err := scatterer.Scatter(tc.AddLeaderRegion(uint64(regionID), 1, 2, 3), "")
262-
c.Assert(err, IsNil)
263-
regionID++
264254
}
265255

266-
for i := 0; i < testcase.groupCount; i++ {
267-
// comparing the leader distribution
268-
group := fmt.Sprintf("group-%v", i)
269-
max := uint64(0)
270-
min := uint64(math.MaxUint64)
271-
groupDistribution, exist := scatterer.ordinaryEngine.selectedLeader.GetGroupDistribution(group)
272-
c.Assert(exist, Equals, true)
273-
for _, count := range groupDistribution {
274-
if count > max {
275-
max = count
276-
}
277-
if count < min {
278-
min = count
256+
checker := func(ss *selectedStores, expected uint64, delta float64) {
257+
for i := 0; i < testcase.groupCount; i++ {
258+
// comparing the leader distribution
259+
group := fmt.Sprintf("group-%v", i)
260+
max := uint64(0)
261+
min := uint64(math.MaxUint64)
262+
groupDistribution, _ := ss.groupDistribution.Get(group)
263+
for _, count := range groupDistribution.(map[uint64]uint64) {
264+
if count > max {
265+
max = count
266+
}
267+
if count < min {
268+
min = count
269+
}
279270
}
271+
c.Assert(math.Abs(float64(max)-float64(expected)), LessEqual, delta)
272+
c.Assert(math.Abs(float64(min)-float64(expected)), LessEqual, delta)
280273
}
281-
// 100 regions divided 5 stores, each store expected to have about 20 regions.
282-
c.Assert(min, LessEqual, uint64(20))
283-
c.Assert(max, GreaterEqual, uint64(20))
284-
c.Assert(max-min, LessEqual, uint64(5))
285274
}
275+
// For leader, we expect each store have about 20 leader for each group
276+
checker(scatterer.ordinaryEngine.selectedLeader, 20, 5)
277+
// For peer, we expect each store have about 50 peers for each group
278+
checker(scatterer.ordinaryEngine.selectedPeer, 50, 15)
286279
cancel()
287280
}
288281
}
289282

290283
func (s *testScatterRegionSuite) TestScattersGroup(c *C) {
291284
opt := mockoption.NewScheduleOptions()
292-
c.Assert(opt.SetAllStoresLimit(storelimit.AddPeer, 99999), IsNil)
293-
c.Assert(opt.SetAllStoresLimit(storelimit.RemovePeer, 99999), IsNil)
294285
tc := mockcluster.NewCluster(opt)
295286
// Add 5 stores.
296287
for i := uint64(1); i <= 5; i++ {
@@ -370,3 +361,38 @@ func (s *testScatterRegionSuite) TestSelectedStoreGC(c *C) {
370361
_, ok = stores.GetGroupDistribution("testgroup")
371362
c.Assert(ok, Equals, false)
372363
}
364+
365+
// TestRegionFromDifferentGroups test the multi regions. each region have its own group.
366+
// After scatter, the distribution for the whole cluster should be well.
367+
func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) {
368+
opt := mockoption.NewScheduleOptions()
369+
tc := mockcluster.NewCluster(opt)
370+
// Add 6 stores.
371+
storeCount := 6
372+
for i := uint64(1); i <= uint64(storeCount); i++ {
373+
tc.AddRegionStore(i, 0)
374+
}
375+
ctx, cancel := context.WithCancel(context.Background())
376+
defer cancel()
377+
scatterer := NewRegionScatterer(ctx, tc)
378+
regionCount := 50
379+
for i := 1; i <= regionCount; i++ {
380+
p := rand.Perm(storeCount)
381+
scatterer.scatterRegion(tc.AddLeaderRegion(uint64(i), uint64(p[0])+1, uint64(p[1])+1, uint64(p[2])+1), fmt.Sprintf("t%d", i))
382+
}
383+
check := func(ss *selectedStores) {
384+
max := uint64(0)
385+
min := uint64(math.MaxUint64)
386+
for i := uint64(1); i <= uint64(storeCount); i++ {
387+
count := ss.totalCountByStore(i)
388+
if count > max {
389+
max = count
390+
}
391+
if count < min {
392+
min = count
393+
}
394+
}
395+
c.Assert(max-min, LessEqual, uint64(2))
396+
}
397+
check(scatterer.ordinaryEngine.selectedPeer)
398+
}

0 commit comments

Comments (0)