Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 34 additions & 16 deletions pkg/executor/importer/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/types"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"go.uber.org/zap"
Expand Down Expand Up @@ -352,27 +351,30 @@ func (s *kvSizeSampler) sampleOneFile(
}()

var (
count int
readRowCache []types.Datum
readFn = parserEncodeReader(parser, chunk.Chunk.EndOffset, chunk.GetKey())
kvBatch = newEncodedKVGroupBatch(ksCodec, maxRowCount)
count int
kvBatch = newEncodedKVGroupBatch(ksCodec, maxRowCount)
)
for count < maxRowCount {
row, closed, readErr := readFn(ctx, readRowCache)
if readErr != nil {
return 0, 0, 0, readErr
}
if closed {
startPos, _ := parser.Pos()
if s.cfg.Format != DataFormatParquet && startPos >= chunk.Chunk.EndOffset {
break
}
readRowCache = row.row
if rowDelta := row.endOffset - row.startPos; rowDelta > 0 {
sourceSize += rowDelta

readErr := parser.ReadRow()
if readErr != nil {
if errors.Cause(readErr) == io.EOF {
break
}
return 0, 0, 0, common.ErrEncodeKV.Wrap(readErr).GenWithStackByArgs(chunk.GetKey(), startPos)
}
kvs, encodeErr := encoder.Encode(row.row, row.rowID)
row.resetFn()

lastRow := parser.LastRow()
sourceSize += s.sampledRowSourceSize(parser, startPos, lastRow)

kvs, encodeErr := encoder.Encode(lastRow.Row, lastRow.RowID)
parser.RecycleRow(lastRow)
if encodeErr != nil {
return 0, 0, 0, common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(chunk.GetKey(), row.startPos)
return 0, 0, 0, common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(chunk.GetKey(), startPos)
}
if _, err = kvBatch.add(kvs); err != nil {
return 0, 0, 0, err
Expand All @@ -382,3 +384,19 @@ func (s *kvSizeSampler) sampleOneFile(
dataKVSize, indexKVSize = kvBatch.groupChecksum.DataAndIndexSumSize()
return sourceSize, dataKVSize, indexKVSize, nil
}

func (s *kvSizeSampler) sampledRowSourceSize(parser mydump.Parser, startPos int64, row mydump.Row) int64 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 [Minor] Parquet and fallback source-size branches lack targeted regression coverage

Why
The patch introduces format-specific source-size logic in sampledRowSourceSize, but the new test only validates the SQL consumed-bytes path and does not exercise the newly added Parquet or non-positive delta fallback branches.

Scope
pkg/executor/importer/sampler.go:388; pkg/executor/importer/sampler_test.go:238

Risk if unchanged
Future parser position behavior changes can silently skew sampled source-size estimation for Parquet or edge parser offsets, which may mis-tune IMPORT resource planning without an obvious failure signal.

Evidence
`sampledRowSourceSize` adds the `if s.cfg.Format == DataFormatParquet { return int64(row.Length) }` branch and the `if rowDelta := endPos - startPos; rowDelta > 0 { ... }` check with its `int64(row.Length)` fallback, while the added test `sql_source_size_uses_consumed_bytes_not_buffered_progress` covers only SQL input.

Change request
Add unit tests for these branches: one case for `DataFormatParquet` and one case for the non-Parquet `endPos <= startPos` fallback path, so that each new sizing branch is pinned by deterministic assertions.

// Sampling needs per-row source bytes, not buffered reader progress.
// SQL/CSV parsers expose byte offsets through Pos(), including compressed
// input where Pos() tracks uncompressed bytes and stays aligned with the
// RealSize-based source totals. Parquet Pos() is row-count based and must
// fall back to the row-size estimate.
if s.cfg.Format == DataFormatParquet {
return int64(row.Length)
}
endPos, _ := parser.Pos()
if rowDelta := endPos - startPos; rowDelta > 0 {
return rowDelta
}
return int64(row.Length)
}
51 changes: 51 additions & 0 deletions pkg/executor/importer/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,57 @@ func TestSampleIndexSizeRatio(t *testing.T) {
require.Error(t, err)
require.True(t, reader.closed)
})

t.Run("sql_source_size_uses_consumed_bytes_not_buffered_progress", func(t *testing.T) {
dir := t.TempDir()
var fileSB strings.Builder
fileSB.WriteString("INSERT INTO t VALUES\n")
for i := 0; i < 20; i++ {
_, err := fmt.Fprintf(&fileSB, "(%d,'v%d','w%d','x%d')", i, i, i, i)
require.NoError(t, err)
if i < 19 {
fileSB.WriteString(",\n")
continue
}
fileSB.WriteString(";\n")
}
content := fileSB.String()
require.NoError(t, os.WriteFile(filepath.Join(dir, "001.sql"), []byte(content), 0o644))

p := parser.New()
node, err := p.ParseOneStmt(`create table t (a int, b text, c text, d text, index idx(a));`, "", "")
require.NoError(t, err)
sctx := utilmock.NewContext()
tblInfo, err := ddl.MockTableInfo(sctx, node.(*ast.CreateTableStmt), 1)
require.NoError(t, err)
tblInfo.State = model.StatePublic
table := tables.MockTableFromMeta(tblInfo)

ctrl, err := NewLoadDataController(&Plan{
Path: filepath.Join(dir, "*.sql"),
Format: DataFormatSQL,
InImportInto: true,
}, table, &ASTArgs{})
require.NoError(t, err)
ctrl.logger = zap.Must(zap.NewDevelopment())
ctx := context.Background()
require.NoError(t, ctrl.InitDataFiles(ctx))

sampled, err := SampleFileImportKVSize(
ctx,
ctrl.buildKVSizeSampleConfig(),
table,
ctrl.dataStore,
ctrl.dataFiles,
nil,
ctrl.logger,
)
require.NoError(t, err)
require.Positive(t, sampled.SourceSize)
require.Positive(t, sampled.TotalKVSize())
require.Greater(t, sampled.SourceSize, int64(len(content)/2))
require.Less(t, sampled.SourceSize, int64(len(content)*2))
Comment thread
coderabbitai[bot] marked this conversation as resolved.
})
}
func TestSampleIndexSizeRatioVeryLongRows(t *testing.T) {
simpleTbl := `create table t (a int, b text, c text, d text, index idx(a));`
Expand Down
Loading