Conversation
…ad timing Add timedReadSeekCloser and atomicTimedReader to read_timings.go, thread a *readerTimings parameter through newLoadDataParser / GetParser / getParser, and update all call sites so the encode loop can later observe s3_read latency per chunk. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Wire *readerTimings through NewFileChunkProcessor -> newChunkEncoder so that each batch flush swaps per-batch S3/decompress deltas and emits ChunkProcessOpS3Read / ChunkProcessOpDecompress / ChunkProcessOpParse histograms. Fixes the undefined metric.ChunkProcessOpRead build break left by Task 1. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Rename unexported readerTimings to ReaderTimings and add NewReaderTimings() constructor so the external test package can drive the split-histogram observation path. Add "emits split read labels" subtest inside TestFileChunkProcess that verifies all four ChunkProcessSecondsHistogram label buckets (s3_read, decompress, parse, encode) receive at least one observation during a real processor run. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Move the write/read benchmark core (RunWrite, RunRead, Cleanup and internal helpers) into pkg/objstore/perfbench so both the CLI tool and the upcoming ADMIN BENCHMARK S3 SQL statement can share one implementation. tools/objstore-perf/main.go is now a thin wrapper; --help output and runtime behavior are unchanged. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add AdminBenchmarkS3Read and AdminBenchmarkS3Write enum values, the BenchmarkS3Options struct with its Restore method, a BenchmarkS3 pointer field on AdminStmt, and the corresponding Restore switch cases. Grammar wiring (Task 4) will consume these types. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds BENCHMARK keyword (non-reserved) and the grammar productions for ADMIN BENCHMARK S3 READ 'url' [WITH (key=val, ...)] ADMIN BENCHMARK S3 WRITE 'url' [WITH (key=val, ...)] parsing into ast.AdminStmt with Tp AdminBenchmarkS3Read/Write and BenchmarkS3 options field. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add parser round-trip test cases for the new ADMIN BENCHMARK S3 READ and ADMIN BENCHMARK S3 WRITE statements. Include positive cases to verify grammar acceptance and round-trip restore correctness, plus negative cases for malformed statements and invalid keywords. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Wire AdminBenchmarkS3Exec into the executor builder; parses WITH options into perfbench.Config, opens the object store, calls RunRead/RunWrite with wall-time measurement, and returns a (metric, value) result set. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected; please follow our release note process to remove it.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
@joechenrh I've received your pull request and will start the review. I'll conduct a thorough review covering code quality, potential issues, and implementation details. ⏳ This process typically takes 10-30 minutes depending on the complexity of the changes. ℹ️ Learn more details on Pantheon AI.
Hi @joechenrh. Thanks for your PR. PRs from untrusted users cannot be marked as trusted with "I understand the commands that are listed here."

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
Important: Review skipped (draft detected). Please check the settings in the CodeRabbit UI or the ⚙️ Run configuration.

Configuration used: Repository UI
Review profile: CHILL
Plan: Pro

You can disable this status message in the settings. Use the checkbox below for a quick retry.
📝 Walkthrough

This PR introduces S3 object-store benchmarking functionality via the new `ADMIN BENCHMARK S3` statement.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant Parser
    participant Planner
    participant Executor as AdminBenchmarkS3Executor
    participant ObjStore as ObjectStore
    participant Perfbench
    Client->>Parser: ADMIN BENCHMARK S3 READ/WRITE<br/>'s3://bucket/path' WITH (...)
    Parser->>Parser: Parse statement & options
    Parser-->>Planner: AdminStmt{AdminBenchmarkS3Read/Write,<br/>BenchmarkS3Options}
    Planner->>Planner: buildAdmin() recognizes<br/>AdminBenchmarkS3Read/Write
    Planner-->>Executor: AdminBenchmarkS3 plan<br/>(IsWrite, Options, Schema)
    Executor->>Executor: Next(ctx, req):<br/>build perfbench.Config
    Executor->>ObjStore: ParseBackend(URL)
    ObjStore-->>Executor: Object store instance
    Executor->>ObjStore: Open store with<br/>access recording
    ObjStore-->>Executor: Ready
    alt IsWrite
        Executor->>Perfbench: RunWrite(ctx, store, cfg)
        Perfbench->>ObjStore: Create objects<br/>(concurrent workers)
        ObjStore-->>Perfbench: Status
        Perfbench-->>Executor: Result{Bytes, Objects,<br/>Elapsed}
    else IsRead
        Executor->>Perfbench: RunRead(ctx, store, cfg)
        Perfbench->>ObjStore: Open & read objects<br/>(concurrent workers)
        ObjStore-->>Perfbench: Status
        Perfbench-->>Executor: Result{Bytes, Objects,<br/>Elapsed}
    end
    Executor->>Executor: Compute MB/s throughput
    Executor->>Executor: Format rows<br/>(mode, bytes, ops, duration, MB/s, concurrency)
    Executor-->>Client: Result rows via Next()
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (2 warnings)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 10
🧹 Nitpick comments (9)
pkg/objstore/perfbench/perfbench.go (2)
219-223: Nit: `min(cfg.Workers, cfg.PrepareFiles)` silently no-ops when either is 0.

If a caller sets `PrepareRead=true` but forgets `PrepareFiles` (or `Workers`), `prepareReadFiles` returns with no files written and `RunRead` then fails later with "no input files for read mode". Consider returning an explicit error up-front when `PrepareFiles <= 0` (paired with the validation suggested above).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/objstore/perfbench/perfbench.go` around lines 219 - 223, The loop uses workers := min(cfg.Workers, cfg.PrepareFiles) which silently becomes 0 when either cfg.Workers or cfg.PrepareFiles is 0; add explicit validation in the start of prepareReadFiles (or the caller path that handles PrepareRead) to return an error when cfg.PrepareFiles <= 0 (and/or cfg.Workers <= 0) instead of proceeding, and ensure you check PrepareRead together with PrepareFiles so prepareReadFiles returns a clear error (e.g., "PrepareFiles must be > 0 when PrepareRead is true") before computing workers or starting workerFn goroutines; update any callers (RunRead) to propagate that error.
196-217: Style: use `atomic.Uint64` for consistency with the rest of the file.

Everywhere else in this file counters use `atomic.Uint64` (`bytesTotal`, `objectTotal`, `nextIdx`); `prepareReadFiles` uses the older `atomic.AddUint64(&nextTask, 1)` form. Consider switching to `atomic.Uint64` for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/objstore/perfbench/perfbench.go` around lines 196 - 217, Replace the plain uint64 counter with an atomic.Uint64 to match other counters (change "var nextTask uint64" to "var nextTask atomic.Uint64") and update the increment in workerFn from the old call "int(atomic.AddUint64(&nextTask, 1) - 1)" to use the typed API "int(nextTask.Add(1) - 1)"; keep the rest of workerFn (including errOnce/runErr and path/writeOneObject) unchanged so behavior is identical but consistent with bytesTotal/objectTotal/nextIdx usage.

pkg/parser/misc.go (1)
204-206: Minor: alphabetical order of new keyword.

Per the file comment ("Please try to keep the map in alphabetical order"), `"BENCHMARK"` should appear before `"BETWEEN"` (and the surrounding `BETWEEN`/`BERNOULLI` pair is already slightly out of order). Consider tightening ordering while touching this area.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/parser/misc.go` around lines 204 - 206, The map entries for SQL keywords are out of alphabetical order: adjust the ordering so "BENCHMARK" comes before "BETWEEN" and ensure "BERNOULLI" is placed correctly; specifically, in the map where the keys "BETWEEN", "BENCHMARK", and "BERNOULLI" appear, reorder them to alphabetical order (e.g., "BENCHMARK", "BETWEEN", "BERNOULLI") to satisfy the file's alphabetical convention.

pkg/parser/parser_test.go (1)
556-559: Add coverage for the remaining BENCHMARK S3 option contract.

Lines 556-559 cover core happy paths, but they don't explicitly validate `cleanup_synth` or option-name quoting round-trip behavior. Adding those cases here would better lock in the parser/restore contract.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/parser/parser_test.go` around lines 556 - 559, Extend the BENCHMARK S3 test cases in parser_test.go by adding rows that cover the cleanup_synth option and verify option-name quoting round-trip behavior: add an input like "admin benchmark s3 read 's3://bucket/p/' WITH (cleanup_synth=true)" expecting the normalized SQL to include "CLEANUP_SYNTH=true", and add a case that ensures quoted option names (e.g., "WITH ('file-size'='256MiB')") parse and restore to the canonical quoted form; update the test table where the existing BENCHMARK S3 cases live (the rows with inputs like "admin benchmark s3 write 's3://bucket/p/' WITH (concurrency=16, file_size='256MiB', file_count=32)") so the parser/restore contract for cleanup_synth and quoted option names is asserted.

pkg/executor/admin_benchmark_s3_test.go (1)
28-39: Expand coverage to include WRITE and option variants.

Line 32 currently exercises only one READ statement. Adding table-driven cases for `READ`+`WRITE` and at least one keyword option path would better protect the new grammar/planner/executor wiring.

Based on learnings, for SQL behavior changes in executor, perform targeted unit test plus relevant integration test.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/executor/admin_benchmark_s3_test.go` around lines 28 - 39, Update TestAdminBenchmarkS3ParseAndExec to be table-driven and exercise both "READ" and "WRITE" verbs plus at least one option variant: create a slice of test cases with SQL strings (e.g., ADMIN BENCHMARK S3 READ ..., ADMIN BENCHMARK S3 WRITE ..., and one with a WITH (...) option different from duration) and iterate running tk.ExecToErr for each; for each result assert the error does not contain "syntax error" (reuse the existing require.NotContains assertion) and add subtest names so failures are isolated; keep the same store/tk setup and ensure the comment about storage-layer errors remains.

pkg/executor/admin_benchmark_s3.go (1)
76-81: `AccessStats` is collected but never surfaced.

`acc` is wired into the store via `AccessRecording` but never read after the benchmark finishes, so the access/traffic counts it records are discarded. Either drop it (to avoid misleading readers and unnecessary recording overhead) or include its counters in the result rows — they'd be useful signals for an S3 diagnostic (request count, retries, bytes at the objstore layer).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/executor/admin_benchmark_s3.go` around lines 76 - 81, The AccessStats instance (acc) is created and passed into objstore.New via storeapi.Options{AccessRecording: acc} but its counters are never read, so either remove it or include its metrics in the benchmark results. Fix by reading the recorded counters from acc after the benchmark run (where results rows are assembled) and add relevant fields (request count, retries, bytes transferred, etc.) into the result rows; alternatively, if you choose to drop recording, remove recording.AccessStats creation and the AccessRecording option passed to objstore.New to avoid extra overhead. Locate the symbols acc, recording.AccessStats, objstore.New and storeapi.Options in admin_benchmark_s3.go to implement the change.

pkg/executor/importer/chunk_process.go (3)
609-610: Redundant back-to-back `time.Now()` calls.

`start` and `dataWriteStart` are captured at the same instant. Reuse `start` for the data-write start point.

♻️ Proposed diff

```diff
-	start := time.Now()
-	dataWriteStart := time.Now()
+	start := time.Now()
 	if err := p.dataWriter.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvBatch.dataKVs)); err != nil {
 		if !common.IsContextCanceledError(err) {
 			p.logger.Error("write to data engine failed", log.ShortError(err))
 		}
 		return errors.Trace(err)
 	}
-	dataWriteDur := time.Since(dataWriteStart)
+	dataWriteDur := time.Since(start)
 	indexWriteStart := time.Now()
```
Verify each finding against the current code and only fix it if needed. In `@pkg/executor/importer/chunk_process.go` around lines 609 - 610, The code captures two identical timestamps with back-to-back time.Now() calls (variables start and dataWriteStart); remove the redundant call and set dataWriteStart to reuse start (i.e., assign dataWriteStart = start) where these are declared in the chunk processing routine so only one time.Now() is invoked for both start and dataWriteStart.
484-484: Potentially duplicative INFO line per chunk.
`task.End(...)` below already emits all the per-duration fields via `summaryFields()`, so this adds a second INFO line for every chunk. Fine for perf runs (as called out in the PR), but on large imports with many chunks this doubles chunk-completion log volume. Consider `Debug` by default and gating on a perf flag, or folding the breakdown string into `summaryFields` so only `task.End` logs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/executor/importer/chunk_process.go` at line 484, The p.logger.Info call that logs p.enc.summaryBreakdown() creates a duplicate per-chunk INFO log because task.End(...) already emits summaryFields(); either convert that Info to Debug and gate it behind the existing perf flag (e.g., check the perf/config flag before calling p.logger.Debug with p.enc.summaryBreakdown()), or move the breakdown text into the fields returned by summaryFields() so task.End(...) is the sole emitter; locate and modify the p.logger.Info(...) line, p.enc.summaryBreakdown(), and/or the summaryFields() usage to implement one of these fixes and ensure the perf flag controls verbose logging if you choose the Debug route.
441-463: Percentages denominator excludes deliver time.
`total = readTotalDur + encodeTotalDur + sendTotalDur` omits deliver, which runs in a parallel goroutine and is typically the dominant tail for IMPORT INTO. The doc comment notes the total can exceed wall time due to parallelism, but it may be surprising that `*_pct` values don't reflect deliver at all given this is emitted from `baseChunkProcessor.Process` right next to `p.deliver.summaryFields()`. Consider either including deliver in the denominator (and in the one-liner) or clarifying in the comment that the breakdown covers only the encode subtask.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/executor/importer/chunk_process.go` around lines 441 - 463, The summaryBreakdown in chunkEncoder omits the deliver duration from the percentage denominator; update summaryBreakdown() to include p.deliverTotalDur in total (change total := p.readTotalDur + p.encodeTotalDur + p.sendTotalDur to also add p.deliverTotalDur), add a "deliver=%s" and "deliver_pct=%d%%" placeholder/value to the fmt.Sprintf call, and pass pct(p.deliverTotalDur) into the final argument list; also update the function comment to state that the breakdown now includes the deliver time (this function is summaryBreakdown on chunkEncoder and is emitted near p.deliver.summaryFields()).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/executor/admin_benchmark_s3.go`:
- Around line 100-113: The metric labeled "mb_per_sec" is calculated using
1024*1024 (mebibytes/sec) but named as MB/s; update the rows in the benchmark
output to be accurate: either change the key "mb_per_sec" to "mib_per_sec" to
reflect MiB/s (leave the calculation using 1024*1024 and the mbs variable as-is)
or change the calculation of mbs to divide by 1e6 (1000000.0) so the value
represents decimal MB/s and keep the "mb_per_sec" label; locate the mbs variable
and the e.rows assignment (the {"mb_per_sec", fmt.Sprintf("%.2f", mbs)} entry)
to make the corresponding rename or calculation change.
- Around line 179-180: The case handling for "cleanup_synth" currently coerces
values via v == "true" || v == "1" which silently accepts only a couple forms
and treats most inputs as false; change this to use strconv.ParseBool on the
incoming string (in the switch case handling "cleanup_synth" in
admin_benchmark_s3.go), set cfg.Cleanup to the parsed bool, and return/propagate
a parse error if strconv.ParseBool fails so invalid inputs are rejected
consistently (matching the existing concurrency/duration parsing behavior); also
double-check the option key used throughout (ensure the parser/AST and any docs
use "cleanup_synth" rather than "cleanup").
In `@pkg/executor/builder.go`:
- Around line 349-350: Add a unit test that exercises the builder dispatch path
which constructs AdminBenchmarkS3Exec: create table-driven cases that build an
instance of the planner node type AdminBenchmarkS3 with different option
combinations (read vs write, varying concurrency, duration, file_size) and call
the builder method buildAdminBenchmarkS3 (or the public builder entry that
dispatches to it) to produce an executor; assert the returned executor is an
*AdminBenchmarkS3Exec and verify its fields/config (mode, concurrency, duration,
fileSize) match the input plan for each case so changes to executor construction
are caught quickly.
In `@pkg/executor/importer/chunk_process.go`:
- Around line 350-363: The current logic only clamps parseDur to zero when
readDur - s3Dur - decompressDur < 0, which can let s3ReadTotalDur +
decompressTotalDur + parseTotalDur exceed readDur; change the computation so if
s3Dur+decompressDur > readDur you proportionally scale down s3Dur and
decompressDur to fit readDur (scale = readDur / (s3Dur+decompressDur)), set
parseDur = 0, otherwise compute parseDur = readDur - s3Dur - decompressDur as
before; keep the same accumulation into p.s3ReadTotalDur, p.decompressTotalDur
and p.parseTotalDur and preserve the p.readerTimings nil guard and types (s3Dur,
decompressDur, parseDur, readDur, p.s3ReadTotalDur, p.decompressTotalDur,
p.parseTotalDur).
In `@pkg/objstore/perfbench/perfbench.go`:
- Around line 62-70: Validate Config fields early to avoid zero-length buffers
and infinite loops: in RunWrite, RunRead and prepareReadFiles add checks (using
cfg.BlockSize, cfg.ObjectSize, cfg.Workers, cfg.Duration) to return an error if
any of these are <= 0; specifically ensure cfg.BlockSize > 0 before creating
writeBuf/readBuf and cfg.ObjectSize > 0 before calling writeOneObject/read
loops, and reject non-positive Workers and Duration as well. Update error
messages to name the offending field so callers can fix inputs.
In `@pkg/parser/ast/misc.go`:
- Around line 2697-2716: The Restore implementation on BenchmarkS3Options
currently writes the raw b.URL which can contain credentials; update
BenchmarkS3Options.Restore to sanitize the URL before calling ctx.WriteString by
parsing b.URL, stripping any userinfo (username:password) and removing or
redacting sensitive query parameters (e.g., access_key, secret_key, token,
signature, etc.), then reconstructing a safe URL string and writing that
instead; leave the rest of the options loop intact and ensure the sanitized
string is what's passed to ctx.WriteString in Restore.
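One way to implement the redaction above with the standard `net/url` package. The `redactURL` helper and the keys in `sensitiveParams` are illustrative assumptions; the real set of option names TiDB strips (access keys, tokens, signatures, and so on) may differ.

```go
package main

import (
	"fmt"
	"net/url"
)

// sensitiveParams is an illustrative deny-list of query keys to mask.
var sensitiveParams = map[string]bool{
	"access-key":        true,
	"secret-access-key": true,
	"session-token":     true,
}

// redactURL strips the userinfo (user:password) component and masks
// sensitive query parameters before the URL is written back.
func redactURL(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	u.User = nil // drop credentials embedded in the authority
	q := u.Query()
	for k := range q {
		if sensitiveParams[k] {
			q.Set(k, "xxx")
		}
	}
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	out, err := redactURL("s3://user:pass@bucket/prefix?access-key=AKIA&endpoint=http://minio")
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // neither the password nor the access key appears
}
```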
In `@pkg/parser/parser.y`:
- Line 360: The grammar added a BENCHMARK token but didn't include it in the
UnReservedKeyword production, which makes BENCHMARK falsely reserved and can
break identifiers; update the UnReservedKeyword production (the
UnReservedKeyword rule) to include BENCHMARK (or "BENCHMARK") so that the lexer
token remains available as an unreserved identifier, ensuring the token
declaration 'benchmark "BENCHMARK"' and the UnReservedKeyword production are
kept in sync.
- Around line 12049-12059: The semantic action for the grammar alternative
Identifier "=" stringLit currently wraps $3 with single quotes without escaping
embedded single quotes; modify the action in that production so it first
replaces every single-quote in $3 with two single-quotes (i.e., escape internal
' as '') and then wrap the escaped value in outer single quotes before assigning
to $$ (keep the strings.ToLower($1) key); ensure the strings utility used for
replacement is available in the file.
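The escape-then-wrap step above can be done with `strings.ReplaceAll`. The `quoteSingle` helper is a sketch of the semantic action's intent, not the generated parser code:

```go
package main

import (
	"fmt"
	"strings"
)

// quoteSingle doubles any embedded single quotes, then wraps the value
// in outer single quotes, the standard SQL string-literal escape the
// review asks the grammar action to perform before assigning to $$.
func quoteSingle(v string) string {
	return "'" + strings.ReplaceAll(v, "'", "''") + "'"
}

func main() {
	fmt.Println(quoteSingle("256MiB")) // '256MiB'
	fmt.Println(quoteSingle("it's"))   // 'it''s'
}
```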
In `@tools/objstore-perf/BUILD.bazel`:
- Around line 8-14: The BUILD dependency list for the tools/objstore-perf target
is missing the perfbench package referenced by tools/objstore-perf/main.go; open
the deps array in BUILD.bazel (the list containing "//pkg/objstore",
"//pkg/objstore/recording", etc.) and add the missing "//pkg/objstore/perfbench"
entry so Bazel can resolve the import; alternatively run make bazel_prepare to
regenerate BUILD deps if you prefer automated sync, but ensure
"//pkg/objstore/perfbench" is present in deps to fix the missing-dep build
error.
In `@tools/objstore-perf/main.go`:
- Around line 61-136: The CLI currently returns from main on errors (in
parseFlags, objstore.ParseBackend, objstore.New, perfbench.RunWrite/RunRead, and
perfbench.Cleanup) which yields exit code 0; change those error paths to exit
with non-zero status: replace the simple fmt.Printf + return patterns in main
with either os.Exit(1) after logging or use log.Fatalf for fatal errors (e.g.
failures from parseFlags, objstore.New, perfbench.RunWrite/RunRead and when
cfg.cleanup && perfbench.Cleanup fails) so failures in main produce a non-zero
exit code.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 91aafffc-2184-4b32-ac63-c0ba5131014a
📒 Files selected for processing (27)
pkg/executor/admin_benchmark_s3.go
pkg/executor/admin_benchmark_s3_test.go
pkg/executor/builder.go
pkg/executor/importer/chunk_process.go
pkg/executor/importer/chunk_process_testkit_test.go
pkg/executor/importer/engine_process.go
pkg/executor/importer/import.go
pkg/executor/importer/import_test.go
pkg/executor/importer/read_timings.go
pkg/executor/importer/read_timings_test.go
pkg/executor/importer/sampler.go
pkg/executor/importer/sampler_test.go
pkg/executor/importer/table_import.go
pkg/executor/load_data.go
pkg/lightning/metric/metric.go
pkg/lightning/metric/metric_test.go
pkg/objstore/perfbench/perfbench.go
pkg/objstore/perfbench/perfbench_test.go
pkg/parser/ast/misc.go
pkg/parser/misc.go
pkg/parser/parser.go
pkg/parser/parser.y
pkg/parser/parser_test.go
pkg/planner/core/admin_benchmark_s3_plan.go
pkg/planner/core/planbuilder.go
tools/objstore-perf/BUILD.bazel
tools/objstore-perf/main.go
```go
	mbs := 0.0
	if res.Elapsed > 0 {
		mbs = float64(res.Bytes) / (1024 * 1024) / res.Elapsed.Seconds()
	}
	mode := "read"
	if e.IsWrite {
		mode = "write"
	}
	e.rows = [][2]string{
		{"mode", mode},
		{"total_bytes", strconv.FormatUint(res.Bytes, 10)},
		{"op_count", strconv.FormatUint(res.Objects, 10)},
		{"duration_ms", strconv.FormatInt(res.Elapsed.Milliseconds(), 10)},
		{"mb_per_sec", fmt.Sprintf("%.2f", mbs)},
```
mb_per_sec is actually MiB/s.
The divisor is 1024 * 1024, not 1_000_000, so the reported metric is mebibytes/sec. Either rename the row label to mib_per_sec or divide by 1e6 to match the MB label. Small, but this is a user-facing diagnostic number and will otherwise mismatch tools/dashboards using decimal MB.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/executor/admin_benchmark_s3.go` around lines 100 - 113, The metric
labeled "mb_per_sec" is calculated using 1024*1024 (mebibytes/sec) but named as
MB/s; update the rows in the benchmark output to be accurate: either change the
key "mb_per_sec" to "mib_per_sec" to reflect MiB/s (leave the calculation using
1024*1024 and the mbs variable as-is) or change the calculation of mbs to divide
by 1e6 (1000000.0) so the value represents decimal MB/s and keep the
"mb_per_sec" label; locate the mbs variable and the e.rows assignment (the
{"mb_per_sec", fmt.Sprintf("%.2f", mbs)} entry) to make the corresponding rename
or calculation change.
```go
	case "cleanup_synth":
		cfg.Cleanup = v == "true" || v == "1"
```
Loose boolean parsing for cleanup_synth.
Values like TRUE, True, yes, on, 0, false, or typos are silently coerced to false with no error. Prefer strconv.ParseBool so inputs are canonical and bad values are rejected explicitly — consistent with how concurrency/duration report parse errors.
Proposed fix

```diff
-	case "cleanup_synth":
-		cfg.Cleanup = v == "true" || v == "1"
+	case "cleanup_synth":
+		b, err := strconv.ParseBool(v)
+		if err != nil {
+			return errors.Annotatef(err, "option cleanup_synth=%q", raw)
+		}
+		cfg.Cleanup = b
```

Also note: the PR summary advertises this option as cleanup_synth — please double-check the parser/AST uses the same key (not cleanup, as the enriched AI summary suggests).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/executor/admin_benchmark_s3.go` around lines 179 - 180, The case handling
for "cleanup_synth" currently coerces values via v == "true" || v == "1" which
silently accepts only a couple forms and treats most inputs as false; change
this to use strconv.ParseBool on the incoming string (in the switch case
handling "cleanup_synth" in admin_benchmark_s3.go), set cfg.Cleanup to the
parsed bool, and return/propagate a parse error if strconv.ParseBool fails so
invalid inputs are rejected consistently (matching the existing
concurrency/duration parsing behavior); also double-check the option key used
throughout (ensure the parser/AST and any docs use "cleanup_synth" rather than
"cleanup").
case *plannercore.AdminBenchmarkS3:
	return b.buildAdminBenchmarkS3(v)
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Looking for tests that cover ADMIN BENCHMARK S3 planning/execution..."
rg -n --type=go -C2 'AdminBenchmarkS3|ADMIN BENCHMARK S3|buildAdminBenchmarkS3' -g '*_test.go'
echo
echo "Looking for parser/planner/executor test references to the new statement..."
rg -n --type=go -C2 'BENCHMARK S3|AdminBenchmarkS3' pkg/parser pkg/planner pkg/executor -g '*_test.go'

Repository: pingcap/tidb
Length of output: 4708
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Reading executor test file for AdminBenchmarkS3..."
wc -l pkg/executor/admin_benchmark_s3_test.go
echo -e "\n=== Full test file contents ==="
cat -n pkg/executor/admin_benchmark_s3_test.go
echo -e "\n=== Checking for additional test functions or subtests ==="
rg -n "func Test|t\.Run\(" pkg/executor/admin_benchmark_s3_test.go

Repository: pingcap/tidb
Length of output: 2120
Add a targeted unit test for the executor dispatch and AdminBenchmarkS3Exec construction.
An integration test (TestAdminBenchmarkS3ParseAndExec) confirms end-to-end parse-plan-execute flow, but there is no unit test for the executor dispatch itself. Per the repo's expectation for executor SQL behavior changes, add a targeted unit test that directly tests the builder's construction of AdminBenchmarkS3Exec with different option combinations (e.g., read vs. write, varying concurrency/duration/file_size settings).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/executor/builder.go` around lines 349 - 350, Add a unit test that
exercises the builder dispatch path which constructs AdminBenchmarkS3Exec:
create table-driven cases that build an instance of the planner node type
AdminBenchmarkS3 with different option combinations (read vs write, varying
concurrency, duration, file_size) and call the builder method
buildAdminBenchmarkS3 (or the public builder entry that dispatches to it) to
produce an executor; assert the returned executor is an *AdminBenchmarkS3Exec
and verify its fields/config (mode, concurrency, duration, fileSize) match the
input plan for each case so changes to executor construction are caught quickly.
var s3Dur, decompressDur time.Duration
if p.readerTimings != nil {
	s3Dur = time.Duration(p.readerTimings.s3Read.Swap(0))
	decompressDur = time.Duration(p.readerTimings.decompress.Swap(0))
}
parseDur := readDur - s3Dur - decompressDur
if parseDur < 0 {
	// Defensive clamp; can happen if the timed reader wrapped calls that
	// weren't part of readFn's readDur window.
	parseDur = 0
}
p.s3ReadTotalDur += s3Dur
p.decompressTotalDur += decompressDur
p.parseTotalDur += parseDur
Clamping parse only may cause split durations to exceed readTotalDur.
When readDur - s3Dur - decompressDur < 0, parseDur is clamped to 0 but the raw s3Dur/decompressDur are still accumulated. Consequently s3ReadTotalDur + decompressTotalDur + parseTotalDur can exceed readTotalDur, which makes the percentages in summaryBreakdown (denominated by readTotalDur + encodeTotalDur + sendTotalDur) slightly inconsistent with the component sums.
Not critical since the known source of negative values (compressed sources) is called out, but consider proportionally scaling s3Dur/decompressDur down to readDur so the sub-totals remain bounded by readTotalDur.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/executor/importer/chunk_process.go` around lines 350 - 363, The current
logic only clamps parseDur to zero when readDur - s3Dur - decompressDur < 0,
which can let s3ReadTotalDur + decompressTotalDur + parseTotalDur exceed
readDur; change the computation so if s3Dur+decompressDur > readDur you
proportionally scale down s3Dur and decompressDur to fit readDur (scale =
readDur / (s3Dur+decompressDur)), set parseDur = 0, otherwise compute parseDur =
readDur - s3Dur - decompressDur as before; keep the same accumulation into
p.s3ReadTotalDur, p.decompressTotalDur and p.parseTotalDur and preserve the
p.readerTimings nil guard and types (s3Dur, decompressDur, parseDur, readDur,
p.s3ReadTotalDur, p.decompressTotalDur, p.parseTotalDur).
func RunWrite(ctx context.Context, store storeapi.Storage, cfg Config) (Result, error) {
	deadlineCtx, cancel := context.WithTimeout(ctx, cfg.Duration)
	defer cancel()

	writeBuf := make([]byte, cfg.BlockSize)
	for i := range writeBuf {
		writeBuf[i] = byte(i)
	}
Guard against BlockSize <= 0 to avoid an infinite loop in writeOneObject.
writeBuf is sized from cfg.BlockSize. If a caller passes BlockSize <= 0, writeBuf is empty, so the for written < objectSize loop in writeOneObject keeps calling writer.Write(ctx, chunk) with an empty slice and written never advances — a hot infinite loop until context cancel. Same applies to RunRead's readBuf (line 132) and prepareReadFiles (line 191). The CLI validates >0, but since perfbench is exported and now also driven by the SQL executor with its own defaults, it is safer to validate in Config at entry (and likewise reject non-positive ObjectSize, Workers, Duration).
🛡️ Suggested validation
func RunWrite(ctx context.Context, store storeapi.Storage, cfg Config) (Result, error) {
+ if cfg.BlockSize <= 0 || cfg.ObjectSize <= 0 || cfg.Workers <= 0 || cfg.Duration <= 0 {
+ return Result{}, fmt.Errorf("invalid perfbench config: block=%d object=%d workers=%d duration=%s",
+ cfg.BlockSize, cfg.ObjectSize, cfg.Workers, cfg.Duration)
+ }
deadlineCtx, cancel := context.WithTimeout(ctx, cfg.Duration)

Also applies to: 228-263
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/objstore/perfbench/perfbench.go` around lines 62 - 70, Validate Config
fields early to avoid zero-length buffers and infinite loops: in RunWrite,
RunRead and prepareReadFiles add checks (using cfg.BlockSize, cfg.ObjectSize,
cfg.Workers, cfg.Duration) to return an error if any of these are <= 0;
specifically ensure cfg.BlockSize > 0 before creating writeBuf/readBuf and
cfg.ObjectSize > 0 before calling writeOneObject/read loops, and reject
non-positive Workers and Duration as well. Update error messages to name the
offending field so callers can fix inputs.
func (b *BenchmarkS3Options) Restore(ctx *format.RestoreCtx) error {
	ctx.WriteString(b.URL)
	if len(b.Options) == 0 {
		return nil
	}
	ctx.WriteKeyWord(" WITH ")
	ctx.WritePlain("(")
	keys := make([]string, 0, len(b.Options))
	for k := range b.Options {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for i, k := range keys {
		if i > 0 {
			ctx.WritePlain(", ")
		}
		ctx.WritePlain(k)
		ctx.WritePlain("=")
		ctx.WritePlain(b.Options[k])
	}
Redact BENCHMARK S3 URLs during restore to avoid credential leakage.
Line 2698 writes the raw URL back into restored SQL. If credentials are embedded in query params, they can leak via processlist/logging paths.
🔒 Proposed fix
func (b *BenchmarkS3Options) Restore(ctx *format.RestoreCtx) error {
- ctx.WriteString(b.URL)
+ // Avoid exposing credentials in SQL text surfaces (e.g. processlist/logs).
+ ctx.WriteString(RedactURL(b.URL))
if len(b.Options) == 0 {
return nil
}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/parser/ast/misc.go` around lines 2697 - 2716, The Restore implementation
on BenchmarkS3Options currently writes the raw b.URL which can contain
credentials; update BenchmarkS3Options.Restore to sanitize the URL before
calling ctx.WriteString by parsing b.URL, stripping any userinfo
(username:password) and removing or redacting sensitive query parameters (e.g.,
access_key, secret_key, token, signature, etc.), then reconstructing a safe URL
string and writing that instead; leave the rest of the options loop intact and
ensure the sanitized string is what's passed to ctx.WriteString in Restore.
backups "BACKUPS"
bdr "BDR"
begin "BEGIN"
benchmark "BENCHMARK"
Keep BENCHMARK truly unreserved to avoid parser regressions.
Line 360 introduces BENCHMARK as an unreserved token, but it is not added to the UnReservedKeyword production list (around Line 7244+). That can break existing SQL that uses benchmark as an unquoted identifier.
🔧 Proposed fix
UnReservedKeyword:
"ACTION"
...
| "BEGIN"
+| "BENCHMARK"
| "BIT"
...📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| benchmark "BENCHMARK" | |
| UnReservedKeyword: | |
| "ACTION" | |
| ... | |
| | "BEGIN" | |
| | "BENCHMARK" | |
| | "BIT" | |
| ... |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/parser/parser.y` at line 360, The grammar added a BENCHMARK token but
didn't include it in the UnReservedKeyword production, which makes BENCHMARK
falsely reserved and can break identifiers; update the UnReservedKeyword
production (the UnReservedKeyword rule) to include BENCHMARK (or "BENCHMARK") so
that the lexer token remains available as an unreserved identifier, ensuring the
token declaration 'benchmark "BENCHMARK"' and the UnReservedKeyword production
are kept in sync.
Identifier "=" stringLit
{
	// Preserve single quotes in the stored value so Restore reproduces the
	// original form and so the executor can tell string literals from
	// numerics without a separate type marker.
	$$ = [2]string{strings.ToLower($1), "'" + $3 + "'"}
}
| Identifier "=" NUM
{
	$$ = [2]string{strings.ToLower($1), strconv.FormatUint(getUint64FromNUM($3), 10)}
}
Escape embedded single quotes before re-wrapping string literals.
At Line 12054, wrapping with "'" + $3 + "'" does not escape internal ', which can produce invalid restored SQL for values like a'b.
🔧 Proposed fix
AdminBenchmarkS3OptionItem:
Identifier "=" stringLit
{
// Preserve single quotes in the stored value so Restore reproduces the
// original form and so the executor can tell string literals from
// numerics without a separate type marker.
- $$ = [2]string{strings.ToLower($1), "'" + $3 + "'"}
+ escaped := strings.ReplaceAll($3, "'", "''")
+ $$ = [2]string{strings.ToLower($1), "'" + escaped + "'"}
}
| Identifier "=" NUM
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/parser/parser.y` around lines 12049 - 12059, The semantic action for the
grammar alternative Identifier "=" stringLit currently wraps $3 with single
quotes without escaping embedded single quotes; modify the action in that
production so it first replaces every single-quote in $3 with two single-quotes
(i.e., escape internal ' as '') and then wrap the escaped value in outer single
quotes before assigning to $$ (keep the strings.ToLower($1) key); ensure the
strings utility used for replacement is available in the file.
deps = [
    "//pkg/lightning/backend/external",
    "//pkg/objstore",
    "//pkg/objstore/recording",
    "//pkg/objstore/storeapi",
    "@com_github_docker_go_units//:go-units",
],
Missing //pkg/objstore/perfbench dependency — Bazel build will fail.
tools/objstore-perf/main.go imports github.com/pingcap/tidb/pkg/objstore/perfbench but this dep is not listed here, so bazel build //tools/objstore-perf/... will fail with a missing-dep error. Re-running make bazel_prepare should regenerate this (and it's listed in the PR's deferred tasks).
🔧 Proposed fix
deps = [
"//pkg/lightning/backend/external",
"//pkg/objstore",
+ "//pkg/objstore/perfbench",
"//pkg/objstore/recording",
"//pkg/objstore/storeapi",
"@com_github_docker_go_units//:go-units",
],
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/objstore-perf/BUILD.bazel` around lines 8 - 14, The BUILD dependency
list for the tools/objstore-perf target is missing the perfbench package
referenced by tools/objstore-perf/main.go; open the deps array in BUILD.bazel
(the list containing "//pkg/objstore", "//pkg/objstore/recording", etc.) and add
the missing "//pkg/objstore/perfbench" entry so Bazel can resolve the import;
alternatively run make bazel_prepare to regenerate BUILD deps if you prefer
automated sync, but ensure "//pkg/objstore/perfbench" is present in deps to fix
the missing-dep build error.
func main() {
	cfg, err := parseFlags()
	if err != nil {
		fmt.Printf("config error: %v\n", err)
		return
	}

	baseCtx := context.Background()
	acc := &recording.AccessStats{}
	backend, err := objstore.ParseBackend(cfg.url, nil)
	if err != nil {
		fmt.Printf("parse url failed: %v\n", err)
		return
	}
	store, err := objstore.New(baseCtx, backend, &storeapi.Options{AccessRecording: acc})
	if err != nil {
		fmt.Printf("open object store failed: %v\n", err)
		return
	}
	defer store.Close()

	fmt.Printf("mode=%s url=%s prefix=%s workers=%d duration=%s object-size=%s block-size=%s writer-concurrency=%d part-size=%s\n",
		cfg.mode, cfg.url, cfg.prefix, cfg.workers, cfg.duration,
		units.BytesSize(float64(cfg.objectSize)), units.BytesSize(float64(cfg.blockSize)), cfg.writerConcurrency,
		units.BytesSize(float64(cfg.partSize)))

	pbCfg := perfbench.Config{
		Mode:              cfg.mode,
		Prefix:            cfg.prefix,
		Duration:          cfg.duration,
		Workers:           cfg.workers,
		ObjectSize:        cfg.objectSize,
		BlockSize:         cfg.blockSize,
		WriterConcurrency: cfg.writerConcurrency,
		PartSize:          cfg.partSize,
		PrepareRead:       cfg.prepareRead,
		PrepareFiles:      cfg.prepareFiles,
		PrefetchSize:      cfg.prefetchSize,
		Cleanup:           cfg.cleanup,
	}

	var runRes perfbench.Result
	start := time.Now()
	switch cfg.mode {
	case modeWrite:
		runRes, err = perfbench.RunWrite(baseCtx, store, pbCfg)
	case modeRead:
		runRes, err = perfbench.RunRead(baseCtx, store, pbCfg)
	default:
		err = fmt.Errorf("unsupported mode: %s", cfg.mode)
	}
	if err != nil {
		fmt.Printf("run failed: %v\n", err)
		return
	}
	runRes.Elapsed = time.Since(start)

	mb := float64(runRes.Bytes) / units.MiB
	sec := runRes.Elapsed.Seconds()
	if sec <= 0 {
		sec = 1
	}
	fmt.Printf(
		"done: bytes=%d (%.2f MiB) objects=%d elapsed=%s throughput=%.2f MiB/s\n",
		runRes.Bytes, mb, runRes.Objects, runRes.Elapsed, mb/sec,
	)
	fmt.Printf("objstore access: requests=%s traffic=%s\n", acc.Requests.String(), acc.Traffic.String())

	if cfg.cleanup {
		if err := perfbench.Cleanup(baseCtx, store, cfg.prefix); err != nil {
			fmt.Printf("cleanup failed: %v\n", err)
			return
		}
		fmt.Println("cleanup done")
	}
}
CLI exits 0 on errors; use a non-zero exit code on failures.
Every failure path prints and returns from main, which means the process exits with status 0 even when config parsing, store open, or the benchmark run failed. For a CLI driven from scripts/CI this hides failures. Prefer os.Exit(1) (or log.Fatalf) on error paths, including cleanup failed.
🔧 Example
- cfg, err := parseFlags()
- if err != nil {
- fmt.Printf("config error: %v\n", err)
- return
- }
+ cfg, err := parseFlags()
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "config error: %v\n", err)
+ os.Exit(1)
+ }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/objstore-perf/main.go` around lines 61 - 136, The CLI currently returns
from main on errors (in parseFlags, objstore.ParseBackend, objstore.New,
perfbench.RunWrite/RunRead, and perfbench.Cleanup) which yields exit code 0;
change those error paths to exit with non-zero status: replace the simple
fmt.Printf + return patterns in main with either os.Exit(1) after logging or use
log.Fatalf for fatal errors (e.g. failures from parseFlags, objstore.New,
perfbench.RunWrite/RunRead and when cfg.cleanup && perfbench.Cleanup fails) so
failures in main produce a non-zero exit code.
Adding `benchmark` to the keyword token table shifted `SELECT BENCHMARK(...)`
off the `identifier '(' ...)` grammar path and broke existing parser tests
(e.g. TestBuiltin's `SELECT BENCHMARK(1000000, AES_ENCRYPT(...))` case).
Register BENCHMARK in UnReservedKeyword so it remains usable as a bare
identifier, and in FunctionNameConflict so the keyword token still reaches
FunctionCallKeyword. Regenerate parser.go.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parser/ast: redact secret tokens in BenchmarkS3Options.Restore. The raw URL carried through EXPLAIN / slow-log / audit output contained access-key, secret-access-key, and session-token query parameters, matching the leak class BackupStmt and LoadDataStmt already redact.

objstore/perfbench:
- allocate the read buffer per worker instead of sharing one buffer across goroutines (data race under -race with Workers > 1).
- populate Result.Elapsed from the worker-pool window; previously the read elapsed accidentally rolled in the prepareReadFiles phase when measured by the caller, producing systematically low throughput.
- drop the Mode and Cleanup fields from Config; both were dead API (callers pick read vs write by calling RunRead/RunWrite, and cleanup is the caller's responsibility via the Cleanup function).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Whitelist object-store URL schemes (s3/ks3/oss/gs/gcs/azure/azblob); without this a SUPER-privileged user could feed file:/// or no-scheme URLs and have the executor write up to Workers * Duration bytes to the TiDB server's local disk while the statement name still advertises "S3".
- Validate WITH-clause values (concurrency > 0, duration > 0, positive file_size, positive file_count for READ) so typos like duration='0' don't silently return an empty result.
- Mark the executor done on runBenchmark failure so a second Next() can't re-trigger a 30-second real-S3 probe on retry.
- Namespace the default prefix with a nanosecond timestamp so two concurrent sessions don't collide on identical worker/seq keys.
- Clean up synthetic objects after the run (default cleanup_synth=true, overridable); otherwise every READ leaked ~8 GiB of seed files into the bucket.
- Populate requests/traffic rows from AccessStats for CLI parity.
- Log benchmark start/finish with the redacted URL so a SUPER-priv run leaves a server-side trail.
- Expand tests to cover: scheme whitelist, unknown option, bad duration, and regression coverage for SELECT BENCHMARK(...).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pick up the new admin_benchmark_s3 srcs/deps on pkg/executor, the new pkg/objstore/perfbench package, and the executor/importer additions from the read-timings work (`read_timings.go`). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Branch 1 of the encode-step optimization plan: reduce per-worker S3 read
time by issuing concurrent range GETs instead of a single sequential
connection. Encode CPU work also hides behind the async prefetch buffer.
Changes:
- pkg/util/prefetch: new ParallelReader that reads a byte range via N
concurrent range fetches and reassembles the stream in order. Includes
unit tests for ordering under jitter, non-aligned tails, small files,
error propagation, close cancellation, and bounded outstanding fetches.
- pkg/objstore/{s3like,s3store,gcs}: when PrefetchSize > 16 MiB, switch
from the sequential double-buffered prefetch.Reader to ParallelReader
with 8 MiB blocks and up to 8 concurrent fetches. Applied in Open() and
in the Seek-reopen path.
- pkg/lightning/mydump: OpenReader takes a new prefetchSize parameter
and plumbs it through the ReaderOption.
- pkg/executor/importer: new DefaultEncodeReadPrefetchSize = 32 MiB;
IMPORT INTO encode-path OpenReader callers pass it. Sampler and legacy
lightning callers pass 0 (no behavior change).
Expected impact on per-worker encode-step wall time: 34 MiB/s single-
connection read → approaches pod NIC ceiling; encode CPU (1.6 s/chunk)
overlaps with read instead of serializing after it. Projected cluster
wall-time saving on the observed 5000-file sbtest workload: ~10-13 min.
Adds optional startOffset/endOffset tracking on byteReader and an io.Reader Read() method delegating to readNBytes. Existing newByteReader and its callers are unchanged; new bounded reads use the new newBoundedByteReader helper. For M2, consumers of MultipleFilesStatV2 segments will use this to read a byte range inside a shared physical data file.
Each flush creates a fresh OneFileWriter, sorts KVs per kvGroup, streams them in deterministic order (KVGroupData first, indexIDs ascending), and emits a V2 segment sidecar alongside the V1 stat file. Adds OneFileWriter.WrittenKVBytes() — exposes kvStore.offset for same- package callers that need cumulative byte offsets (the existing writtenBytes field resets every 16 MiB for metrics).
AppendRow swaps active halves when the current half exceeds HalfMemSizeLimit, enqueuing the just-deactivated half for background flush. flushCh has capacity 1, so a second auto-flush blocks until the first completes — natural backpressure without busy-waiting or extra condition variables. Close now flushes any data remaining in the active half before closing the flush channel. TestUnifiedWriter_AppendRowContinuesDuringFlush asserts that AppendRow returns in <50 ms even when the underlying storage Create takes 150 ms (via a slowStorage test wrapper), verifying the ping-pong hides flush latency from the caller.
- external/BUILD.bazel: picks up stat_v2.go, unified_writer.go, and the three new test files from M1 Tasks 2, 4, 5, 6, 7. - prefetch/BUILD.bazel: catches up parallel_reader.go + its test file (added during Branch 1 on feature/encode-step-timers; prior bazel_prepare run had not been done). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Earlier M1 commits introduced a parallel MultipleFilesStatV2 struct and a .v2 sidecar file. Use the existing MultipleFilesStat name instead, with a new Segments []KVGroupSegment field populated by UnifiedWriter. Segments flow to downstream consumers in memory via WriterSummary / MultiFileStats — the on-disk sidecar (and its encoder/decoder) is removed because the existing in-memory propagation path is sufficient. Dropped: MultipleFilesStatV2, statFormatVersion, statFormatV2, encodeMultipleFilesStatV2, decodeMultipleFilesStatV2, storeapiWriteAdapter, stat_v2.go, stat_v2_test.go, the findV2Sidecar test helper, and the .v2 file emission in UnifiedWriter.flushHalf. Kept: KVGroupID, KVGroupData, KVGroupSegment (new types, no existing parallel), UnifiedWriter, byteReader endOffset, and all passing tests. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…KVGroup Adds FinishKVGroup which forces the current range-property block to flush and returns the cumulative stat-file offset. UnifiedWriter calls it between kvGroups so each segment's [StatStart, StatEnd) contains only that kvGroup's range properties. Adds statWrittenBytes to OneFileWriter as the cumulative counter, updated at each statWriter.Write (including the pre-existing buffer-exhaustion path and closeImpl path).
…-bounded reads Consumers of MultipleFilesStat.Segments need to read a specific byte range of a shared physical data/stat file. Adds endOffset int64 to both constructors: when > 0, the underlying byteReader enforces the bound and returns io.EOF after the last byte in the segment is read. endOffset == 0 preserves the pre-segment unbounded behavior; existing callers pass 0 as the new argument. newStatsReader also gains an explicit startOffset (was implicitly 0) so callers can point at a mid-file segment's stat byte range. Keeps the per-reader prefetch (oneThird*2 for KVReader, 250 KiB for statsReader) instead of routing through newBoundedByteReader, which disables prefetch.
… present When MultipleFilesStat.Segments is populated (UnifiedWriter output), MergePropIter opens stat readers with per-segment byte bounds instead of reading whole stat files, and RangeSplitter resolves the active data/stat path from the segment rather than Filenames[idx2]. Callers that want per-kvGroup ranges must filter the input MultipleFilesStat so Segments contains only the target kvGroup's entries; Filenames should carry the unique (dataFile, statFile) pairs for downstream path lookup. Legacy callers pass MultipleFilesStat with empty Segments; the iterator falls through to the current whole-file behavior and RangeSplitter keeps using Filenames[idx2] as before. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Integration test: UnifiedWriter produces multi-kvGroup files; after M2 Task 3's segment-aware RangeSplitter, MergeOverlappingFilesV2 consumes those files correctly when the caller filters multiFileStat to one kvGroup (Segments containing only that kvGroup's entries, Filenames containing the unique (dataFile, statFile) pairs). No production code changes: V2's existing pipeline works transparently with segment-populated MultipleFilesStat thanks to T3.
…n encode step The global-sort encode step now routes all KVs (data + every index) through a single external.UnifiedWriter per worker. The per-worker memory formula changes from 0.5 × memPerCore / (N+3) shares to 2 × 0.3 × memPerCore (double-buffered halves). IndexRouteWriter and its writerFactory are deleted; dataDeliver directly drives AppendRow(kvGroup, key, val) per KV. A thin package-local KVWriter interface (AppendDataKVs / AppendIndexKVs) unifies both sort modes' delivery path. engineKVWriter wraps the legacy pair of backend.EngineWriter (local sort); unifiedKVWriter wraps *external.UnifiedWriter (global sort). chunkWorker holds the per-worker UnifiedWriter and, on Close, decomposes its aggregate MultiFileStats into per-kvGroup SortedKVMeta entries that downstream merge / ingest steps consume. The non-global-sort (local-sort) path is unchanged — ProcessChunk still opens local engine writers and wraps them with NewEngineKVWriter. MiniTaskExecutor.Run / ProcessChunkWithWriter / NewFileChunkProcessor / newQueryChunkProcessor now take a single KVWriter in place of two backend.EngineWriter parameters. getWriterMemorySizeLimit is retained only for the still-present merge-sort step executor; the encode step uses the new unifiedMemFormula. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When the encode step uses UnifiedWriter (M2), each kvGroup's MultipleFilesStat entries carry per-kvGroup byte segments. Propagate these segments into WriteIngestStepMeta.Segments so the ingest subtask can open each segment via bounded reads (Task 8 consumes this field). The field is omitempty — legacy (non-UnifiedWriter) producers omit it and the ingest path falls through to the DataFiles/StatFiles flow.
ExternalEngineConfig now carries a Segments field populated by the IMPORT INTO ingest subtask from WriteIngestStepMeta.Segments (produced upstream in M2 Task 7). Engine gains a SetSegments accessor for future per-segment bounded reads, wired from engine_mgr.closeEngine. Correctness of ingest does not yet depend on Segments: the read path (RangeSplitter via MergePropIter) filters by key range, and kvGroup key namespaces are disjoint in the tidbkv encoding. The pre-existing TestRangeSplitter_SegmentFiltering (pkg/lightning/backend/external/split_segments_test.go) already asserts this invariant against a multi-kvGroup UnifiedWriter physical file, so no new hypothesis test is introduced in this commit. Segments are therefore carried for observability and to unlock per-segment bounded-read optimizations in a follow-up.
New test exercises the chunk-processor pipeline in global-sort mode (UnifiedWriter) and asserts: MultiFileStats populated, Segments cover both data and index kvGroups, per-segment byte ranges are contiguous, and KVs round-trip through NewKVReader with segment bounds. Verifies the encode pipeline changes from M2 T5+T6 produce the segment format downstream consumers (ingest, Task 8) expect.
Fixes silent data loss: index-kvGroup ingest subtasks returned zero KVs because the engine's read path (readOneFile + getReadRangeFromProps) opened files at offset 0 unbounded, hitting data-kvGroup bytes first (byte-order != key-order: _i < _r but UnifiedWriter writes data before index). When Engine.segments is non-empty, each dataFile/statFile pair is scoped to its segment's byte range via startOffset/endOffset (data) and StatStart/StatEnd (stat). Legacy callers pass nil for the new per-file bound parameters. New test TestEngine_SegmentedIngest_RealTidbKeys exercises real tablecodec row and index keys end-to-end through UnifiedWriter and Engine, asserting the index subtask reads exactly its kvGroup's KVs.
- byteReader.readNBytes: bounded reads now error with ErrUnexpectedEOF when the requested size exceeds the remaining bytes, instead of silently returning fewer bytes. Prevents corrupted uint64 length prefixes in KV framing. io.Reader Read([]byte) is unaffected (it uses next/reload directly). New test TestByteReader_EndOffset_ReadNBytesStrict.
- planner.splitForOneSubtask: WriteIngestStepMeta.Segments is now filtered to the subtask's [startKey, endKey). Large imports no longer copy the full cluster-wide segment list into every subtask.
- UnifiedWriter.Flush: documented the single-goroutine thread-safety contract (already satisfied by the production deliverLoop).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Fixes a second silent-corruption variant of the cross-kvGroup bug: the merge-sort subtask read UnifiedWriter files unbounded, producing non-monotonic input to NewMergeKVIter. The merge heap does not detect non-monotonic inputs; output interleaved data-kvGroup and index-kvGroup keys, corrupting sorted output used by downstream ingest.

Fix mirrors the ingest-path fix b612740: MergeSortStepMeta carries Segments; generateMergeSortSpecs filters segments per subtask; MergeOverlappingFiles accepts segments; NewMergeKVIter accepts pathsEndOffset; each leaf reader is bound to its kvGroup's byte range.

Also fixes:
- switchToConcurrentReader clamps the prefetch range at endOffset (wastes I/O otherwise — correctness preserved by the readNBytes clamp)
- engine.segmentBoundsFor lookup miss is now an error when segments are populated (was silently falling through unbounded)
- Three stale doc comments that claimed segments are unused

New test TestMergeOverlappingFiles_SegmentedInput exercises a real tablecodec-keyed UnifiedWriter file + single-subtask merge against just the index kvGroup; asserts the merged output is exactly the index kvGroup's sorted KVs (not interleaved data bytes). With the fix disabled the test sees 8 KVs (5 data + 3 index) instead of 3.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
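The per-subtask segment filtering can be sketched as a key-range overlap check; the `segment` struct here is a hypothetical simplification (real segments also carry the byte offsets that bound each leaf reader):

```go
package main

import (
	"bytes"
	"fmt"
)

// Hypothetical segment carrying its key span. Real segments also hold
// startOffset/endOffset used to bound each leaf reader in NewMergeKVIter.
type segment struct {
	startKey, endKey []byte // [startKey, endKey)
}

// filterSegments keeps only segments overlapping the subtask's
// [start, end) range, so each subtask meta carries a bounded list
// instead of the full cluster-wide one.
func filterSegments(segs []segment, start, end []byte) []segment {
	var out []segment
	for _, s := range segs {
		// Half-open interval overlap: s.start < end && s.end > start.
		if bytes.Compare(s.startKey, end) < 0 && bytes.Compare(s.endKey, start) > 0 {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	segs := []segment{
		{[]byte("_i_a"), []byte("_i_z")}, // index kvGroup
		{[]byte("_r_a"), []byte("_r_z")}, // data kvGroup
	}
	got := filterSegments(segs, []byte("_r_a"), []byte("_r_z"))
	fmt.Println(len(got)) // only the data-kvGroup segment survives
}
```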
Nextgen keyspace-prefixed writes need tikv.Codec.EncodeKey on every key; ingest rejects unencoded keys with "missing keyspace_id in key". The old batch Writer applied this at writer.go:497; UnifiedWriter skipped it, causing cluster ingest to fail. Adds UnifiedWriterConfig.TiKVCodec (optional). encode_and_sort_operator now plumbs op.tableImporter.Backend().GetTiKVCodec() through, matching the prior SetTiKVCodec call on WriterBuilder. Encoding happens before buffering so downstream sort, segment bounds, and range properties all see the encoded key. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…VCodec test Fix 1: Add w.pool.Destroy() at the end of UnifiedWriter.Close() to prevent per-worker memory leak of the membuf.Pool. Fix 2: Remove the unused Concurrency field from UnifiedWriterConfig struct and its initialization in NewUnifiedWriter. Downstream code (OneFileWriter) consumes maxUploadWorkersPerThread directly, making this field dead code. Fix 3: Add TestUnifiedWriter_TiKVCodec_EncodesKeysBeforeStorage to verify that TiKVCodec is applied to keys before buffering and writing. This regression test covers the path that was causing nextgen ingest failures due to missing keyspace_id prefix encoding. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Dedup happens in UnifiedWriter.flushHalf after per-kvGroup sort, before streaming to OneFileWriter. Lazy-init one dup side-file per kvGroup only when duplicates actually occur. OneFileWriter stays a dumb sorted-stream sink (Option A's one-file-per-flush invariant preserved). Per-kvGroup ConflictInfo (count + dup-file path) surfaces via UnifiedWriter.ConflictInfos(); encode_and_sort_operator plumbs it into per-kvGroup SortedKVMeta so the conflict-resolution step finds dup files — restoring parity with the pre-M2 encode-step semantics. Fast path (OnDuplicateKeyIgnore, current default) is unchanged: every KV written directly, no pivot state, no dup infra. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
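The post-sort dedup split can be sketched as a single pass over the sorted batch; the names and in-memory "side file" below are illustrative assumptions, not the flushHalf implementation:

```go
package main

import (
	"bytes"
	"fmt"
)

type kv struct{ key, val []byte }

// dedupSorted walks an already-sorted batch once: unique keys stream to
// the main sink, duplicate runs go to a side sink that is created
// lazily, only when the first duplicate appears — so the fast path
// (no conflicts) allocates no dup infrastructure at all.
func dedupSorted(kvs []kv) (main, dups []kv) {
	for i := 0; i < len(kvs); {
		j := i + 1
		for j < len(kvs) && bytes.Equal(kvs[j].key, kvs[i].key) {
			j++
		}
		if j-i == 1 {
			main = append(main, kvs[i])
		} else {
			if dups == nil {
				dups = make([]kv, 0, j-i) // lazy "dup side file"
			}
			dups = append(dups, kvs[i:j]...)
		}
		i = j
	}
	return main, dups
}

func main() {
	sorted := []kv{{[]byte("a"), nil}, {[]byte("b"), nil}, {[]byte("b"), nil}, {[]byte("c"), nil}}
	m, d := dedupSorted(sorted)
	fmt.Println(len(m), len(d)) // uniques vs. conflict copies
}
```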
What problem does this PR solve?
Issue Number: ref #N/A
Problem Summary:
To decide where to spend optimization effort on IMPORT INTO global sort (next-gen on AWS), we need two diagnostics that are currently missing:
1. Raw S3 throughput measured from a worker itself. A CLI tool (`tools/objstore-perf`) already measures this, but the target workers run in a read-only pod where a new binary cannot be deployed. We need a SQL-level entry point.
2. A finer breakdown of encode-step read time. The `chunk_process_operation_seconds` histogram collapses S3 read + decompress + parse into one `read` label, making it impossible to tell whether the bottleneck is S3 I/O or CPU.

What changed and how does it work?
Part A — `ADMIN BENCHMARK S3` statement (internal diagnostic):

- New `pkg/objstore/perfbench` package carved out of `tools/objstore-perf/main.go`'s core; the CLI now calls into this library.
- New statement: `ADMIN BENCHMARK S3 READ|WRITE '<url>' [WITH (option = value, ...)]`.
- Options: `concurrency`, `duration`, `file_size`, `file_count`, `cleanup_synth` (all optional; executor-side defaults).
- Output: `(metric, value)` rows for `mode`, `total_bytes`, `op_count`, `duration_ms`, `mb_per_sec`, `concurrency`.

Part B — Encode-step fine-grained timers:

- `tidb_lightning_chunk_process_operation_seconds{operation=...}` now emits three labels in place of the previous single `read` label:
  - `s3_read` — raw S3-wait time (when uncompressed; includes decompress CPU when compressed — see limitation below)
  - `decompress` — always 0 for this iteration
  - `parse` — derived as `readDur − s3_read − decompress`
- `encode`, `send`, `write_data`, `write_index`, `deliver` are unchanged.
- A summary log line `encode subtask done {"breakdown": "s3_read=... decompress=... parse=... encode=... send=... total=... *_pct=..."}` is emitted for grepping during perf runs.
- `ReaderTimings` is threaded through `newLoadDataParser` → `GetParser` → `getParser` so the encode loop can swap/observe per-batch deltas without racing the parser goroutine.

Known limitation (to be addressed in a follow-up): when the source file is compressed, decompression happens inside `objstore.WithCompression.Open`, below the wrap point. The `s3_read` label therefore bundles S3-wait + decompress CPU for compressed sources. Splitting them cleanly requires instrumenting inside `pkg/objstore/compress.go`, which is deferred to a separate PR.

Check List
Tests
Manual test plan (run after deploying to a target cluster):
Side effects
The `ChunkProcessOpRead` constant was removed; the `tidb_lightning_chunk_process_operation_seconds{operation="read"}` series will stop being produced. Dashboards that grouped on this label need to sum `s3_read` + `decompress` + `parse` instead.
Documentation
`ADMIN BENCHMARK S3 READ|WRITE` is a new statement. Internal diagnostic — no user docs planned.
Release note
```release-note
None
```
Deferred before merge: