Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 34 additions & 16 deletions pkg/executor/importer/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/types"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"go.uber.org/zap"
Expand Down Expand Up @@ -352,27 +351,30 @@ func (s *kvSizeSampler) sampleOneFile(
}()

var (
count int
readRowCache []types.Datum
readFn = parserEncodeReader(parser, chunk.Chunk.EndOffset, chunk.GetKey())
kvBatch = newEncodedKVGroupBatch(ksCodec, maxRowCount)
count int
kvBatch = newEncodedKVGroupBatch(ksCodec, maxRowCount)
)
for count < maxRowCount {
row, closed, readErr := readFn(ctx, readRowCache)
if readErr != nil {
return 0, 0, 0, readErr
}
if closed {
startPos, _ := parser.Pos()
if s.cfg.Format != DataFormatParquet && startPos >= chunk.Chunk.EndOffset {
break
}
readRowCache = row.row
if rowDelta := row.endOffset - row.startPos; rowDelta > 0 {
sourceSize += rowDelta

readErr := parser.ReadRow()
if readErr != nil {
if errors.Cause(readErr) == io.EOF {
break
}
return 0, 0, 0, common.ErrEncodeKV.Wrap(readErr).GenWithStackByArgs(chunk.GetKey(), startPos)
}
kvs, encodeErr := encoder.Encode(row.row, row.rowID)
row.resetFn()

lastRow := parser.LastRow()
sourceSize += s.sampledRowSourceSize(parser, startPos, lastRow)

kvs, encodeErr := encoder.Encode(lastRow.Row, lastRow.RowID)
parser.RecycleRow(lastRow)
if encodeErr != nil {
return 0, 0, 0, common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(chunk.GetKey(), row.startPos)
return 0, 0, 0, common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(chunk.GetKey(), startPos)
}
if _, err = kvBatch.add(kvs); err != nil {
return 0, 0, 0, err
Expand All @@ -382,3 +384,19 @@ func (s *kvSizeSampler) sampleOneFile(
dataKVSize, indexKVSize = kvBatch.groupChecksum.DataAndIndexSumSize()
return sourceSize, dataKVSize, indexKVSize, nil
}

func (s *kvSizeSampler) sampledRowSourceSize(parser mydump.Parser, startPos int64, row mydump.Row) int64 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 [Minor] Parquet and fallback source-size branches lack targeted regression coverage

Why
The patch introduces format-specific source-size logic in sampledRowSourceSize, but the new test only validates the SQL consumed-bytes path and does not exercise the newly added Parquet or non-positive delta fallback branches.

Scope
pkg/executor/importer/sampler.go:388; pkg/executor/importer/sampler_test.go:238

Risk if unchanged
Future parser position behavior changes can silently skew sampled source-size estimation for Parquet or edge parser offsets, which may mis-tune IMPORT resource planning without an obvious failure signal.

Evidence
sampledRowSourceSize adds if s.cfg.Format == DataFormatParquet { return int64(row.Length) } and if rowDelta := endPos - startPos; rowDelta > 0 { ... } fallback logic, while the added test sql_source_size_uses_consumed_bytes_not_buffered_progress covers only SQL input.

Change request
add UT for it: add a case for DataFormatParquet and a case for the non-parquet endPos <= startPos fallback path so each new sizing branch is pinned by deterministic assertions.

// Sampling needs per-row source bytes, not buffered reader progress.
// SQL/CSV parsers expose byte offsets through Pos(), including compressed
// input where Pos() tracks uncompressed bytes and stays aligned with the
// RealSize-based source totals. Parquet Pos() is row-count based and must
// fall back to the row-size estimate.
if s.cfg.Format == DataFormatParquet {
return int64(row.Length)
}
endPos, _ := parser.Pos()
if rowDelta := endPos - startPos; rowDelta > 0 {
return rowDelta
}
return int64(row.Length)
}
51 changes: 51 additions & 0 deletions pkg/executor/importer/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,57 @@ func TestSampleIndexSizeRatio(t *testing.T) {
require.Error(t, err)
require.True(t, reader.closed)
})

t.Run("sql_source_size_uses_consumed_bytes_not_buffered_progress", func(t *testing.T) {
dir := t.TempDir()
var fileSB strings.Builder
fileSB.WriteString("INSERT INTO t VALUES\n")
for i := 0; i < 20; i++ {
_, err := fmt.Fprintf(&fileSB, "(%d,'v%d','w%d','x%d')", i, i, i, i)
require.NoError(t, err)
if i < 19 {
fileSB.WriteString(",\n")
continue
}
fileSB.WriteString(";\n")
}
content := fileSB.String()
require.NoError(t, os.WriteFile(filepath.Join(dir, "001.sql"), []byte(content), 0o644))

p := parser.New()
node, err := p.ParseOneStmt(`create table t (a int, b text, c text, d text, index idx(a));`, "", "")
require.NoError(t, err)
sctx := utilmock.NewContext()
tblInfo, err := ddl.MockTableInfo(sctx, node.(*ast.CreateTableStmt), 1)
require.NoError(t, err)
tblInfo.State = model.StatePublic
table := tables.MockTableFromMeta(tblInfo)

ctrl, err := NewLoadDataController(&Plan{
Path: filepath.Join(dir, "*.sql"),
Format: DataFormatSQL,
InImportInto: true,
}, table, &ASTArgs{})
require.NoError(t, err)
ctrl.logger = zap.Must(zap.NewDevelopment())
ctx := context.Background()
require.NoError(t, ctrl.InitDataFiles(ctx))

sampled, err := SampleFileImportKVSize(
ctx,
ctrl.buildKVSizeSampleConfig(),
table,
ctrl.dataStore,
ctrl.dataFiles,
nil,
ctrl.logger,
)
require.NoError(t, err)
require.Positive(t, sampled.SourceSize)
require.Positive(t, sampled.TotalKVSize())
require.Greater(t, sampled.SourceSize, int64(len(content)/2))
require.Less(t, sampled.SourceSize, int64(len(content)*2))
Comment thread
coderabbitai[bot] marked this conversation as resolved.
})
}
func TestSampleIndexSizeRatioVeryLongRows(t *testing.T) {
simpleTbl := `create table t (a int, b text, c text, d text, index idx(a));`
Expand Down
51 changes: 47 additions & 4 deletions pkg/importsdk/file_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,13 @@ func (s *fileScanner) buildEstimateTableInfo(ctx context.Context, tblMeta *mydum
}
p := parser.New()
p.SetSQLMode(s.config.sqlMode)
stmt, err := p.ParseOneStmt(schemaSQL, "", "")
stmts, _, err := p.ParseSQL(schemaSQL)
if err != nil {
return nil, errors.Trace(err)
}
createStmt, ok := stmt.(*ast.CreateTableStmt)
if !ok {
return nil, errors.Errorf("schema file %s does not contain a CREATE TABLE statement", tblMeta.SchemaFile.FileMeta.Path)
createStmt, err := buildEstimateCreateTableStmt(stmts, tblMeta)
if err != nil {
return nil, err
}
tableInfo, err := ddl.BuildTableInfoFromAST(metabuild.NewContext(), createStmt)
if err != nil {
Expand All @@ -408,6 +408,49 @@ func (s *fileScanner) buildEstimateTableInfo(ctx context.Context, tblMeta *mydum
return tableInfo, nil
}

// buildEstimateCreateTableStmt selects the CREATE TABLE statement to use
// for size estimation from the parsed statements of a schema file.
//
// A statement whose table (and schema, when qualified) matches tblMeta wins
// immediately. When the file contains exactly one CREATE TABLE statement,
// that statement is used even without a match. It returns an error when no
// CREATE TABLE statement exists, or when several exist but none match the
// target table.
func buildEstimateCreateTableStmt(stmts []ast.StmtNode, tblMeta *mydump.MDTableMeta) (*ast.CreateTableStmt, error) {
	createStmts := make([]*ast.CreateTableStmt, 0, 1)
	for _, node := range stmts {
		createStmt, ok := node.(*ast.CreateTableStmt)
		if !ok {
			continue
		}
		if estimateCreateTableStmtMatchesMeta(createStmt, tblMeta) {
			return createStmt, nil
		}
		createStmts = append(createStmts, createStmt)
	}
	switch len(createStmts) {
	case 0:
		return nil, errors.Errorf("schema file %s does not contain a CREATE TABLE statement", tblMeta.SchemaFile.FileMeta.Path)
	case 1:
		// A lone CREATE TABLE statement is trusted even when its name does
		// not match the metadata, preserving single-statement behavior.
		return createStmts[0], nil
	default:
		return nil, errors.Errorf(
			"schema file %s contains %d CREATE TABLE statements but none match table %s.%s",
			tblMeta.SchemaFile.FileMeta.Path,
			len(createStmts),
			tblMeta.DB,
			tblMeta.Name,
		)
	}
}

func estimateCreateTableStmtMatchesMeta(createStmt *ast.CreateTableStmt, tblMeta *mydump.MDTableMeta) bool {
if !strings.EqualFold(createStmt.Table.Name.String(), tblMeta.Name) {
return false
}
if createStmt.Table.Schema.String() == "" {
return true
}
return strings.EqualFold(createStmt.Table.Schema.String(), tblMeta.DB)
Comment on lines +444 to +451
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle empty tblMeta.DB when matching schema-qualified CREATE TABLE statements.

If tblMeta.DB is empty, current logic rejects otherwise valid matches (CREATE TABLE db.tbl ...) and can fail estimation when multiple CREATE TABLE statements exist.

Suggested fix
 func estimateCreateTableStmtMatchesMeta(createStmt *ast.CreateTableStmt, tblMeta *mydump.MDTableMeta) bool {
 	if !strings.EqualFold(createStmt.Table.Name.String(), tblMeta.Name) {
 		return false
 	}
-	if createStmt.Table.Schema.String() == "" {
+	if createStmt.Table.Schema.String() == "" || tblMeta.DB == "" {
 		return true
 	}
 	return strings.EqualFold(createStmt.Table.Schema.String(), tblMeta.DB)
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func estimateCreateTableStmtMatchesMeta(createStmt *ast.CreateTableStmt, tblMeta *mydump.MDTableMeta) bool {
if !strings.EqualFold(createStmt.Table.Name.String(), tblMeta.Name) {
return false
}
if createStmt.Table.Schema.String() == "" {
return true
}
return strings.EqualFold(createStmt.Table.Schema.String(), tblMeta.DB)
func estimateCreateTableStmtMatchesMeta(createStmt *ast.CreateTableStmt, tblMeta *mydump.MDTableMeta) bool {
if !strings.EqualFold(createStmt.Table.Name.String(), tblMeta.Name) {
return false
}
if createStmt.Table.Schema.String() == "" || tblMeta.DB == "" {
return true
}
return strings.EqualFold(createStmt.Table.Schema.String(), tblMeta.DB)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/importsdk/file_scanner.go` around lines 444 - 451, The matching logic in
estimateCreateTableStmtMatchesMeta incorrectly rejects matches when tblMeta.DB
is empty but createStmt.Table.Schema is non-empty; update
estimateCreateTableStmtMatchesMeta so that after verifying the table name
(createStmt.Table.Name) it treats an empty tblMeta.DB as a wildcard and returns
true (i.e., if tblMeta.DB == "" return true) before comparing
createStmt.Table.Schema.String() to tblMeta.DB; keep the existing behavior for
empty createStmt.Table.Schema.String().

}

func sourceTypeToImportFormat(tp mydump.SourceType) (string, error) {
switch tp {
case mydump.SourceTypeCSV:
Expand Down
35 changes: 35 additions & 0 deletions pkg/importsdk/file_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,41 @@ func TestFileScanner(t *testing.T) {
require.Equal(t, estimate.Tables[0].SourceSize, estimate.TotalSourceSize)
require.Equal(t, estimate.Tables[0].TiKVSize, estimate.TotalTiKVSize)
})

t.Run("EstimateImportDataSizeMultiStatementSchema", func(t *testing.T) {
estimateDir := t.TempDir()
require.NoError(t, os.WriteFile(filepath.Join(estimateDir, "test_db-schema-create.sql"), []byte("CREATE DATABASE test_db;"), 0o644))
require.NoError(t, os.WriteFile(
filepath.Join(estimateDir, "test_db.users-schema.sql"),
[]byte(strings.Join([]string{
"CREATE DATABASE IF NOT EXISTS test_db;",
"USE test_db;",
"DROP TABLE IF EXISTS users;",
"CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255), KEY idx_name (name));",
}, "\n")),
0o644,
))
require.NoError(t, os.WriteFile(
filepath.Join(estimateDir, "test_db.users.001.csv"),
[]byte("1,alice\n2,bob\n"),
0o644,
))

cfg := defaultSDKConfig()
cfg.skipInvalidFiles = true
estimateScanner, err := NewFileScanner(ctx, "file://"+estimateDir, db, cfg)
require.NoError(t, err)
defer estimateScanner.Close()

estimate, err := estimateScanner.EstimateImportDataSize(ctx)
require.NoError(t, err)
require.Len(t, estimate.Tables, 1)
require.Equal(t, "users", estimate.Tables[0].Table)
require.Positive(t, estimate.Tables[0].SourceSize)
require.Positive(t, estimate.Tables[0].TiKVSize)
require.Equal(t, estimate.Tables[0].SourceSize, estimate.TotalSourceSize)
require.Equal(t, estimate.Tables[0].TiKVSize, estimate.TotalTiKVSize)
})
}

func TestFileScannerWithEstimateRealSize(t *testing.T) {
Expand Down
Loading