Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lightning/pkg/importinto/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func NewImporter(
importsdk.WithFileRouters(cfg.Mydumper.FileRouters),
importsdk.WithRoutes(cfg.Routes),
importsdk.WithCharset(cfg.Mydumper.CharacterSet),
importsdk.WithDataCharacterSet(cfg.Mydumper.DataCharacterSet),
importsdk.WithCSVConfig(cfg.Mydumper.CSV),
importsdk.WithLogger(imp.logger),
}
sdk, err := importsdk.NewImportSDK(ctx, cfg.Mydumper.SourceDir, db, sdkOpts...)
Expand Down
132 changes: 92 additions & 40 deletions pkg/executor/importer/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@ package importer

import (
"context"
goerrors "errors"
"math/rand"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/mydump"
verify "github.com/pingcap/tidb/pkg/lightning/verification"
"github.com/pingcap/tidb/pkg/objstore/storeapi"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/types"
"go.uber.org/zap"
)

Expand All @@ -50,46 +52,88 @@ var (
// if total files < maxSampleFileCount, the total file size is small, the
// accuracy of the ratio is not that important.
maxSampleFileSize int64 = 10 * units.MiB
stopIterErr = goerrors.New("stop iteration")
)

// SampledKVSizeResult holds the outcome of sampling a data source: how many
// source bytes were read and how large the encoded data/index KV pairs were.
type SampledKVSizeResult struct {
	SourceSize  int64
	DataKVSize  uint64
	IndexKVSize uint64
}

// TotalKVSize returns the combined size of the sampled data and index KV
// pairs, i.e. DataKVSize + IndexKVSize, as an int64.
func (r *SampledKVSizeResult) TotalKVSize() int64 {
	total := r.DataKVSize + r.IndexKVSize
	return int64(total)
}

// SampleFileImportKVSize samples source rows with nextgen's KV encoder and returns
// the sampled source bytes and encoded KV sizes.
//
// It builds a LoadDataController from the given plan/table/AST arguments and
// options, points it at dataStore/dataFiles, and delegates the actual sampling
// to its sampleKVSize method using ksCodec as the keyspace codec.
func SampleFileImportKVSize(
	ctx context.Context,
	plan *Plan,
	tbl table.Table,
	astArgs *ASTArgs,
	dataStore storeapi.Storage,
	dataFiles []*mydump.SourceFileMeta,
	ksCodec []byte,
	options ...Option,
) (*SampledKVSizeResult, error) {
	controller, err := NewLoadDataController(plan, tbl, astArgs, options...)
	if err != nil {
		return nil, err
	}
	// Wire the controller to the sampling inputs before delegating.
	controller.dataStore = dataStore
	controller.dataFiles = dataFiles
	return controller.sampleKVSize(ctx, ksCodec)
}

func (e *LoadDataController) sampleIndexSizeRatio(
ctx context.Context,
ksCodec []byte,
) (float64, error) {
if len(e.dataFiles) == 0 {
result, err := e.sampleKVSize(ctx, ksCodec)
if err != nil {
return 0, err
Comment thread
joechenrh marked this conversation as resolved.
}
if result.DataKVSize == 0 {
return 0, nil
}
return float64(result.IndexKVSize) / float64(result.DataKVSize), nil
}

func (e *LoadDataController) sampleKVSize(
ctx context.Context,
ksCodec []byte,
) (*SampledKVSizeResult, error) {
if len(e.dataFiles) == 0 {
return &SampledKVSizeResult{}, nil
}
perm := rand.Perm(len(e.dataFiles))
files := make([]*mydump.SourceFileMeta, min(len(e.dataFiles), maxSampleFileCount))
for i := range files {
files[i] = e.dataFiles[perm[i]]
}
rowsPerFile := totalSampleRowCount / len(files)
var (
totalDataKVSize, totalIndexKVSize uint64
firstErr error
)
result := &SampledKVSizeResult{}
var firstErr error
for _, file := range files {
dataKVSize, indexKVSize, err := e.sampleIndexRatioForOneFile(ctx, file, ksCodec, rowsPerFile)
sourceSize, dataKVSize, indexKVSize, err := e.sampleKVSizeForOneFile(ctx, file, ksCodec, rowsPerFile)
if firstErr == nil {
firstErr = err
}
totalDataKVSize += dataKVSize
totalIndexKVSize += indexKVSize
}
if totalDataKVSize == 0 {
return 0, firstErr
result.SourceSize += sourceSize
result.DataKVSize += dataKVSize
result.IndexKVSize += indexKVSize
}
return float64(totalIndexKVSize) / float64(totalDataKVSize), firstErr
return result, firstErr
}

func (e *LoadDataController) sampleIndexRatioForOneFile(
func (e *LoadDataController) sampleKVSizeForOneFile(
ctx context.Context,
file *mydump.SourceFileMeta,
ksCodec []byte,
maxRowCount int,
) (dataKVSize, indexKVSize uint64, err error) {
) (sourceSize int64, dataKVSize, indexKVSize uint64, err error) {
chunk := &checkpoints.ChunkCheckpoint{
Key: checkpoints.ChunkCheckpointKey{Path: file.Path},
FileMeta: *file,
Expand All @@ -99,11 +143,11 @@ func (e *LoadDataController) sampleIndexRatioForOneFile(
idAlloc := kv.NewPanickingAllocators(e.Table.Meta().SepAutoInc())
tbl, err := tables.TableFromMeta(idAlloc, e.Table.Meta())
if err != nil {
return 0, 0, errors.Annotatef(err, "failed to tables.TableFromMeta %s", e.Table.Meta().Name)
return 0, 0, 0, errors.Annotatef(err, "failed to tables.TableFromMeta %s", e.Table.Meta().Name)
}
encoder, err := e.getKVEncoder(e.logger, chunk, tbl)
if err != nil {
return 0, 0, err
return 0, 0, 0, err
}
defer func() {
if err2 := encoder.Close(); err2 != nil {
Expand All @@ -112,34 +156,42 @@ func (e *LoadDataController) sampleIndexRatioForOneFile(
}()
parser, err := e.getParser(ctx, chunk)
if err != nil {
return 0, 0, err
return 0, 0, 0, err
}
defer func() {
if err2 := parser.Close(); err2 != nil {
e.logger.Warn("close parser failed", zap.Error(err2))
}
}()

var count int
sendFn := func(context.Context, *encodedKVGroupBatch) error {
count++
if count >= maxRowCount {
return stopIterErr
var (
count int
readRowCache []types.Datum
readFn = parserEncodeReader(parser, chunk.Chunk.EndOffset, chunk.GetKey())
kvBatch = newEncodedKVGroupBatch(ksCodec, maxRowCount)
)
for count < maxRowCount {
row, closed, readErr := readFn(ctx, readRowCache)
if readErr != nil {
return 0, 0, 0, readErr
}
return nil
}
chunkEnc := &chunkEncoder{
chunkName: chunk.GetKey(),
readFn: parserEncodeReader(parser, chunk.Chunk.EndOffset, chunk.GetKey()),
sendFn: sendFn,
encoder: encoder,
keyspace: ksCodec,
groupChecksum: verify.NewKVGroupChecksumWithKeyspace(ksCodec),
}
err = chunkEnc.encodeLoop(ctx)
if goerrors.Is(err, stopIterErr) {
err = nil
if closed {
break
}
readRowCache = row.row
if rowDelta := row.endOffset - row.startPos; rowDelta > 0 {
sourceSize += rowDelta
}
kvs, encodeErr := encoder.Encode(row.row, row.rowID)
row.resetFn()
if encodeErr != nil {
return 0, 0, 0, common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(chunk.GetKey(), row.startPos)
}
if _, err = kvBatch.add(kvs); err != nil {
return 0, 0, 0, err
}
count++
}
dataKVSize, indexKVSize = chunkEnc.groupChecksum.DataAndIndexSumSize()
return dataKVSize, indexKVSize, err
dataKVSize, indexKVSize = kvBatch.groupChecksum.DataAndIndexSumSize()
return sourceSize, dataKVSize, indexKVSize, nil
}
8 changes: 8 additions & 0 deletions pkg/importsdk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,21 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/importsdk",
visibility = ["//visibility:public"],
deps = [
"//pkg/ddl",
"//pkg/executor/importer",
"//pkg/lightning/common",
"//pkg/lightning/config",
"//pkg/lightning/log",
"//pkg/lightning/mydump",
"//pkg/meta/model",
"//pkg/objstore",
"//pkg/objstore/storeapi",
"//pkg/parser",
"//pkg/parser/ast",
"//pkg/parser/mysql",
"//pkg/planner/core",
"//pkg/table/tables",
"//pkg/util/mock",
"@com_github_pingcap_errors//:errors",
"@org_uber_go_zap//:zap",
],
Expand Down
29 changes: 25 additions & 4 deletions pkg/importsdk/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type SDKConfig struct {
routes config.Routes
filter []string
charset string
csvConfig config.CSVConfig
dataCharacterSet string
maxScanFiles *int
skipInvalidFiles bool
estimateRealSize bool
Expand All @@ -41,11 +43,14 @@ type SDKConfig struct {
}

func defaultSDKConfig() *SDKConfig {
defaultCfg := config.NewConfig()
return &SDKConfig{
concurrency: 4,
filter: config.GetDefaultFilter(),
logger: log.L(),
charset: "auto",
concurrency: 4,
filter: config.GetDefaultFilter(),
logger: log.L(),
charset: "auto",
csvConfig: defaultCfg.Mydumper.CSV,
dataCharacterSet: defaultCfg.Mydumper.DataCharacterSet,
// Estimate the real size (uncompressed / row-oriented) for compressed/parquet data files by default.
estimateRealSize: true,
}
Expand Down Expand Up @@ -104,6 +109,22 @@ func WithCharset(cs string) SDKOption {
}
}

// WithCSVConfig specifies the CSV parsing configuration used for size estimation.
func WithCSVConfig(csvCfg config.CSVConfig) SDKOption {
	return func(sdkCfg *SDKConfig) {
		sdkCfg.csvConfig = csvCfg
	}
}

// WithDataCharacterSet specifies the source data character set used for CSV parsing.
// An empty string is ignored so that the configured default is preserved.
func WithDataCharacterSet(charset string) SDKOption {
	return func(sdkCfg *SDKConfig) {
		if charset == "" {
			// Keep the default data character set when no override is given.
			return
		}
		sdkCfg.dataCharacterSet = charset
	}
}

// WithMaxScanFiles specifies custom file scan limitation
func WithMaxScanFiles(limit int) SDKOption {
return func(cfg *SDKConfig) {
Expand Down
Loading