Skip to content

Commit cb1e1e6

Browse files
authored
ddl: release scan chunks earlier to cap the memory usage (pingcap#67452)
close pingcap#64911
1 parent 8b8bccf commit cb1e1e6

File tree

2 files changed

+82
-15
lines changed

2 files changed

+82
-15
lines changed

pkg/ddl/backfilling_operators.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,16 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
532532
idxResults []IndexRecordChunk
533533
execDetails kvutil.ExecDetails
534534
)
535+
// Local ingest may trigger partial import/reset while the scan transaction is
536+
// still open, so only the global-sort path can stream results immediately.
537+
enableStreaming := w.reorgMeta.UseCloudStorage
538+
sendResult := func(idxResult IndexRecordChunk) {
539+
sender(idxResult)
540+
if w.cpOp != nil {
541+
w.cpOp.UpdateChunk(task.ID, int(idxResult.tableScanRowCount), idxResult.Done)
542+
}
543+
w.totalCount.Add(idxResult.tableScanRowCount)
544+
}
535545
var scanCtx context.Context = w.ctx
536546
if scanCtx.Value(kvutil.ExecDetailsKey) == nil {
537547
scanCtx = context.WithValue(w.ctx, kvutil.ExecDetailsKey, &execDetails)
@@ -565,6 +575,11 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
565575
failpoint.InjectCall("beforeGetChunk")
566576
srcChk := w.getChunk()
567577
done, err = fetchTableScanResult(scanCtx, w.copCtx.GetBase(), rs, srcChk)
578+
failpoint.Inject("mockScanRecordPartialError", func(val failpoint.Value) {
579+
if shouldFail, _ := val.(bool); shouldFail {
580+
err = errors.New("mock partial scan error")
581+
}
582+
})
568583
if err != nil || scanCtx.Err() != nil {
569584
w.recycleChunk(srcChk)
570585
terror.Call(rs.Close)
@@ -574,19 +589,21 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
574589
execDetails = kvutil.ExecDetails{}
575590

576591
_, tableScanRowCount := distsqlCtx.RuntimeStatsColl.GetCopCountAndRows(tableScanCopID)
577-
idxResults = append(idxResults, IndexRecordChunk{ID: task.ID, Chunk: srcChk, Done: done, ctx: w.ctx, tableScanRowCount: tableScanRowCount - lastTableScanRowCount, conditionPushed: conditionPushed})
592+
idxResult := IndexRecordChunk{ID: task.ID, Chunk: srcChk, Done: done, ctx: w.ctx, tableScanRowCount: tableScanRowCount - lastTableScanRowCount, conditionPushed: conditionPushed}
578593
lastTableScanRowCount = tableScanRowCount
594+
if enableStreaming {
595+
sendResult(idxResult)
596+
} else {
597+
idxResults = append(idxResults, idxResult)
598+
}
579599
}
580600
return rs.Close()
581601
})
582602

583-
for i, idxResult := range idxResults {
584-
sender(idxResult)
585-
if w.cpOp != nil {
586-
done := i == len(idxResults)-1
587-
w.cpOp.UpdateChunk(task.ID, int(idxResult.tableScanRowCount), done)
603+
if !enableStreaming {
604+
for _, idxResult := range idxResults {
605+
sendResult(idxResult)
588606
}
589-
w.totalCount.Add(idxResult.tableScanRowCount)
590607
}
591608

592609
return err

tests/realtikvtest/addindextest4/integration_test.go

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -443,21 +443,28 @@ func TestCancelAfterReorgTimeout(t *testing.T) {
443443
}, 10*time.Second, 300*time.Millisecond)
444444
}
445445

446+
// setupAddIndexClassicVars enables fast-reorg and toggles the dist-task mode
447+
// in classic kernel. It is a no-op in next-gen.
448+
func setupAddIndexClassicVars(tk *testkit.TestKit, distTaskOn bool) {
449+
if !kerneltype.IsClassic() {
450+
return
451+
}
452+
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 1")
453+
if distTaskOn {
454+
tk.MustExec("set global tidb_enable_dist_task = 1")
455+
} else {
456+
tk.MustExec("set global tidb_enable_dist_task = 0")
457+
}
458+
}
459+
446460
func TestAddIndexResumesFromCheckpointAfterPartialImport(t *testing.T) {
447461
runCase := func(t *testing.T, distTaskOn bool) {
448462
store := realtikvtest.CreateMockStoreAndSetup(t)
449463

450464
tk := testkit.NewTestKit(t, store)
451465
tk.MustExec("use test")
452466

453-
if kerneltype.IsClassic() {
454-
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 1")
455-
if distTaskOn {
456-
tk.MustExec("set global tidb_enable_dist_task = 1")
457-
} else {
458-
tk.MustExec("set global tidb_enable_dist_task = 0")
459-
}
460-
}
467+
setupAddIndexClassicVars(tk, distTaskOn)
461468
ingest.ForceSyncFlagForTest.Store(true)
462469

463470
tk.MustExec("drop table if exists t")
@@ -489,3 +496,46 @@ func TestAddIndexResumesFromCheckpointAfterPartialImport(t *testing.T) {
489496
t.Run("dist_task_off", func(t *testing.T) { runCase(t, false) })
490497
t.Run("dist_task_on", func(t *testing.T) { runCase(t, true) })
491498
}
499+
500+
func TestAddIndexResumesFromCheckpointAfterPartialScan(t *testing.T) {
501+
runCase := func(t *testing.T, distTaskOn bool) {
502+
store := realtikvtest.CreateMockStoreAndSetup(t)
503+
504+
tk := testkit.NewTestKit(t, store)
505+
tk.MustExec("use test")
506+
507+
setupAddIndexClassicVars(tk, distTaskOn)
508+
ingest.ForceSyncFlagForTest.Store(true)
509+
510+
tk.MustExec("drop table if exists t")
511+
tk.MustExec("create table t (a bigint primary key, b bigint)")
512+
for i := range 2000 {
513+
tk.MustExec("insert into t values (?, ?)", i, i)
514+
}
515+
516+
// Let the first chunk fetch in scanRecords succeed, then inject an
517+
// error on the second fetch. The local-ingest path buffers scan
518+
// results until the scan finishes, so this exercises the partial-scan
519+
// checkpoint path where the retry must restart without a flushed chunk
520+
// advancing the checkpoint. Subsequent fetches (in the retry) see the
521+
// failpoint as already consumed and proceed normally.
522+
testfailpoint.Enable(t,
523+
"github.com/pingcap/tidb/pkg/ddl/mockScanRecordPartialError",
524+
"1*return(false)->1*return(true)")
525+
526+
tk.MustExec("alter table t add unique index idx_b(b)")
527+
528+
tblCntStr := tk.MustQuery("select count(*) from t").Rows()[0][0].(string)
529+
idxCntStr := tk.MustQuery("select count(*) from t use index(idx_b)").Rows()[0][0].(string)
530+
tblCnt, err := strconv.Atoi(tblCntStr)
531+
require.NoError(t, err)
532+
idxCnt, err := strconv.Atoi(idxCntStr)
533+
require.NoError(t, err)
534+
require.Equal(t, tblCnt, idxCnt)
535+
536+
tk.MustExec("admin check table t")
537+
}
538+
539+
t.Run("dist_task_off", func(t *testing.T) { runCase(t, false) })
540+
t.Run("dist_task_on", func(t *testing.T) { runCase(t, true) })
541+
}

0 commit comments

Comments
 (0)