importer: sample a portion of compressed files to speed up import spec generation (#64769) #67654
Conversation
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
@D3Hunter This PR has conflicts, I have put it on hold.
@ti-chi-bot: If you want to know how to resolve it, please read the guide in the TiDB Dev Guide. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.
📝 Walkthrough

Adds sampling-based, compression-aware real-size estimation to importer initialization, using per-format sampling (with a Parquet special case) and a cached harmonic-mean compression estimator; also increases a test shard count and adds a test for scanning many compressed files.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant InitDataFiles
    participant Detector
    participant Sampler
    participant Estimator
    participant SizeCalc
    Client->>InitDataFiles: InitDataFiles(globPattern)
    InitDataFiles->>Detector: detectAndUpdateFormat() [sync.Once]
    Detector-->>InitDataFiles: sourceType
    InitDataFiles->>Sampler: sample files (bounded)
    Sampler-->>Estimator: sampled stats
    Estimator->>Estimator: compute harmonic mean ratio
    Estimator-->>InitDataFiles: sizeExpansionRatio
    InitDataFiles->>SizeCalc: estimate(file) * fileSize * sizeExpansionRatio
    SizeCalc-->>InitDataFiles: estimated real sizes
    InitDataFiles-->>Client: completed
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches

🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
/unhold |
Actionable comments posted: 4
🧹 Nitpick comments (1)
pkg/executor/importer/import_test.go (1)
325-328: Trim the fixture count to the actual sampling boundary.

Creating 2048 files is heavier than needed, and the magic number will drift if `maxSampledCompressedFiles` changes. `maxSampledCompressedFiles + 1` is enough to cross the new cutoff and keeps the test targeted.

♻️ Proposed fix

```diff
-	for i := range 2048 {
+	for i := 0; i < maxSampledCompressedFiles+1; i++ {
 		fileName := filepath.Join(tempDir, fmt.Sprintf("test_%d.csv.gz", i))
 		require.NoError(t, os.WriteFile(fileName, []byte{}, 0o644))
 	}
```

As per coding guidelines: "Keep test changes minimal and deterministic; avoid broad golden/testdata churn unless required."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/executor/importer/import_test.go` around lines 325 - 328, The test currently creates 2048 files unnecessarily; replace the hardcoded range(2048) with a minimal deterministic value based on the sampling boundary (use maxSampledCompressedFiles + 1) so the test only produces one more than the cutoff and remains correct if maxSampledCompressedFiles changes; update the loop that builds fileName and writes empty files (the block creating test_%d.csv.gz) to iterate up to maxSampledCompressedFiles+1 instead of 2048.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/executor/importer/import_test.go`:
- Line 330: The test uses testfailpoint.Enable (in import_test.go) but the
package isn't imported; add the missing import for the testfailpoint package
(e.g., import "github.com/pingcap/tidb/util/testfailpoint") to the test's import
block so testfailpoint.Enable is defined and the test compiles.
In `@pkg/executor/importer/import.go`:
- Around line 1454-1485: The RealSize calculation is inconsistent between glob
and exact-path imports: the glob branch uses ce.estimate(...) combined with
sizeExpansionRatio (from
detectAndUpdateFormat/getSourceType/estimateCompressionRatio) while the
exact-path branch still calls mydump.EstimateRealSizeForFile; factor the new
logic into a shared helper (e.g., computeRealSize(ctx, ce, path, size,
sourceType, s) or similar) that calls
detectAndUpdateFormat/getSourceType/estimateCompressionRatio as needed, uses
mydump.ParseCompressionOnFileExtension and ce.estimate to compute base size and
then applies sizeExpansionRatio, and replace both usages of
mydump.EstimateRealSizeForFile and the inline ce.estimate/sizeExpansionRatio
code in the ParallelProcess lambda and the exact-path branch to call this helper
and set SourceFileMeta.RealSize.
- Around line 1308-1310: The fast-path returns the just-sampled per-file
compressRatio if another worker stores r.ratio[compressTp] between the initial
r.ratio.Load(compressTp) check and acquiring the mutex; to fix this, after
acquiring the mutex (r.mu) re-check r.ratio.Load(compressTp) and if an aggregate
is now present return that cached aggregate instead of the per-file
compressRatio; otherwise proceed to initialize and publish the aggregate as
before (ensure you use the same compressTp/compressRatio symbols and release the
mutex after).
- Around line 1227-1229: The parquet sampling error should be handled as a
best-effort fallback instead of returning an error that aborts InitDataFiles: in
estimateCompressionRatio, catch errors from mydump.SampleStatisticsFromParquet
(the call that currently returns rows, rowSize, err) and on failure log or warn
and fall back to using FileSize (or a default compression ratio) to compute and
return the compression estimate rather than propagating the error; ensure the
once.Do path in InitDataFiles no longer receives an error from
estimateCompressionRatio so one unreadable/corrupt parquet file won't abort spec
generation.
---
Nitpick comments:
In `@pkg/executor/importer/import_test.go`:
- Around line 325-328: The test currently creates 2048 files unnecessarily;
replace the hardcoded range(2048) with a minimal deterministic value based on
the sampling boundary (use maxSampledCompressedFiles + 1) so the test only
produces one more than the cutoff and remains correct if
maxSampledCompressedFiles changes; update the loop that builds fileName and
writes empty files (the block creating test_%d.csv.gz) to iterate up to
maxSampledCompressedFiles+1 instead of 2048.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: cdafe954-5d67-439b-840b-23f2deb13b79
📒 Files selected for processing (3)
- pkg/executor/importer/BUILD.bazel
- pkg/executor/importer/import.go
- pkg/executor/importer/import_test.go
```go
rows, rowSize, err := mydump.SampleStatisticsFromParquet(ctx, filePath, store)
if err != nil {
	return 1.0, err
```
Keep parquet size estimation best-effort.

`estimateCompressionRatio` now returns an error when parquet sampling fails, and the `once.Do` path propagates that out of `InitDataFiles`. The old path degraded to `FileSize` on estimation errors, so one unreadable or corrupt sampled parquet file now aborts spec generation instead of just losing the optimization.
🐛 Proposed fix
```diff
 rows, rowSize, err := mydump.SampleStatisticsFromParquet(ctx, filePath, store)
 if err != nil {
-	return 1.0, err
+	logutil.Logger(ctx).Warn("fail to sample parquet statistics, fallback to file size",
+		zap.String("path", filePath),
+		zap.Error(err),
+	)
+	return 1.0, nil
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
rows, rowSize, err := mydump.SampleStatisticsFromParquet(ctx, filePath, store)
if err != nil {
	logutil.Logger(ctx).Warn("fail to sample parquet statistics, fallback to file size",
		zap.String("path", filePath),
		zap.Error(err),
	)
	return 1.0, nil
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/executor/importer/import.go` around lines 1227 - 1229, The parquet
sampling error should be handled as a best-effort fallback instead of returning
an error that aborts InitDataFiles: in estimateCompressionRatio, catch errors
from mydump.SampleStatisticsFromParquet (the call that currently returns rows,
rowSize, err) and on failure log or warn and fall back to using FileSize (or a
default compression ratio) to compute and return the compression estimate rather
than propagating the error; ensure the once.Do path in InitDataFiles no longer
receives an error from estimateCompressionRatio so one unreadable/corrupt
parquet file won't abort spec generation.
```go
if _, ok := r.ratio.Load(compressTp); ok {
	return compressRatio
}
```
Return the published aggregate after cache initialization.

If another worker stores `r.ratio[compressTp]` between the fast-path `Load` and taking `mu`, Line 1309 returns the just-sampled per-file ratio instead of the cached aggregate. That leaves a small tail of files with mixed estimates after the cache is initialized.
🐛 Proposed fix
```diff
-	if _, ok := r.ratio.Load(compressTp); ok {
-		return compressRatio
+	if v, ok := r.ratio.Load(compressTp); ok {
+		return v.(float64)
 	}
```

🤖 Prompt for AI Agents
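The locked re-check described in this comment is the classic double-checked pattern over a `sync.Map` plus mutex: after taking the lock, prefer the published aggregate over the locally sampled value. A minimal self-contained sketch of that flow follows; the type and method names are illustrative, not the actual `import.go` symbols.

```go
package main

import (
	"fmt"
	"sync"
)

type ratioCache struct {
	mu    sync.Mutex
	ratio sync.Map // compression type -> aggregate ratio (float64)
}

// getRatio returns the published aggregate ratio for compressTp,
// or publishes `sampled` as the aggregate if none exists yet.
func (r *ratioCache) getRatio(compressTp string, sampled float64) float64 {
	// Fast path: an aggregate has already been published.
	if v, ok := r.ratio.Load(compressTp); ok {
		return v.(float64)
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	// Re-check under the lock: another goroutine may have published
	// the aggregate between Load and Lock. Returning the cached value
	// here (not `sampled`) is exactly the fix the review suggests.
	if v, ok := r.ratio.Load(compressTp); ok {
		return v.(float64)
	}
	r.ratio.Store(compressTp, sampled)
	return sampled
}

func main() {
	c := &ratioCache{}
	fmt.Println(c.getRatio("gzip", 3.5)) // first call publishes 3.5
	fmt.Println(c.getRatio("gzip", 9.9)) // later samples see the cached 3.5
}
```

Without the locked re-check, the second caller in a race could return its own per-file ratio, leaving a tail of files sized with a different estimate than the rest.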
Verify each finding against the current code and only fix it if needed.
In `@pkg/executor/importer/import.go` around lines 1308 - 1310, The fast-path
returns the just-sampled per-file compressRatio if another worker stores
r.ratio[compressTp] between the initial r.ratio.Load(compressTp) check and
acquiring the mutex; to fix this, after acquiring the mutex (r.mu) re-check
r.ratio.Load(compressTp) and if an aggregate is now present return that cached
aggregate instead of the per-file compressRatio; otherwise proceed to initialize
and publish the aggregate as before (ensure you use the same
compressTp/compressRatio symbols and release the mutex after).
♻️ Duplicate comments (3)
pkg/executor/importer/import.go (3)
1231-1233: ⚠️ Potential issue | 🟠 Major

Keep parquet expansion sampling best-effort.

`estimateCompressionRatio` now returns the `SampleParquetRowSize` error, and the `once.Do` path propagates that out of `InitDataFiles`. One unreadable/corrupt sampled parquet file now aborts spec generation even though this value is only used to estimate `RealSize`. Falling back to `1.0`/file size with a warning would preserve the old behavior.

🐛 Proposed fix

```diff
 rows, rowSize, err := mydump.SampleParquetRowSize(ctx, fileMeta, store)
 if err != nil {
-	return 1.0, err
+	logutil.Logger(ctx).Warn("fail to sample parquet statistics, fallback to file size",
+		zap.String("path", filePath),
+		zap.Error(err),
+	)
+	return 1.0, nil
 }
```

Also applies to: 1457-1464
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/executor/importer/import.go` around lines 1231 - 1233, The parquet sampling call in estimateCompressionRatio (calling mydump.SampleParquetRowSize) currently returns its error and causes the once.Do path in InitDataFiles to abort spec generation; change estimateCompressionRatio to treat SampleParquetRowSize failures as non-fatal: catch the error, log a warning including fileMeta/err, and fall back to using compression ratio = 1.0 (or file size-based RealSize) instead of returning the error so InitDataFiles/once.Do won't propagate the failure; update both call sites (around the rows,rowSize assignment and the duplicate block at the other location) to preserve the old best-effort behavior.
1295-1314: ⚠️ Potential issue | 🟡 Minor

Return the cached aggregate after the locked re-check.

If another worker stores `r.ratio[compressTp]` between the fast-path `Load` and taking `mu`, Lines 1312-1314 still return this file's sampled ratio instead of the published harmonic mean. That leaves a small tail of files with mixed estimates.

🐛 Proposed fix

```diff
-	if _, ok := r.ratio.Load(compressTp); ok {
-		return compressRatio
+	if v, ok := r.ratio.Load(compressTp); ok {
+		return v.(float64)
 	}
```
Verify each finding against the current code and only fix it if needed. In `@pkg/executor/importer/import.go` around lines 1295 - 1314, The code does a second r.ratio.Load(compressTp) check under r.mu but still returns the local compressRatio; change the locked re-check in the function handling file sampling so that if r.ratio.Load(compressTp) is present after acquiring r.mu it returns the cached aggregate (the value stored in r.ratio for compressTp) instead of returning the just-sampled compressRatio; locate symbols r.ratio, compressTp, r.mu, and compressRatio in the import.go sampling/ratio logic and update the control flow to return the stored value when present, otherwise continue to store/use compressRatio.
1404-1413: ⚠️ Potential issue | 🟠 Major

Use the same `RealSize` calculation for exact-path imports.

The glob branch now applies `ce.estimate(...) * sizeExpansionRatio`, but the exact-path branch still calls `mydump.EstimateRealSizeForFile` at Line 1413. `IMPORT INTO '/a.parquet'` and `IMPORT INTO '/a*.parquet'` can therefore derive different `RealSize` and chunk sizing for the same source. Please factor the new logic into a shared helper and call it from both branches.

Also applies to: 1452-1474
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/executor/importer/import.go` around lines 1404 - 1413, The exact-path branch is still calling mydump.EstimateRealSizeForFile while the glob branch uses the new ce.estimate(...) * sizeExpansionRatio logic, causing inconsistent RealSize and chunking; extract the new RealSize computation into a shared helper (e.g., computeRealSize(ctx, engineContext, fileMeta, sizeExpansionRatio, s)) that encapsulates the ce.estimate(...) * sizeExpansionRatio fallback to mydump.EstimateRealSizeForFile, then replace the direct call to mydump.EstimateRealSizeForFile in the exact-path code that sets fileMeta.RealSize (after detectAndUpdateFormat/getSourceType/ParseCompressionOnFileExtension) with a call to this helper, and make the same replacement in the other affected block (around the 1452-1474 region) so both glob and exact-path use the identical RealSize logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@pkg/executor/importer/import.go`:
- Around line 1231-1233: The parquet sampling call in estimateCompressionRatio
(calling mydump.SampleParquetRowSize) currently returns its error and causes the
once.Do path in InitDataFiles to abort spec generation; change
estimateCompressionRatio to treat SampleParquetRowSize failures as non-fatal:
catch the error, log a warning including fileMeta/err, and fall back to using
compression ratio = 1.0 (or file size-based RealSize) instead of returning the
error so InitDataFiles/once.Do won't propagate the failure; update both call
sites (around the rows,rowSize assignment and the duplicate block at the other
location) to preserve the old best-effort behavior.
- Around line 1295-1314: The code does a second r.ratio.Load(compressTp) check
under r.mu but still returns the local compressRatio; change the locked re-check
in the function handling file sampling so that if r.ratio.Load(compressTp) is
present after acquiring r.mu it returns the cached aggregate (the value stored
in r.ratio for compressTp) instead of returning the just-sampled compressRatio;
locate symbols r.ratio, compressTp, r.mu, and compressRatio in the import.go
sampling/ratio logic and update the control flow to return the stored value when
present, otherwise continue to store/use compressRatio.
- Around line 1404-1413: The exact-path branch is still calling
mydump.EstimateRealSizeForFile while the glob branch uses the new
ce.estimate(...) * sizeExpansionRatio logic, causing inconsistent RealSize and
chunking; extract the new RealSize computation into a shared helper (e.g.,
computeRealSize(ctx, engineContext, fileMeta, sizeExpansionRatio, s)) that
encapsulates the ce.estimate(...) * sizeExpansionRatio fallback to
mydump.EstimateRealSizeForFile, then replace the direct call to
mydump.EstimateRealSizeForFile in the exact-path code that sets
fileMeta.RealSize (after
detectAndUpdateFormat/getSourceType/ParseCompressionOnFileExtension) with a call
to this helper, and make the same replacement in the other affected block
(around the 1452-1474 region) so both glob and exact-path use the identical
RealSize logic.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 2f2788ff-222b-4b12-b814-2199797e5e27
📒 Files selected for processing (2)
- pkg/executor/importer/import.go
- pkg/executor/importer/import_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/executor/importer/import_test.go
/retest
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: Benjamin2037, joechenrh. The full list of commands accepted by this bot can be found here. The pull request process is described here.

Details: Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing
[LGTM Timeline notifier]

Timeline:
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@              Coverage Diff               @@
##    release-nextgen-20251011   #67654   +/-   ##
=============================================================
  Coverage          ?   71.8595%
=============================================================
  Files             ?       1833
  Lines             ?     493020
  Branches          ?          0
=============================================================
  Hits              ?     354282
  Misses            ?     115390
  Partials          ?      23348
```

Flags with carried forward coverage won't be shown. Click here to find out more.
133f195 into pingcap:release-nextgen-20251011
This is an automated cherry-pick of #64769
What problem does this PR solve?
Issue Number: close #64770
Problem Summary:
What changed and how does it work?
For compressed files, it may be time-consuming to get the compression ratio for each file. Since the ratio we get is only a rough value anyway, we sample just the first 512 files (this could be made configurable) for each compression type and use the harmonic mean to get the average compression ratio.
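The harmonic-mean aggregation described above can be sketched as follows. This is a simplified standalone version, not the actual implementation in pkg/executor/importer/import.go; the function name and the sample values are illustrative.

```go
package main

import "fmt"

// harmonicMean aggregates per-file compression ratios
// (uncompressed size / compressed size). The harmonic mean equals
// the true aggregate ratio exactly when the sampled files have
// equal uncompressed sizes, and it downweights outlier ratios.
func harmonicMean(ratios []float64) float64 {
	if len(ratios) == 0 {
		return 1.0 // no samples: assume no expansion
	}
	var sumInv float64
	for _, r := range ratios {
		sumInv += 1.0 / r
	}
	return float64(len(ratios)) / sumInv
}

func main() {
	// Hypothetical ratios sampled from a few .gz files.
	ratios := []float64{4.0, 2.0, 4.0, 4.0}
	fmt.Printf("%.4f\n", harmonicMean(ratios)) // prints 3.2000
}
```

Note how the single low ratio (2.0) pulls the harmonic mean (3.2) below the arithmetic mean (3.5), which makes the estimate more conservative when a few files compress much worse than the rest.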
Check List
Tests
Create 10,000 zstd files on ks3, and import with a 8C instance.
Before:
After:
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.
Summary by CodeRabbit
New Features
Tests