Skip to content

Commit 2098e75

Browse files
authored
importsdk, importer: fix sampled source size in import estimate (#67492)
ref #67240
1 parent 01ddf6c commit 2098e75

File tree

4 files changed

+167
-20
lines changed

4 files changed

+167
-20
lines changed

pkg/executor/importer/sampler.go

Lines changed: 34 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,6 @@ import (
3939
plannercore "github.com/pingcap/tidb/pkg/planner/core"
4040
"github.com/pingcap/tidb/pkg/table"
4141
"github.com/pingcap/tidb/pkg/table/tables"
42-
"github.com/pingcap/tidb/pkg/types"
4342
contextutil "github.com/pingcap/tidb/pkg/util/context"
4443
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
4544
"go.uber.org/zap"
@@ -352,27 +351,30 @@ func (s *kvSizeSampler) sampleOneFile(
352351
}()
353352

354353
var (
355-
count int
356-
readRowCache []types.Datum
357-
readFn = parserEncodeReader(parser, chunk.Chunk.EndOffset, chunk.GetKey())
358-
kvBatch = newEncodedKVGroupBatch(ksCodec, maxRowCount)
354+
count int
355+
kvBatch = newEncodedKVGroupBatch(ksCodec, maxRowCount)
359356
)
360357
for count < maxRowCount {
361-
row, closed, readErr := readFn(ctx, readRowCache)
362-
if readErr != nil {
363-
return 0, 0, 0, readErr
364-
}
365-
if closed {
358+
startPos, _ := parser.Pos()
359+
if s.cfg.Format != DataFormatParquet && startPos >= chunk.Chunk.EndOffset {
366360
break
367361
}
368-
readRowCache = row.row
369-
if rowDelta := row.endOffset - row.startPos; rowDelta > 0 {
370-
sourceSize += rowDelta
362+
363+
readErr := parser.ReadRow()
364+
if readErr != nil {
365+
if errors.Cause(readErr) == io.EOF {
366+
break
367+
}
368+
return 0, 0, 0, common.ErrEncodeKV.Wrap(readErr).GenWithStackByArgs(chunk.GetKey(), startPos)
371369
}
372-
kvs, encodeErr := encoder.Encode(row.row, row.rowID)
373-
row.resetFn()
370+
371+
lastRow := parser.LastRow()
372+
sourceSize += s.sampledRowSourceSize(parser, startPos, lastRow)
373+
374+
kvs, encodeErr := encoder.Encode(lastRow.Row, lastRow.RowID)
375+
parser.RecycleRow(lastRow)
374376
if encodeErr != nil {
375-
return 0, 0, 0, common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(chunk.GetKey(), row.startPos)
377+
return 0, 0, 0, common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(chunk.GetKey(), startPos)
376378
}
377379
if _, err = kvBatch.add(kvs); err != nil {
378380
return 0, 0, 0, err
@@ -382,3 +384,19 @@ func (s *kvSizeSampler) sampleOneFile(
382384
dataKVSize, indexKVSize = kvBatch.groupChecksum.DataAndIndexSumSize()
383385
return sourceSize, dataKVSize, indexKVSize, nil
384386
}
387+
388+
func (s *kvSizeSampler) sampledRowSourceSize(parser mydump.Parser, startPos int64, row mydump.Row) int64 {
389+
// Sampling needs per-row source bytes, not buffered reader progress.
390+
// SQL/CSV parsers expose byte offsets through Pos(), including compressed
391+
// input where Pos() tracks uncompressed bytes and stays aligned with the
392+
// RealSize-based source totals. Parquet Pos() is row-count based and must
393+
// fall back to the row-size estimate.
394+
if s.cfg.Format == DataFormatParquet {
395+
return int64(row.Length)
396+
}
397+
endPos, _ := parser.Pos()
398+
if rowDelta := endPos - startPos; rowDelta > 0 {
399+
return rowDelta
400+
}
401+
return int64(row.Length)
402+
}

pkg/executor/importer/sampler_test.go

Lines changed: 51 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -238,6 +238,57 @@ func TestSampleIndexSizeRatio(t *testing.T) {
238238
require.Error(t, err)
239239
require.True(t, reader.closed)
240240
})
241+
242+
t.Run("sql_source_size_uses_consumed_bytes_not_buffered_progress", func(t *testing.T) {
243+
dir := t.TempDir()
244+
var fileSB strings.Builder
245+
fileSB.WriteString("INSERT INTO t VALUES\n")
246+
for i := 0; i < 20; i++ {
247+
_, err := fmt.Fprintf(&fileSB, "(%d,'v%d','w%d','x%d')", i, i, i, i)
248+
require.NoError(t, err)
249+
if i < 19 {
250+
fileSB.WriteString(",\n")
251+
continue
252+
}
253+
fileSB.WriteString(";\n")
254+
}
255+
content := fileSB.String()
256+
require.NoError(t, os.WriteFile(filepath.Join(dir, "001.sql"), []byte(content), 0o644))
257+
258+
p := parser.New()
259+
node, err := p.ParseOneStmt(`create table t (a int, b text, c text, d text, index idx(a));`, "", "")
260+
require.NoError(t, err)
261+
sctx := utilmock.NewContext()
262+
tblInfo, err := ddl.MockTableInfo(sctx, node.(*ast.CreateTableStmt), 1)
263+
require.NoError(t, err)
264+
tblInfo.State = model.StatePublic
265+
table := tables.MockTableFromMeta(tblInfo)
266+
267+
ctrl, err := NewLoadDataController(&Plan{
268+
Path: filepath.Join(dir, "*.sql"),
269+
Format: DataFormatSQL,
270+
InImportInto: true,
271+
}, table, &ASTArgs{})
272+
require.NoError(t, err)
273+
ctrl.logger = zap.Must(zap.NewDevelopment())
274+
ctx := context.Background()
275+
require.NoError(t, ctrl.InitDataFiles(ctx))
276+
277+
sampled, err := SampleFileImportKVSize(
278+
ctx,
279+
ctrl.buildKVSizeSampleConfig(),
280+
table,
281+
ctrl.dataStore,
282+
ctrl.dataFiles,
283+
nil,
284+
ctrl.logger,
285+
)
286+
require.NoError(t, err)
287+
require.Positive(t, sampled.SourceSize)
288+
require.Positive(t, sampled.TotalKVSize())
289+
require.Greater(t, sampled.SourceSize, int64(len(content)/2))
290+
require.Less(t, sampled.SourceSize, int64(len(content)*2))
291+
})
241292
}
242293
func TestSampleIndexSizeRatioVeryLongRows(t *testing.T) {
243294
simpleTbl := `create table t (a int, b text, c text, d text, index idx(a));`

pkg/importsdk/file_scanner.go

Lines changed: 47 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -393,13 +393,13 @@ func (s *fileScanner) buildEstimateTableInfo(ctx context.Context, tblMeta *mydum
393393
}
394394
p := parser.New()
395395
p.SetSQLMode(s.config.sqlMode)
396-
stmt, err := p.ParseOneStmt(schemaSQL, "", "")
396+
stmts, _, err := p.ParseSQL(schemaSQL)
397397
if err != nil {
398398
return nil, errors.Trace(err)
399399
}
400-
createStmt, ok := stmt.(*ast.CreateTableStmt)
401-
if !ok {
402-
return nil, errors.Errorf("schema file %s does not contain a CREATE TABLE statement", tblMeta.SchemaFile.FileMeta.Path)
400+
createStmt, err := buildEstimateCreateTableStmt(stmts, tblMeta)
401+
if err != nil {
402+
return nil, err
403403
}
404404
tableInfo, err := ddl.BuildTableInfoFromAST(metabuild.NewContext(), createStmt)
405405
if err != nil {
@@ -408,6 +408,49 @@ func (s *fileScanner) buildEstimateTableInfo(ctx context.Context, tblMeta *mydum
408408
return tableInfo, nil
409409
}
410410

411+
func buildEstimateCreateTableStmt(stmts []ast.StmtNode, tblMeta *mydump.MDTableMeta) (*ast.CreateTableStmt, error) {
412+
var (
413+
firstCreateStmt *ast.CreateTableStmt
414+
createStmtCount int
415+
)
416+
for _, stmt := range stmts {
417+
createStmt, ok := stmt.(*ast.CreateTableStmt)
418+
if !ok {
419+
continue
420+
}
421+
if firstCreateStmt == nil {
422+
firstCreateStmt = createStmt
423+
}
424+
createStmtCount++
425+
if estimateCreateTableStmtMatchesMeta(createStmt, tblMeta) {
426+
return createStmt, nil
427+
}
428+
}
429+
if createStmtCount == 1 {
430+
return firstCreateStmt, nil
431+
}
432+
if createStmtCount == 0 {
433+
return nil, errors.Errorf("schema file %s does not contain a CREATE TABLE statement", tblMeta.SchemaFile.FileMeta.Path)
434+
}
435+
return nil, errors.Errorf(
436+
"schema file %s contains %d CREATE TABLE statements but none match table %s.%s",
437+
tblMeta.SchemaFile.FileMeta.Path,
438+
createStmtCount,
439+
tblMeta.DB,
440+
tblMeta.Name,
441+
)
442+
}
443+
444+
func estimateCreateTableStmtMatchesMeta(createStmt *ast.CreateTableStmt, tblMeta *mydump.MDTableMeta) bool {
445+
if !strings.EqualFold(createStmt.Table.Name.String(), tblMeta.Name) {
446+
return false
447+
}
448+
if createStmt.Table.Schema.String() == "" {
449+
return true
450+
}
451+
return strings.EqualFold(createStmt.Table.Schema.String(), tblMeta.DB)
452+
}
453+
411454
func sourceTypeToImportFormat(tp mydump.SourceType) (string, error) {
412455
switch tp {
413456
case mydump.SourceTypeCSV:

pkg/importsdk/file_scanner_test.go

Lines changed: 35 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -266,6 +266,41 @@ func TestFileScanner(t *testing.T) {
266266
require.Equal(t, estimate.Tables[0].SourceSize, estimate.TotalSourceSize)
267267
require.Equal(t, estimate.Tables[0].TiKVSize, estimate.TotalTiKVSize)
268268
})
269+
270+
t.Run("EstimateImportDataSizeMultiStatementSchema", func(t *testing.T) {
271+
estimateDir := t.TempDir()
272+
require.NoError(t, os.WriteFile(filepath.Join(estimateDir, "test_db-schema-create.sql"), []byte("CREATE DATABASE test_db;"), 0o644))
273+
require.NoError(t, os.WriteFile(
274+
filepath.Join(estimateDir, "test_db.users-schema.sql"),
275+
[]byte(strings.Join([]string{
276+
"CREATE DATABASE IF NOT EXISTS test_db;",
277+
"USE test_db;",
278+
"DROP TABLE IF EXISTS users;",
279+
"CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255), KEY idx_name (name));",
280+
}, "\n")),
281+
0o644,
282+
))
283+
require.NoError(t, os.WriteFile(
284+
filepath.Join(estimateDir, "test_db.users.001.csv"),
285+
[]byte("1,alice\n2,bob\n"),
286+
0o644,
287+
))
288+
289+
cfg := defaultSDKConfig()
290+
cfg.skipInvalidFiles = true
291+
estimateScanner, err := NewFileScanner(ctx, "file://"+estimateDir, db, cfg)
292+
require.NoError(t, err)
293+
defer estimateScanner.Close()
294+
295+
estimate, err := estimateScanner.EstimateImportDataSize(ctx)
296+
require.NoError(t, err)
297+
require.Len(t, estimate.Tables, 1)
298+
require.Equal(t, "users", estimate.Tables[0].Table)
299+
require.Positive(t, estimate.Tables[0].SourceSize)
300+
require.Positive(t, estimate.Tables[0].TiKVSize)
301+
require.Equal(t, estimate.Tables[0].SourceSize, estimate.TotalSourceSize)
302+
require.Equal(t, estimate.Tables[0].TiKVSize, estimate.TotalTiKVSize)
303+
})
269304
}
270305

271306
func TestFileScannerWithEstimateRealSize(t *testing.T) {

0 commit comments

Comments
 (0)