Skip to content

Commit 8bbcdd8

Browse files
qw4990 authored and jackysp committed
executor: control Chunk size for TableReader&IndexReader&IndexLookup (pingcap#10169)
1 parent 8295bc0 commit 8bbcdd8

File tree

5 files changed

+296
-17
lines changed

5 files changed

+296
-17
lines changed

executor/builder.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1622,7 +1622,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
16221622
filter: outerFilter,
16231623
},
16241624
innerCtx: innerCtx{
1625-
readerBuilder: &dataReaderBuilder{innerPlan, b},
1625+
readerBuilder: &dataReaderBuilder{Plan: innerPlan, executorBuilder: b},
16261626
rowTypes: innerTypes,
16271627
},
16281628
workerWg: new(sync.WaitGroup),
@@ -1851,6 +1851,8 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
18511851
type dataReaderBuilder struct {
18521852
plannercore.Plan
18531853
*executorBuilder
1854+
1855+
selectResultHook // for testing
18541856
}
18551857

18561858
func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, datums [][]types.Datum,
@@ -1892,7 +1894,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
18921894
return nil, errors.Trace(err)
18931895
}
18941896
e.resultHandler = &tableResultHandler{}
1895-
result, err := distsql.Select(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback)
1897+
result, err := builder.SelectResult(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback)
18961898
if err != nil {
18971899
return nil, errors.Trace(err)
18981900
}
@@ -1921,11 +1923,11 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
19211923
if err != nil {
19221924
return nil, errors.Trace(err)
19231925
}
1924-
kvRanges, err := buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff)
1926+
e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff)
19251927
if err != nil {
19261928
return nil, errors.Trace(err)
19271929
}
1928-
err = e.open(ctx, kvRanges)
1930+
err = e.open(ctx)
19291931
return e, errors.Trace(err)
19301932
}
19311933

executor/distsql.go

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,8 @@ type IndexReaderExecutor struct {
233233
idxCols []*expression.Column
234234
colLens []int
235235
plans []plannercore.PhysicalPlan
236+
237+
selectResultHook // for testing
236238
}
237239

238240
// Close clears all resources hold by current object.
@@ -294,7 +296,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
294296
e.feedback.Invalidate()
295297
return errors.Trace(err)
296298
}
297-
e.result, err = distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback)
299+
e.result, err = e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback)
298300
if err != nil {
299301
e.feedback.Invalidate()
300302
return errors.Trace(err)
@@ -328,6 +330,9 @@ type IndexLookUpExecutor struct {
328330
tblWorkerWg sync.WaitGroup
329331
finished chan struct{}
330332

333+
kvRanges []kv.KeyRange
334+
workerStarted bool
335+
331336
resultCh chan *lookupTableTask
332337
resultCurr *lookupTableTask
333338
feedback *statistics.QueryFeedback
@@ -356,19 +361,19 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
356361
return errors.Trace(err)
357362
}
358363
}
359-
kvRanges, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, e.ranges, e.feedback)
364+
e.kvRanges, err = distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, e.ranges, e.feedback)
360365
if err != nil {
361366
e.feedback.Invalidate()
362367
return errors.Trace(err)
363368
}
364-
err = e.open(ctx, kvRanges)
369+
err = e.open(ctx)
365370
if err != nil {
366371
e.feedback.Invalidate()
367372
}
368373
return errors.Trace(err)
369374
}
370375

371-
func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
376+
func (e *IndexLookUpExecutor) open(ctx context.Context) error {
372377
// We have to initialize "memTracker" and other execution resources in here
373378
// instead of in function "Open", because this "IndexLookUpExecutor" may be
374379
// constructed by a "IndexLookUpJoin" and "Open" will not be called in that
@@ -393,20 +398,22 @@ func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
393398
return errors.Trace(err)
394399
}
395400
}
401+
return nil
402+
}
396403

404+
func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize int) error {
397405
// indexWorker will write to workCh and tableWorker will read from workCh,
398406
// so fetching index and getting table data can run concurrently.
399407
workCh := make(chan *lookupTableTask, 1)
400-
err = e.startIndexWorker(ctx, kvRanges, workCh)
401-
if err != nil {
408+
if err := e.startIndexWorker(ctx, e.kvRanges, workCh, initBatchSize); err != nil {
402409
return errors.Trace(err)
403410
}
404411
e.startTableWorker(ctx, workCh)
412+
e.workerStarted = true
405413
return nil
406414
}
407415

408-
// startIndexWorker launch a background goroutine to fetch handles, send the results to workCh.
409-
func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask) error {
416+
func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask, initBatchSize int) error {
410417
var builder distsql.RequestBuilder
411418
kvReq, err := builder.SetKeyRanges(kvRanges).
412419
SetDAGRequest(e.dagPB).
@@ -425,11 +432,12 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
425432
}
426433
result.Fetch(ctx)
427434
worker := &indexWorker{
435+
idxLookup: e,
428436
workCh: workCh,
429437
finished: e.finished,
430438
resultCh: e.resultCh,
431439
keepOrder: e.keepOrder,
432-
batchSize: e.maxChunkSize,
440+
batchSize: initBatchSize,
433441
maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize,
434442
maxChunkSize: e.maxChunkSize,
435443
}
@@ -503,7 +511,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []in
503511

504512
// Close implements Exec Close interface.
505513
func (e *IndexLookUpExecutor) Close() error {
506-
if e.finished == nil {
514+
if !e.workerStarted || e.finished == nil {
507515
return nil
508516
}
509517

@@ -515,6 +523,7 @@ func (e *IndexLookUpExecutor) Close() error {
515523
e.idxWorkerWg.Wait()
516524
e.tblWorkerWg.Wait()
517525
e.finished = nil
526+
e.workerStarted = false
518527
e.memTracker.Detach()
519528
e.memTracker = nil
520529
return nil
@@ -526,6 +535,11 @@ func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error
526535
start := time.Now()
527536
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
528537
}
538+
if !e.workerStarted {
539+
if err := e.startWorkers(ctx, chk.RequiredRows()); err != nil {
540+
return errors.Trace(err)
541+
}
542+
}
529543
chk.Reset()
530544
for {
531545
resultTask, err := e.getResultTask()
@@ -538,7 +552,7 @@ func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error
538552
for resultTask.cursor < len(resultTask.rows) {
539553
chk.AppendRow(resultTask.rows[resultTask.cursor])
540554
resultTask.cursor++
541-
if chk.NumRows() >= e.maxChunkSize {
555+
if chk.IsFull() {
542556
return nil
543557
}
544558
}
@@ -567,6 +581,7 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) {
567581

568582
// indexWorker is used by IndexLookUpExecutor to maintain index lookup background goroutines.
569583
type indexWorker struct {
584+
idxLookup *IndexLookUpExecutor
570585
workCh chan<- *lookupTableTask
571586
finished <-chan struct{}
572587
resultCh chan<- *lookupTableTask
@@ -599,7 +614,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
599614
}
600615
}
601616
}()
602-
chk := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.maxChunkSize)
617+
chk := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize)
603618
for {
604619
handles, err := w.extractTaskHandles(ctx, chk, result)
605620
if err != nil {
@@ -628,6 +643,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
628643
func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (handles []int64, err error) {
629644
handles = make([]int64, 0, w.batchSize)
630645
for len(handles) < w.batchSize {
646+
chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize)
631647
err = errors.Trace(idxResult.Next(ctx, chk))
632648
if err != nil {
633649
return handles, err

executor/executor_required_rows_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ func defaultCtx() sessionctx.Context {
202202
ctx.GetSessionVars().MaxChunkSize = 1024
203203
ctx.GetSessionVars().MemQuotaSort = variable.DefTiDBMemQuotaSort
204204
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker("", ctx.GetSessionVars().MemQuotaQuery)
205+
ctx.GetSessionVars().SnapshotTS = uint64(1)
205206
return ctx
206207
}
207208

executor/table_reader.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@ import (
1919
"github.com/pingcap/errors"
2020
"github.com/pingcap/parser/model"
2121
"github.com/pingcap/tidb/distsql"
22+
"github.com/pingcap/tidb/kv"
2223
plannercore "github.com/pingcap/tidb/planner/core"
24+
"github.com/pingcap/tidb/sessionctx"
2325
"github.com/pingcap/tidb/statistics"
2426
"github.com/pingcap/tidb/table"
27+
"github.com/pingcap/tidb/types"
2528
"github.com/pingcap/tidb/util/chunk"
2629
"github.com/pingcap/tidb/util/ranger"
2730
tipb "github.com/pingcap/tipb/go-tipb"
@@ -31,6 +34,20 @@ import (
3134
// make sure `TableReaderExecutor` implements `Executor`.
3235
var _ Executor = &TableReaderExecutor{}
3336

37+
// selectResultHook is used to hack distsql.SelectWithRuntimeStats safely for testing.
38+
type selectResultHook struct {
39+
selectResultFunc func(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
40+
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (distsql.SelectResult, error)
41+
}
42+
43+
func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
44+
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (distsql.SelectResult, error) {
45+
if sr.selectResultFunc == nil {
46+
return distsql.Select(ctx, sctx, kvReq, fieldTypes, fb)
47+
}
48+
return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, fb)
49+
}
50+
3451
// TableReaderExecutor sends DAG request and reads table data from kv layer.
3552
type TableReaderExecutor struct {
3653
baseExecutor
@@ -55,6 +72,8 @@ type TableReaderExecutor struct {
5572
// corColInAccess tells whether there's correlated column in access conditions.
5673
corColInAccess bool
5774
plans []plannercore.PhysicalPlan
75+
76+
selectResultHook // for testing
5877
}
5978

6079
// Open initialzes necessary variables for using this executor.
@@ -132,7 +151,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
132151
if err != nil {
133152
return nil, errors.Trace(err)
134153
}
135-
result, err := distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback)
154+
result, err := e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback)
136155
if err != nil {
137156
return nil, errors.Trace(err)
138157
}

0 commit comments

Comments
 (0)