Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@
"golang.org/x/time/rate"
)

<<<<<<< HEAD

Check failure on line 38 in pkg/lightning/backend/local/localhelper.go

View workflow job for this annotation

GitHub Actions / Bazel Crossbuild (ubuntu)

syntax error: non-declaration statement outside function body
=======
// coarseGrainedSplitKeysThreshold is the minimum number of split keys at which
// a coarse-grained split/scatter pass is run before the fine-grained pass.
// 64 is chosen based on nextgen import shape: subtask size is ~100 GiB and each region is ~1 GiB,
// so split key count is often around 100. A threshold of 100 may miss coarse split/scatter on boundary
// cases, while 64 triggers early load spreading and still avoids this stage for smaller tasks.
const coarseGrainedSplitKeysThreshold = 64

var (

// the base exponential backoff time
// the variable is only changed in unit test for running test faster.
splitRegionBaseBackOffTime = time.Second
)

>>>>>>> 4ffecda589a (lightning: restore two-level split/scatter while keeping limiter behavior (#66312))

Check failure on line 52 in pkg/lightning/backend/local/localhelper.go

View workflow job for this annotation

GitHub Actions / Bazel Crossbuild (ubuntu)

invalid character U+0023 '#'

Check failure on line 52 in pkg/lightning/backend/local/localhelper.go

View workflow job for this annotation

GitHub Actions / Bazel Crossbuild (ubuntu)

syntax error: non-declaration statement outside function body
// splitAndScatterRegionInBatches splits&scatter regions in batches.
// Too many split&scatter requests may put a lot of pressure on TiKV and PD.
func (local *Backend) splitAndScatterRegionInBatches(
Expand All @@ -50,6 +65,36 @@
limiter = rate.NewLimiter(rate.Limit(eventLimit), burstPerSec*ratePerSecMultiplier)
batchCnt = min(batchCnt, burstPerSec)
}
if len(splitKeys) > coarseGrainedSplitKeysThreshold {
// Split and scatter a coarse-grained set of keys first to spread regions
// before the fine-grained split stage.
coarseGrainedSplitKeys := getCoarseGrainedSplitKeys(splitKeys)
if err := local.splitAndScatterRegionInBatchesWithLimiter(ctx, coarseGrainedSplitKeys, batchCnt, limiter); err != nil {
return errors.Trace(err)
}
}
return local.splitAndScatterRegionInBatchesWithLimiter(ctx, splitKeys, batchCnt, limiter)
}

// getCoarseGrainedSplitKeys samples roughly sqrt(len(splitKeys)) keys from
// splitKeys: every sqrt(n)-th key starting from the first, plus the last key
// when the stride did not already land on it. The sampled keys are used as a
// coarse first-pass split to spread regions before the fine-grained split.
func getCoarseGrainedSplitKeys(splitKeys [][]byte) [][]byte {
	// Guard the empty input: without it, the tail check below would index
	// splitKeys[-1] and panic.
	if len(splitKeys) == 0 {
		return nil
	}
	sqrtCnt := int(math.Sqrt(float64(len(splitKeys))))
	coarseGrainedSplitKeys := make([][]byte, 0, sqrtCnt+1)
	i := 0
	for ; i < len(splitKeys); i += sqrtCnt {
		coarseGrainedSplitKeys = append(coarseGrainedSplitKeys, splitKeys[i])
	}
	// Always include the last split key, unless the stride already selected it
	// on the final loop iteration.
	if i-sqrtCnt != len(splitKeys)-1 {
		coarseGrainedSplitKeys = append(coarseGrainedSplitKeys, splitKeys[len(splitKeys)-1])
	}
	return coarseGrainedSplitKeys
}

func (local *Backend) splitAndScatterRegionInBatchesWithLimiter(
ctx context.Context,
splitKeys [][]byte,
batchCnt int,
limiter *rate.Limiter,
) error {
for i := 0; i < len(splitKeys); i += batchCnt {
batch := splitKeys[i:]
if len(batch) > batchCnt {
Expand Down
115 changes: 108 additions & 7 deletions pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ import (

// testSplitClient is a mock split.SplitClient used by the localhelper tests.
// It counts split calls and lets a test inject custom split/scatter behavior.
// NOTE(review): the pasted diff showed both the pre- and post-change field
// lists; this is the merged (post-change) set with the duplicate removed lines
// dropped.
type testSplitClient struct {
	split.SplitClient
	mu                   sync.RWMutex
	stores               map[uint64]*metapb.Store
	regions              map[uint64]*split.RegionInfo
	regionsInfo          *pdtypes.RegionTree // For now it's only used in ScanRegions
	nextRegionID         uint64
	splitCount           atomic.Int32
	// splitKeysAndScatterF, when non-nil, overrides SplitKeysAndScatter; it
	// receives the keys and the 1-based call count.
	splitKeysAndScatterF func(context.Context, [][]byte, int32) ([]*split.RegionInfo, error)
	hook                 clientHook
}

func newTestSplitClient(
Expand Down Expand Up @@ -95,6 +96,18 @@ func (c *testSplitClient) GetRegionByID(ctx context.Context, regionID uint64) (*
return region, nil
}

// SplitKeysAndScatter records the call in splitCount, then delegates to the
// injected splitKeysAndScatterF hook when one is set; otherwise it reports a
// single fake region.
func (c *testSplitClient) SplitKeysAndScatter(ctx context.Context, splitKeys [][]byte) ([]*split.RegionInfo, error) {
	callCnt := c.splitCount.Inc()
	if hook := c.splitKeysAndScatterF; hook != nil {
		return hook(ctx, splitKeys, callCnt)
	}
	fakeRegion := &split.RegionInfo{Region: &metapb.Region{Id: 1}}
	return []*split.RegionInfo{fakeRegion}, nil
}

func (c *testSplitClient) SplitWaitAndScatter(ctx context.Context, region *split.RegionInfo, keys [][]byte) ([]*split.RegionInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -327,6 +340,7 @@ func TestStoreWriteLimiter(t *testing.T) {
wg.Wait()
}

<<<<<<< HEAD
func TestTuneStoreWriteLimiter(t *testing.T) {
limiter := newStoreWriteLimiter(100)
testLimiter := func(ctx context.Context, maxT int) {
Expand Down Expand Up @@ -362,4 +376,91 @@ func TestTuneStoreWriteLimiter(t *testing.T) {
ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second*2)
defer cancel1()
testLimiter(ctx1, 200)
=======
// TestSplitAndScatterRegionInBatchesTwoLevel verifies the two-level
// (coarse + fine) split/scatter behavior and that the rate limiter still
// applies across both levels.
func TestSplitAndScatterRegionInBatchesTwoLevel(t *testing.T) {
	// genKeys builds n distinct 2-byte keys in ascending order.
	genKeys := func(n int) [][]byte {
		out := make([][]byte, 0, n)
		for i := 0; i < n; i++ {
			out = append(out, []byte{byte(i >> 8), byte(i)})
		}
		return out
	}

	t.Run("large split keys trigger coarse and fine layers", func(t *testing.T) {
		cli := &testSplitClient{}
		backend := &Backend{splitCli: cli}

		err := backend.splitAndScatterRegionInBatches(context.Background(), genKeys(121), 50, 0)
		require.NoError(t, err)
		// 121 keys => coarse pass(11 keys, 1 batch) + fine pass(121 keys, 3 batches) = 4 calls.
		require.Equal(t, int32(4), cli.splitCount.Load())
	})

	t.Run("small split keys only use fine layer", func(t *testing.T) {
		cli := &testSplitClient{}
		backend := &Backend{splitCli: cli}

		// Exactly at the threshold: no coarse pass, just two fine batches.
		err := backend.splitAndScatterRegionInBatches(context.Background(), genKeys(coarseGrainedSplitKeysThreshold), 50, 0)
		require.NoError(t, err)
		require.Equal(t, int32(2), cli.splitCount.Load())
	})

	t.Run("coarse layer error returns immediately", func(t *testing.T) {
		cli := &testSplitClient{
			splitKeysAndScatterF: func(_ context.Context, _ [][]byte, callCnt int32) ([]*split.RegionInfo, error) {
				// Fail the very first (coarse) call; any later call succeeds.
				if callCnt == 1 {
					return nil, errors.New("mock split error")
				}
				return []*split.RegionInfo{{Region: &metapb.Region{Id: 1}}}, nil
			},
		}
		backend := &Backend{splitCli: cli}

		err := backend.splitAndScatterRegionInBatches(context.Background(), genKeys(121), 50, 0)
		require.ErrorContains(t, err, "mock split error")
		require.Equal(t, int32(1), cli.splitCount.Load())
	})

	t.Run("limiter is still enforced after restoring two levels", func(t *testing.T) {
		cli := &testSplitClient{}
		backend := &Backend{splitCli: cli}
		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
		defer cancel()

		// maxCntPerSec=0.5 => burstPerSec=1, so after first batch, limiter blocks
		// and should hit context deadline before entering fine-grained stage.
		err := backend.splitAndScatterRegionInBatches(ctx, genKeys(121), 50, 0.5)
		require.ErrorContains(t, err, "context deadline")
		require.Equal(t, int32(1), cli.splitCount.Load())
	})
}

func TestGetCoarseGrainedSplitKeys(t *testing.T) {
makeSplitKeys := func(n int) [][]byte {
keys := make([][]byte, n)
for i := 0; i < n; i++ {
keys[i] = []byte{byte(i >> 8), byte(i)}
}
return keys
}

t.Run("last key selected in loop is not appended twice", func(t *testing.T) {
splitKeys := makeSplitKeys(122)
coarseGrainedSplitKeys := getCoarseGrainedSplitKeys(splitKeys)
lastKey := splitKeys[len(splitKeys)-1]

lastKeyCount := 0
for _, key := range coarseGrainedSplitKeys {
if bytes.Equal(key, lastKey) {
lastKeyCount++
}
}

require.Equal(t, 1, lastKeyCount)
})
>>>>>>> 4ffecda589a (lightning: restore two-level split/scatter while keeping limiter behavior (#66312))
}
Loading