Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ import (
"golang.org/x/time/rate"
)

// coarseGrainedSplitKeysThreshold is the minimum split-key count that triggers a
// coarse-grained pre-split pass before the fine-grained split stage.
// 64 is chosen based on nextgen import shape: subtask size is ~100 GiB and each region is ~1 GiB,
// so split key count is often around 100. A threshold of 100 may miss coarse split/scatter on boundary
// cases, while 64 triggers early load spreading and still avoids this stage for smaller tasks.
const coarseGrainedSplitKeysThreshold = 64

// splitAndScatterRegionInBatches splits and scatters regions in batches.
// Too many split-and-scatter requests may put a lot of pressure on TiKV and PD.
func (local *Backend) splitAndScatterRegionInBatches(
Expand All @@ -50,6 +55,36 @@ func (local *Backend) splitAndScatterRegionInBatches(
limiter = rate.NewLimiter(rate.Limit(eventLimit), burstPerSec*ratePerSecMultiplier)
batchCnt = min(batchCnt, burstPerSec)
}
if len(splitKeys) > coarseGrainedSplitKeysThreshold {
// Split and scatter a coarse-grained set of keys first to spread regions
// before the fine-grained split stage.
coarseGrainedSplitKeys := getCoarseGrainedSplitKeys(splitKeys)
if err := local.splitAndScatterRegionInBatchesWithLimiter(ctx, coarseGrainedSplitKeys, batchCnt, limiter); err != nil {
return errors.Trace(err)
}
}
return local.splitAndScatterRegionInBatchesWithLimiter(ctx, splitKeys, batchCnt, limiter)
}

// getCoarseGrainedSplitKeys samples roughly sqrt(len(splitKeys)) evenly spaced
// keys from splitKeys (which the caller is expected to have sorted), always
// including the first key and, if not already sampled, the last key. The
// result is used for a coarse pre-split pass that spreads load before the
// fine-grained split stage.
func getCoarseGrainedSplitKeys(splitKeys [][]byte) [][]byte {
	// Guard against empty input. Without it sqrtCnt would be 0, the loop
	// would be a no-op, and the tail check below would unconditionally
	// index splitKeys[-1] and panic.
	if len(splitKeys) == 0 {
		return nil
	}
	sqrtCnt := int(math.Sqrt(float64(len(splitKeys))))
	// len/sqrtCnt+2 covers every sampled key plus the optional trailing key,
	// so the appends below never reallocate.
	coarseGrainedSplitKeys := make([][]byte, 0, len(splitKeys)/sqrtCnt+2)
	i := 0
	for ; i < len(splitKeys); i += sqrtCnt {
		coarseGrainedSplitKeys = append(coarseGrainedSplitKeys, splitKeys[i])
	}
	// The last loop iteration picked index i-sqrtCnt; only append the final
	// key if it was not already the last sampled one.
	if i-sqrtCnt != len(splitKeys)-1 {
		coarseGrainedSplitKeys = append(coarseGrainedSplitKeys, splitKeys[len(splitKeys)-1])
	}
	return coarseGrainedSplitKeys
}

func (local *Backend) splitAndScatterRegionInBatchesWithLimiter(
ctx context.Context,
splitKeys [][]byte,
batchCnt int,
limiter *rate.Limiter,
) error {
for i := 0; i < len(splitKeys); i += batchCnt {
batch := splitKeys[i:]
if len(batch) > batchCnt {
Expand Down
114 changes: 107 additions & 7 deletions pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ import (

// testSplitClient is a stub split.SplitClient used by the local-backend tests
// to record split calls and inject per-call behavior.
//
// NOTE(review): the diff rendering left both the pre-change and post-change
// field lists in place, declaring every field twice; this resolves to the
// post-change struct (with the splitKeysAndScatterF hook).
type testSplitClient struct {
	split.SplitClient
	mu          sync.RWMutex
	stores      map[uint64]*metapb.Store
	regions     map[uint64]*split.RegionInfo
	regionsInfo *pdtypes.RegionTree // For now it's only used in ScanRegions
	nextRegionID uint64
	// splitCount counts SplitKeysAndScatter/SplitWaitAndScatter invocations.
	splitCount atomic.Int32
	// splitKeysAndScatterF, when non-nil, overrides SplitKeysAndScatter;
	// it receives the 1-based call count as its third argument.
	splitKeysAndScatterF func(context.Context, [][]byte, int32) ([]*split.RegionInfo, error)
	hook                 clientHook
}

func newTestSplitClient(
Expand Down Expand Up @@ -95,6 +96,18 @@ func (c *testSplitClient) GetRegionByID(ctx context.Context, regionID uint64) (*
return region, nil
}

// SplitKeysAndScatter records the invocation and either delegates to the
// injected splitKeysAndScatterF hook (passing the 1-based call count) or
// returns a single placeholder region.
func (c *testSplitClient) SplitKeysAndScatter(ctx context.Context, splitKeys [][]byte) ([]*split.RegionInfo, error) {
	callNo := c.splitCount.Inc()
	if f := c.splitKeysAndScatterF; f != nil {
		return f(ctx, splitKeys, callNo)
	}
	placeholder := &split.RegionInfo{Region: &metapb.Region{Id: 1}}
	return []*split.RegionInfo{placeholder}, nil
}

func (c *testSplitClient) SplitWaitAndScatter(ctx context.Context, region *split.RegionInfo, keys [][]byte) ([]*split.RegionInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -363,3 +376,90 @@ func TestTuneStoreWriteLimiter(t *testing.T) {
defer cancel1()
testLimiter(ctx1, 200)
}

func TestSplitAndScatterRegionInBatchesTwoLevel(t *testing.T) {
	genKeys := func(total int) [][]byte {
		out := make([][]byte, 0, total)
		for v := 0; v < total; v++ {
			out = append(out, []byte{byte(v >> 8), byte(v)})
		}
		return out
	}

	t.Run("large split keys trigger coarse and fine layers", func(t *testing.T) {
		cli := &testSplitClient{}
		backend := &Backend{splitCli: cli}

		err := backend.splitAndScatterRegionInBatches(context.Background(), genKeys(121), 50, 0)
		require.NoError(t, err)
		// 121 keys => coarse pass(11 keys, 1 batch) + fine pass(121 keys, 3 batches) = 4 calls.
		require.Equal(t, int32(4), cli.splitCount.Load())
	})

	t.Run("small split keys only use fine layer", func(t *testing.T) {
		cli := &testSplitClient{}
		backend := &Backend{splitCli: cli}

		// Exactly at the threshold the coarse layer is skipped; 64 keys in
		// batches of 50 means two fine-grained calls.
		err := backend.splitAndScatterRegionInBatches(context.Background(), genKeys(coarseGrainedSplitKeysThreshold), 50, 0)
		require.NoError(t, err)
		require.Equal(t, int32(2), cli.splitCount.Load())
	})

	t.Run("coarse layer error returns immediately", func(t *testing.T) {
		cli := &testSplitClient{
			splitKeysAndScatterF: func(_ context.Context, _ [][]byte, splitCnt int32) ([]*split.RegionInfo, error) {
				// Fail the very first (coarse) call; later calls would succeed.
				if splitCnt == 1 {
					return nil, errors.New("mock split error")
				}
				return []*split.RegionInfo{
					{
						Region: &metapb.Region{Id: 1},
					},
				}, nil
			},
		}
		backend := &Backend{splitCli: cli}

		err := backend.splitAndScatterRegionInBatches(context.Background(), genKeys(121), 50, 0)
		require.ErrorContains(t, err, "mock split error")
		// The fine-grained stage must never run after a coarse failure.
		require.Equal(t, int32(1), cli.splitCount.Load())
	})

	t.Run("limiter is still enforced after restoring two levels", func(t *testing.T) {
		cli := &testSplitClient{}
		backend := &Backend{splitCli: cli}
		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
		defer cancel()

		// maxCntPerSec=0.5 => burstPerSec=1, so after first batch, limiter blocks
		// and should hit context deadline before entering fine-grained stage.
		err := backend.splitAndScatterRegionInBatches(ctx, genKeys(121), 50, 0.5)
		require.ErrorContains(t, err, "context deadline")
		require.Equal(t, int32(1), cli.splitCount.Load())
	})
}

func TestGetCoarseGrainedSplitKeys(t *testing.T) {
	genKeys := func(total int) [][]byte {
		out := make([][]byte, 0, total)
		for v := 0; v < total; v++ {
			out = append(out, []byte{byte(v >> 8), byte(v)})
		}
		return out
	}

	t.Run("last key selected in loop is not appended twice", func(t *testing.T) {
		// With 122 keys the sampling loop lands exactly on the final index,
		// so the trailing append must be skipped.
		all := genKeys(122)
		coarse := getCoarseGrainedSplitKeys(all)
		tail := all[len(all)-1]

		occurrences := 0
		for _, key := range coarse {
			if bytes.Equal(key, tail) {
				occurrences++
			}
		}

		require.Equal(t, 1, occurrences)
	})
}