Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ go_test(
embed = [":importer"],
flaky = True,
race = "on",
shard_count = 32,
shard_count = 33,
deps = [
"//br/pkg/mock",
"//br/pkg/streamhelper",
Expand Down
140 changes: 139 additions & 1 deletion pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,126 @@ func initExternalStore(ctx context.Context, u *url.URL, target string) (storage.
return s, nil
}

// estimateCompressionRatio estimates the in-memory (decoded) size of a data
// file relative to its on-disk size.
//
// Only parquet files are sampled: parquet applies heavy encoding/compression,
// so decoded row data is typically much larger than the file itself. Every
// other source type returns the neutral ratio 1.0.
//
// When no real ratio can be computed — the file has no rows, the sampled row
// size is zero, or the reported file size is not positive — a conservative
// default of 2.0 is returned instead of dividing by zero.
func estimateCompressionRatio(
	ctx context.Context,
	filePath string,
	fileSize int64,
	tp mydump.SourceType,
	store storage.ExternalStorage,
) (float64, error) {
	if tp != mydump.SourceTypeParquet {
		return 1.0, nil
	}
	failpoint.Inject("skipEstimateCompressionForParquet", func(val failpoint.Value) {
		if v, ok := val.(bool); ok && v {
			failpoint.Return(2.0, nil)
		}
	})
	fileMeta := mydump.SourceFileMeta{
		Path:        filePath,
		FileSize:    fileSize,
		Compression: mydump.ParseCompressionOnFileExtension(filePath),
		Type:        tp,
	}
	rows, rowSize, err := mydump.SampleParquetRowSize(ctx, fileMeta, store)
	if err != nil {
		return 1.0, err
	}
	// No usable sample (empty file) or a non-positive file size: fall back to
	// the default ratio rather than computing 0/0 or dividing by zero.
	if rowSize == 0 || rows == 0 || fileSize <= 0 {
		return 2.0, nil
	}

	compressionRatio := (rowSize * float64(rows)) / float64(fileSize)
	return compressionRatio, nil
}

// maxSampledCompressedFiles is the maximum number of files sampled to estimate
// the compression ratio for each compression type. In the extreme case that
// user data contains all 3 compression types, about 1,500 files are sampled
// in total. Even if each sample costs 0.5 second (e.g. cross-region access),
// sampling still finishes within about one minute at 16-way concurrency.
const maxSampledCompressedFiles = 512

// compressionEstimator estimates the decompressed-to-on-disk size ratio for
// each compression type. Per-file samples are accumulated under mu; once
// maxSampledCompressedFiles samples are collected for a type, their harmonic
// mean is published to ratio and served lock-free afterwards.
type compressionEstimator struct {
	// mu guards records.
	mu sync.Mutex
	// records holds the per-file sampled ratios, keyed by compression type,
	// until enough samples are gathered to publish an aggregate.
	records map[mydump.Compression][]float64
	// ratio caches the published aggregate per compression type
	// (mydump.Compression -> float64); read without holding mu.
	ratio sync.Map
}

// newCompressionRecorder returns a compressionEstimator ready to accept
// per-file ratio samples.
func newCompressionRecorder() *compressionEstimator {
	return &compressionEstimator{
		records: make(map[mydump.Compression][]float64),
	}
}

// getHarmonicMean returns the harmonic mean of the positive entries of rs.
// Non-positive entries are ignored; when no positive entry exists (including
// an empty or nil slice), the neutral ratio 1.0 is returned.
func getHarmonicMean(rs []float64) float64 {
	var (
		inverseSum float64
		n          int
	)
	for _, ratio := range rs {
		if ratio <= 0 {
			// Skip values that would make the reciprocal undefined or negative.
			continue
		}
		inverseSum += 1.0 / ratio
		n++
	}
	if n == 0 {
		return 1.0
	}
	return float64(n) / inverseSum
}

func (r *compressionEstimator) estimate(
ctx context.Context,
fileMeta mydump.SourceFileMeta,
store storage.ExternalStorage,
) float64 {
compressTp := mydump.ParseCompressionOnFileExtension(fileMeta.Path)
if compressTp == mydump.CompressionNone {
return 1.0
}
if v, ok := r.ratio.Load(compressTp); ok {
return v.(float64)
}

compressRatio, err := mydump.SampleFileCompressRatio(ctx, fileMeta, store)
if err != nil {
logutil.Logger(ctx).Error("fail to calculate data file compress ratio",
zap.String("category", "loader"),
zap.String("path", fileMeta.Path),
zap.Stringer("type", fileMeta.Type), zap.Error(err),
)
return 1.0
}

r.mu.Lock()
defer r.mu.Unlock()

if _, ok := r.ratio.Load(compressTp); ok {
return compressRatio
}
Comment on lines +1312 to +1314
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Return the published aggregate after cache initialization.

If another worker stores r.ratio[compressTp] between the fast-path Load and taking mu, Line 1309 returns the just-sampled per-file ratio instead of the cached aggregate. That leaves a small tail of files with mixed estimates after the cache is initialized.

🐛 Proposed fix
-	if _, ok := r.ratio.Load(compressTp); ok {
-		return compressRatio
+	if v, ok := r.ratio.Load(compressTp); ok {
+		return v.(float64)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/executor/importer/import.go` around lines 1308 - 1310, The fast-path
returns the just-sampled per-file compressRatio if another worker stores
r.ratio[compressTp] between the initial r.ratio.Load(compressTp) check and
acquiring the mutex; to fix this, after acquiring the mutex (r.mu) re-check
r.ratio.Load(compressTp) and if an aggregate is now present return that cached
aggregate instead of the per-file compressRatio; otherwise proceed to initialize
and publish the aggregate as before (ensure you use the same
compressTp/compressRatio symbols and release the mutex after).


if r.records[compressTp] == nil {
r.records[compressTp] = make([]float64, 0, 256)
}
if len(r.records[compressTp]) < maxSampledCompressedFiles {
r.records[compressTp] = append(r.records[compressTp], compressRatio)
}
if len(r.records[compressTp]) >= maxSampledCompressedFiles {
// Using harmonic mean can better handle outlier values.
compressRatio = getHarmonicMean(r.records[compressTp])
r.ratio.Store(compressTp, compressRatio)
}
return compressRatio
}

// InitDataFiles initializes the data store and files.
// it will call InitDataStore internally.
func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
Expand Down Expand Up @@ -1260,6 +1380,9 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
var (
totalSize int64
sourceType mydump.SourceType
// sizeExpansionRatio is the estimated size expansion for parquet format.
// For non-parquet format, it's always 1.0.
sizeExpansionRatio = 1.0
)
dataFiles := []*mydump.SourceFileMeta{}
isAutoDetectingFormat := e.Format == DataFormatAuto
Expand Down Expand Up @@ -1322,17 +1445,32 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
}

var err error
var once sync.Once

ce := newCompressionRecorder()

if dataFiles, err = mydump.ParallelProcess(ctx, allFiles, e.ThreadCnt*2,
func(ctx context.Context, f mydump.RawFile) (*mydump.SourceFileMeta, error) {
path, size := f.Path, f.Size
// pick arbitrary one file to detect the format.
var err2 error
once.Do(func() {
e.detectAndUpdateFormat(path)
sourceType = e.getSourceType()
sizeExpansionRatio, err2 = estimateCompressionRatio(ctx, path, size, sourceType, s)
})
if err2 != nil {
return nil, err2
}
compressTp := mydump.ParseCompressionOnFileExtension(path)
fileMeta := mydump.SourceFileMeta{
Path: path,
FileSize: size,
Compression: compressTp,
Type: sourceType,
}
fileMeta.RealSize = mydump.EstimateRealSizeForFile(ctx, fileMeta, s)
fileMeta.RealSize = int64(ce.estimate(ctx, fileMeta, s) * float64(fileMeta.FileSize))
fileMeta.RealSize = int64(float64(fileMeta.RealSize) * sizeExpansionRatio)
return &fileMeta, nil
}); err != nil {
return err
Expand Down
32 changes: 32 additions & 0 deletions pkg/executor/importer/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand Down Expand Up @@ -313,6 +314,37 @@ func TestGetLocalBackendCfg(t *testing.T) {
require.Equal(t, config.DefaultSwitchTiKVModeInterval, cfg.RaftKV2SwitchModeDuration)
}

func TestInitCompressedFiles(t *testing.T) {
username, err := user.Current()
require.NoError(t, err)
if username.Name == "root" {
t.Skip("it cannot run as root")
}
tempDir := t.TempDir()
ctx := context.Background()

for i := range 2048 {
fileName := filepath.Join(tempDir, fmt.Sprintf("test_%d.csv.gz", i))
require.NoError(t, os.WriteFile(fileName, []byte{}, 0o644))
}

testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/lightning/mydump/SampleFileCompressPercentage", `return(250)`)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
c := LoadDataController{
Plan: &Plan{
Format: DataFormatCSV,
InImportInto: true,
Charset: &defaultCharacterSet,
LineFieldsInfo: newDefaultLineFieldsInfo(),
FieldNullDef: defaultFieldNullDef,
Parameters: &ImportParameters{},
},
logger: zap.NewExample(),
}

c.Path = filepath.Join(tempDir, "*.gz")
require.NoError(t, c.InitDataFiles(ctx))
}

func TestSupportedSuffixForServerDisk(t *testing.T) {
if kerneltype.IsNextGen() {
t.Skip("nextgen doesn't support import from server disk")
Expand Down