Skip to content

Commit b996066

Browse files
OliverS929 authored and ti-chi-bot committed
This is an automated cherry-pick of pingcap#66312
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
1 parent 5f3cbfe commit b996066

File tree

2 files changed

+153
-7
lines changed

2 files changed

+153
-7
lines changed

pkg/lightning/backend/local/localhelper.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,21 @@ import (
3535
"golang.org/x/time/rate"
3636
)
3737

38+
<<<<<<< HEAD
39+
=======
40+
// 64 is chosen based on nextgen import shape: subtask size is ~100 GiB and each region is ~1 GiB,
41+
// so split key count is often around 100. A threshold of 100 may miss coarse split/scatter on boundary
42+
// cases, while 64 triggers early load spreading and still avoids this stage for smaller tasks.
43+
const coarseGrainedSplitKeysThreshold = 64
44+
45+
var (
46+
47+
// the base exponential backoff time
48+
// the variable is only changed in unit test for running test faster.
49+
splitRegionBaseBackOffTime = time.Second
50+
)
51+
52+
>>>>>>> 4ffecda589a (lightning: restore two-level split/scatter while keeping limiter behavior (#66312))
3853
// splitAndScatterRegionInBatches splits&scatter regions in batches.
3954
// Too many split&scatter requests may put a lot of pressure on TiKV and PD.
4055
func (local *Backend) splitAndScatterRegionInBatches(
@@ -50,6 +65,36 @@ func (local *Backend) splitAndScatterRegionInBatches(
5065
limiter = rate.NewLimiter(rate.Limit(eventLimit), burstPerSec*ratePerSecMultiplier)
5166
batchCnt = min(batchCnt, burstPerSec)
5267
}
68+
if len(splitKeys) > coarseGrainedSplitKeysThreshold {
69+
// Split and scatter a coarse-grained set of keys first to spread regions
70+
// before the fine-grained split stage.
71+
coarseGrainedSplitKeys := getCoarseGrainedSplitKeys(splitKeys)
72+
if err := local.splitAndScatterRegionInBatchesWithLimiter(ctx, coarseGrainedSplitKeys, batchCnt, limiter); err != nil {
73+
return errors.Trace(err)
74+
}
75+
}
76+
return local.splitAndScatterRegionInBatchesWithLimiter(ctx, splitKeys, batchCnt, limiter)
77+
}
78+
79+
// getCoarseGrainedSplitKeys samples roughly sqrt(len(splitKeys)) evenly
// spaced keys from splitKeys so that a first, coarse split/scatter pass can
// spread regions before the fine-grained pass splits on every key. The final
// key is always included so the coarse pass covers the full key range.
// Returns nil for empty input.
func getCoarseGrainedSplitKeys(splitKeys [][]byte) [][]byte {
	// Guard empty input: with len == 0 the sqrt stride would be 0, the loop
	// below would be skipped, and splitKeys[len(splitKeys)-1] would index -1
	// and panic. (Callers currently invoke this only above a size threshold,
	// but the helper should be safe on its own.)
	if len(splitKeys) == 0 {
		return nil
	}
	// Stride by sqrt(n): picks about sqrt(n) keys, one per stride.
	sqrtCnt := int(math.Sqrt(float64(len(splitKeys))))
	coarseGrainedSplitKeys := make([][]byte, 0, sqrtCnt+1)
	i := 0
	for ; i < len(splitKeys); i += sqrtCnt {
		coarseGrainedSplitKeys = append(coarseGrainedSplitKeys, splitKeys[i])
	}
	// Append the last key exactly once: skip it when the loop's final sampled
	// index (i - sqrtCnt) already was the last element.
	if i-sqrtCnt != len(splitKeys)-1 {
		coarseGrainedSplitKeys = append(coarseGrainedSplitKeys, splitKeys[len(splitKeys)-1])
	}
	return coarseGrainedSplitKeys
}
91+
92+
func (local *Backend) splitAndScatterRegionInBatchesWithLimiter(
93+
ctx context.Context,
94+
splitKeys [][]byte,
95+
batchCnt int,
96+
limiter *rate.Limiter,
97+
) error {
5398
for i := 0; i < len(splitKeys); i += batchCnt {
5499
batch := splitKeys[i:]
55100
if len(batch) > batchCnt {

pkg/lightning/backend/local/localhelper_test.go

Lines changed: 108 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,14 @@ import (
3636

3737
type testSplitClient struct {
3838
split.SplitClient
39-
mu sync.RWMutex
40-
stores map[uint64]*metapb.Store
41-
regions map[uint64]*split.RegionInfo
42-
regionsInfo *pdtypes.RegionTree // For now it's only used in ScanRegions
43-
nextRegionID uint64
44-
splitCount atomic.Int32
45-
hook clientHook
39+
mu sync.RWMutex
40+
stores map[uint64]*metapb.Store
41+
regions map[uint64]*split.RegionInfo
42+
regionsInfo *pdtypes.RegionTree // For now it's only used in ScanRegions
43+
nextRegionID uint64
44+
splitCount atomic.Int32
45+
splitKeysAndScatterF func(context.Context, [][]byte, int32) ([]*split.RegionInfo, error)
46+
hook clientHook
4647
}
4748

4849
func newTestSplitClient(
@@ -95,6 +96,18 @@ func (c *testSplitClient) GetRegionByID(ctx context.Context, regionID uint64) (*
9596
return region, nil
9697
}
9798

99+
func (c *testSplitClient) SplitKeysAndScatter(ctx context.Context, splitKeys [][]byte) ([]*split.RegionInfo, error) {
100+
cnt := c.splitCount.Inc()
101+
if c.splitKeysAndScatterF != nil {
102+
return c.splitKeysAndScatterF(ctx, splitKeys, cnt)
103+
}
104+
return []*split.RegionInfo{
105+
{
106+
Region: &metapb.Region{Id: 1},
107+
},
108+
}, nil
109+
}
110+
98111
func (c *testSplitClient) SplitWaitAndScatter(ctx context.Context, region *split.RegionInfo, keys [][]byte) ([]*split.RegionInfo, error) {
99112
c.mu.Lock()
100113
defer c.mu.Unlock()
@@ -327,6 +340,7 @@ func TestStoreWriteLimiter(t *testing.T) {
327340
wg.Wait()
328341
}
329342

343+
<<<<<<< HEAD
330344
func TestTuneStoreWriteLimiter(t *testing.T) {
331345
limiter := newStoreWriteLimiter(100)
332346
testLimiter := func(ctx context.Context, maxT int) {
@@ -362,4 +376,91 @@ func TestTuneStoreWriteLimiter(t *testing.T) {
362376
ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second*2)
363377
defer cancel1()
364378
testLimiter(ctx1, 200)
379+
=======
380+
func TestSplitAndScatterRegionInBatchesTwoLevel(t *testing.T) {
381+
makeSplitKeys := func(n int) [][]byte {
382+
keys := make([][]byte, n)
383+
for i := 0; i < n; i++ {
384+
keys[i] = []byte{byte(i >> 8), byte(i)}
385+
}
386+
return keys
387+
}
388+
389+
t.Run("large split keys trigger coarse and fine layers", func(t *testing.T) {
390+
splitCli := &testSplitClient{}
391+
local := &Backend{splitCli: splitCli}
392+
393+
err := local.splitAndScatterRegionInBatches(context.Background(), makeSplitKeys(121), 50, 0)
394+
require.NoError(t, err)
395+
// 121 keys => coarse pass(11 keys, 1 batch) + fine pass(121 keys, 3 batches) = 4 calls.
396+
require.Equal(t, int32(4), splitCli.splitCount.Load())
397+
})
398+
399+
t.Run("small split keys only use fine layer", func(t *testing.T) {
400+
splitCli := &testSplitClient{}
401+
local := &Backend{splitCli: splitCli}
402+
403+
err := local.splitAndScatterRegionInBatches(context.Background(), makeSplitKeys(coarseGrainedSplitKeysThreshold), 50, 0)
404+
require.NoError(t, err)
405+
require.Equal(t, int32(2), splitCli.splitCount.Load())
406+
})
407+
408+
t.Run("coarse layer error returns immediately", func(t *testing.T) {
409+
splitCli := &testSplitClient{
410+
splitKeysAndScatterF: func(_ context.Context, _ [][]byte, splitCnt int32) ([]*split.RegionInfo, error) {
411+
if splitCnt == 1 {
412+
return nil, errors.New("mock split error")
413+
}
414+
return []*split.RegionInfo{
415+
{
416+
Region: &metapb.Region{Id: 1},
417+
},
418+
}, nil
419+
},
420+
}
421+
local := &Backend{splitCli: splitCli}
422+
423+
err := local.splitAndScatterRegionInBatches(context.Background(), makeSplitKeys(121), 50, 0)
424+
require.ErrorContains(t, err, "mock split error")
425+
require.Equal(t, int32(1), splitCli.splitCount.Load())
426+
})
427+
428+
t.Run("limiter is still enforced after restoring two levels", func(t *testing.T) {
429+
splitCli := &testSplitClient{}
430+
local := &Backend{splitCli: splitCli}
431+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
432+
defer cancel()
433+
434+
// maxCntPerSec=0.5 => burstPerSec=1, so after first batch, limiter blocks
435+
// and should hit context deadline before entering fine-grained stage.
436+
err := local.splitAndScatterRegionInBatches(ctx, makeSplitKeys(121), 50, 0.5)
437+
require.ErrorContains(t, err, "context deadline")
438+
require.Equal(t, int32(1), splitCli.splitCount.Load())
439+
})
440+
}
441+
442+
func TestGetCoarseGrainedSplitKeys(t *testing.T) {
443+
makeSplitKeys := func(n int) [][]byte {
444+
keys := make([][]byte, n)
445+
for i := 0; i < n; i++ {
446+
keys[i] = []byte{byte(i >> 8), byte(i)}
447+
}
448+
return keys
449+
}
450+
451+
t.Run("last key selected in loop is not appended twice", func(t *testing.T) {
452+
splitKeys := makeSplitKeys(122)
453+
coarseGrainedSplitKeys := getCoarseGrainedSplitKeys(splitKeys)
454+
lastKey := splitKeys[len(splitKeys)-1]
455+
456+
lastKeyCount := 0
457+
for _, key := range coarseGrainedSplitKeys {
458+
if bytes.Equal(key, lastKey) {
459+
lastKeyCount++
460+
}
461+
}
462+
463+
require.Equal(t, 1, lastKeyCount)
464+
})
465+
>>>>>>> 4ffecda589a (lightning: restore two-level split/scatter while keeping limiter behavior (#66312))
365466
}

0 commit comments

Comments (0)