Skip to content

Commit feed1c7

Browse files
committed
importsdk, importer: fix sampled source size in import estimate
1 parent a5ad421 commit feed1c7

File tree

2 files changed

+83
-16
lines changed

2 files changed

+83
-16
lines changed

pkg/executor/importer/sampler.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import (
3939
plannercore "github.com/pingcap/tidb/pkg/planner/core"
4040
"github.com/pingcap/tidb/pkg/table"
4141
"github.com/pingcap/tidb/pkg/table/tables"
42-
"github.com/pingcap/tidb/pkg/types"
4342
contextutil "github.com/pingcap/tidb/pkg/util/context"
4443
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
4544
"go.uber.org/zap"
@@ -352,27 +351,30 @@ func (s *kvSizeSampler) sampleOneFile(
352351
}()
353352

354353
var (
355-
count int
356-
readRowCache []types.Datum
357-
readFn = parserEncodeReader(parser, chunk.Chunk.EndOffset, chunk.GetKey())
358-
kvBatch = newEncodedKVGroupBatch(ksCodec, maxRowCount)
354+
count int
355+
kvBatch = newEncodedKVGroupBatch(ksCodec, maxRowCount)
359356
)
360357
for count < maxRowCount {
361-
row, closed, readErr := readFn(ctx, readRowCache)
362-
if readErr != nil {
363-
return 0, 0, 0, readErr
364-
}
365-
if closed {
358+
startPos, _ := parser.Pos()
359+
if s.cfg.Format != DataFormatParquet && startPos >= chunk.Chunk.EndOffset {
366360
break
367361
}
368-
readRowCache = row.row
369-
if rowDelta := row.endOffset - row.startPos; rowDelta > 0 {
370-
sourceSize += rowDelta
362+
363+
readErr := parser.ReadRow()
364+
if readErr != nil {
365+
if errors.Cause(readErr) == io.EOF {
366+
break
367+
}
368+
return 0, 0, 0, common.ErrEncodeKV.Wrap(readErr).GenWithStackByArgs(chunk.GetKey(), startPos)
371369
}
372-
kvs, encodeErr := encoder.Encode(row.row, row.rowID)
373-
row.resetFn()
370+
371+
lastRow := parser.LastRow()
372+
sourceSize += s.sampledRowSourceSize(parser, startPos, lastRow)
373+
374+
kvs, encodeErr := encoder.Encode(lastRow.Row, lastRow.RowID)
375+
parser.RecycleRow(lastRow)
374376
if encodeErr != nil {
375-
return 0, 0, 0, common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(chunk.GetKey(), row.startPos)
377+
return 0, 0, 0, common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(chunk.GetKey(), startPos)
376378
}
377379
if _, err = kvBatch.add(kvs); err != nil {
378380
return 0, 0, 0, err
@@ -382,3 +384,17 @@ func (s *kvSizeSampler) sampleOneFile(
382384
dataKVSize, indexKVSize = kvBatch.groupChecksum.DataAndIndexSumSize()
383385
return sourceSize, dataKVSize, indexKVSize, nil
384386
}
387+
388+
func (s *kvSizeSampler) sampledRowSourceSize(parser mydump.Parser, startPos int64, row mydump.Row) int64 {
389+
// Sampling needs per-row source bytes, not buffered reader progress.
390+
// SQL/CSV parsers expose byte offsets through Pos(), while parquet Pos()
391+
// is row-count based and must fall back to the row-size estimate.
392+
if s.cfg.Format == DataFormatParquet {
393+
return int64(row.Length)
394+
}
395+
endPos, _ := parser.Pos()
396+
if rowDelta := endPos - startPos; rowDelta > 0 {
397+
return rowDelta
398+
}
399+
return int64(row.Length)
400+
}

pkg/executor/importer/sampler_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,57 @@ func TestSampleIndexSizeRatio(t *testing.T) {
238238
require.Error(t, err)
239239
require.True(t, reader.closed)
240240
})
241+
242+
t.Run("sql_source_size_uses_consumed_bytes_not_buffered_progress", func(t *testing.T) {
243+
dir := t.TempDir()
244+
var fileSB strings.Builder
245+
fileSB.WriteString("INSERT INTO t VALUES\n")
246+
for i := 0; i < 20; i++ {
247+
_, err := fmt.Fprintf(&fileSB, "(%d,'v%d','w%d','x%d')", i, i, i, i)
248+
require.NoError(t, err)
249+
if i < 19 {
250+
fileSB.WriteString(",\n")
251+
continue
252+
}
253+
fileSB.WriteString(";\n")
254+
}
255+
content := fileSB.String()
256+
require.NoError(t, os.WriteFile(filepath.Join(dir, "001.sql"), []byte(content), 0o644))
257+
258+
p := parser.New()
259+
node, err := p.ParseOneStmt(`create table t (a int, b text, c text, d text, index idx(a));`, "", "")
260+
require.NoError(t, err)
261+
sctx := utilmock.NewContext()
262+
tblInfo, err := ddl.MockTableInfo(sctx, node.(*ast.CreateTableStmt), 1)
263+
require.NoError(t, err)
264+
tblInfo.State = model.StatePublic
265+
table := tables.MockTableFromMeta(tblInfo)
266+
267+
ctrl, err := NewLoadDataController(&Plan{
268+
Path: filepath.Join(dir, "*.sql"),
269+
Format: DataFormatSQL,
270+
InImportInto: true,
271+
}, table, &ASTArgs{})
272+
require.NoError(t, err)
273+
ctrl.logger = zap.Must(zap.NewDevelopment())
274+
ctx := context.Background()
275+
require.NoError(t, ctrl.InitDataFiles(ctx))
276+
277+
sampled, err := SampleFileImportKVSize(
278+
ctx,
279+
ctrl.buildKVSizeSampleConfig(),
280+
table,
281+
ctrl.dataStore,
282+
ctrl.dataFiles,
283+
nil,
284+
ctrl.logger,
285+
)
286+
require.NoError(t, err)
287+
require.Positive(t, sampled.SourceSize)
288+
require.Positive(t, sampled.TotalKVSize())
289+
require.Greater(t, sampled.SourceSize, int64(len(content)/2))
290+
require.Less(t, sampled.SourceSize, int64(len(content)*2))
291+
})
241292
}
242293
func TestSampleIndexSizeRatioVeryLongRows(t *testing.T) {
243294
simpleTbl := `create table t (a int, b text, c text, d text, index idx(a));`

0 commit comments

Comments
 (0)