feat(loadgen): implement load test harness for messaging workers #117
Conversation
Design doc for a capacity-baseline load test for the single-site messaging pipeline (message-gatekeeper → MESSAGES_CANONICAL → message-worker + broadcast-worker). https://claude.ai/code/session_01XjBvf9fek9i4DYnTdQzPqF
15-task implementation plan for the tools/loadgen load generator and its docker-compose harness. Each task is TDD: failing test → minimal implementation → green → commit. https://claude.ai/code/session_01XjBvf9fek9i4DYnTdQzPqF
Replace fmt.Sscanf with direct roomIdx parameter, pass Preset by pointer internally to fix hugeParam warnings, convert range loops to indexed form to fix rangeValCopy warnings. Configure gocritic hugeParam sizeThreshold to 128 bytes in .golangci.yml so the exported by-value API stays stable. https://claude.ai/code/session_01XjBvf9fek9i4DYnTdQzPqF
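A minimal sketch of that by-value façade over pointer internals (illustrative only; `Preset` and `BuildFixtures` are real names from this PR, the struct fields and helper are assumed):

```go
package loadgen

// Preset stands in for the real preset struct, large enough that gocritic's
// hugeParam check would flag a by-value copy on every internal call.
type Preset struct {
	Name  string
	Users int
}

type Fixtures struct{ Users int }

// BuildFixtures keeps the stable exported by-value signature.
func BuildFixtures(p Preset) Fixtures {
	return buildFixtures(&p)
}

// buildFixtures takes a pointer internally so the large struct is copied
// only once, at the exported boundary.
func buildFixtures(p *Preset) Fixtures {
	return Fixtures{Users: p.Users}
}
```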
Rename snapshotLatencies → snapshotLatenciesLocked and move mutex acquisition into E1Samples/E2Samples so the slice header is read only while the lock is held, eliminating the data race spotted by the race detector. Add TestCollector_E2UnknownIgnored, TestCollector_SamplesReturnedSorted, and TestCollector_ConcurrentRecordAndSnapshot to cover the unknown-ID guard, the sort comparator, and concurrent mutation + snapshot under -race. All collector.go functions now at 100% coverage. https://claude.ai/code/session_01XjBvf9fek9i4DYnTdQzPqF
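A hedged sketch of that locked-snapshot shape (method names from the commit message; the struct layout and sample type are assumptions):

```go
package loadgen

import (
	"sort"
	"sync"
	"time"
)

type Collector struct {
	mu sync.Mutex
	e1 []time.Duration
}

// E1Samples returns a sorted copy of the E1 samples. The e1 slice header is
// read only while mu is held, so concurrent recording cannot race with it.
func (c *Collector) E1Samples() []time.Duration {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.snapshotLatenciesLocked(c.e1)
}

// snapshotLatenciesLocked copies and sorts the samples; callers must hold mu.
func (c *Collector) snapshotLatenciesLocked(src []time.Duration) []time.Duration {
	out := make([]time.Duration, len(src))
	copy(out, src)
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}
```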
Covers lastToken, counterValue, counterValueLabeled, writeCSVFile, newNatsCorePublisher, Metrics.Handler, NewConsumerSampler, and Snapshot; raises total unit-test coverage from 44.4% to 51.9%. https://claude.ai/code/session_01XjBvf9fek9i4DYnTdQzPqF
…dcasts, bad-reply counts
- seed.go: create roomId, u.account, and compound indexes on subscriptions after insertion so large-preset queries avoid full collection scans
- main.go: fix actualRate==0 in canonical mode by falling back to `sent` (byReqID is never populated in that path)
- main.go: add second E2 subscription for chat.user.*.event.room to capture DM broadcasts emitted by broadcast-worker on UserRoomEvent subjects
- main.go: count malformed E1 replies as bad_reply metric instead of silently dropping them
- generator.go: document SenderDist (Zipf) deferral with an explanatory comment
- main.go: document why signal.NotifyContext is used instead of pkg/shutdown.Wait

https://claude.ai/code/session_01XjBvf9fek9i4DYnTdQzPqF
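For the seed.go index step, a sketch with the official mongo-go-driver (index keys taken from the note above; the surrounding wiring is assumed):

```go
package loadgen

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// ensureSubscriptionIndexes creates indexes after bulk insertion so
// large-preset queries avoid full collection scans.
func ensureSubscriptionIndexes(ctx context.Context, db *mongo.Database) error {
	_, err := db.Collection("subscriptions").Indexes().CreateMany(ctx, []mongo.IndexModel{
		{Keys: bson.D{{Key: "roomId", Value: 1}}},
		{Keys: bson.D{{Key: "u.account", Value: 1}}},
		{Keys: bson.D{{Key: "roomId", Value: 1}, {Key: "u.account", Value: 1}}},
	})
	return err
}
```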
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings.
📝 Walkthrough

This PR introduces a comprehensive load-testing infrastructure for the messaging pipeline, including a Go-based load generator CLI tool (`tools/loadgen`) and its docker-compose harness.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant LoadGen as LoadGen<br/>(Publisher)
    participant NATS as NATS<br/>(Core/JetStream)
    participant Gatekeeper as Gatekeeper<br/>(E1 Handler)
    participant Worker as Broadcast Worker<br/>(E2 Handler)
    participant Collector as Collector<br/>(Latency Tracker)
    participant Metrics as Prometheus<br/>Metrics

    LoadGen->>NATS: Publish SendMessageRequest<br/>(frontdoor inject)
    activate NATS
    NATS->>Gatekeeper: Route to handler subject
    deactivate NATS
    Gatekeeper->>LoadGen: RecordPublish(requestID,<br/>messageID, now)
    Collector->>Collector: Store in map
    Gatekeeper->>NATS: Publish MessageEvent<br/>(E1 reply/gatekeeper ack)
    activate NATS
    NATS->>Collector: Subscribe to E1 reply subject
    deactivate NATS
    Collector->>Collector: RecordReply(requestID, now)<br/>Compute E1 latency
    Collector->>Metrics: Observe E1Latency histogram
    NATS->>Worker: Publish MessageEvent to<br/>canonical stream
    activate NATS
    NATS->>Worker: JetStream consumer drains
    deactivate NATS
    Worker->>NATS: Publish RoomEvent<br/>(E2 broadcast)
    activate NATS
    NATS->>Collector: Subscribe to room event subject
    deactivate NATS
    Collector->>Collector: RecordBroadcast(messageID, now)<br/>Compute E2 latency
    Collector->>Metrics: Observe E2Latency histogram
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 11
🧹 Nitpick comments (10)
tools/loadgen/README.md (1)
9-26: Nit: specify languages on fenced code blocks (MD040).

The three shell-command blocks (lines 9, 17, 24) trigger markdownlint MD040. Tag them as `bash` (or `sh`) to silence the warning and get syntax highlighting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/README.md` around lines 9 - 26, update the three fenced code blocks that contain the make commands (the blocks with "make -C tools/loadgen/deploy up", "make -C tools/loadgen/deploy run-dashboards ..." and "make -C tools/loadgen/deploy down") to specify a shell language marker by replacing the opening ``` with ```bash (or ```sh) so markdownlint MD040 is satisfied and the blocks get shell syntax highlighting.

tools/loadgen/deploy/grafana/dashboards/loadtest.json (1)
38-44: Consumer pending panels lack a stream label in the legend.

`loadgen_consumer_pending`/`loadgen_consumer_ack_pending` are labeled with both `stream` and `durable` (see `tools/loadgen/metrics.go`). If multiple streams are ever scraped, `{{durable}}` alone can collapse distinct series. Consider `{{stream}}/{{durable}}` for disambiguation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/deploy/grafana/dashboards/loadtest.json` around lines 38 - 44, the "Consumer pending" and "Consumer ack pending" Grafana panels use legendFormat "{{durable}}" which can collapse series when multiple streams exist; update the targets for metrics loadgen_consumer_pending and loadgen_consumer_ack_pending to use legendFormat "{{stream}}/{{durable}}" (i.e., change the legendFormat field in those panel target objects) so series are disambiguated by stream and durable.

tools/loadgen/deploy/Makefile (1)
19-21: Make `run-dashboards` match the normal startup path.

This target starts containers before validating `PRESET`, and it omits `--build`, so dashboard runs can use stale loadgen/worker images.

Proposed fix

```diff
 run-dashboards:
-	$(COMPOSE) --profile dashboards up -d
+	@test -n "$(PRESET)" || (echo "PRESET=<name> required" && exit 1)
+	$(COMPOSE) --profile dashboards up -d --build
 	$(MAKE) run PRESET=$(PRESET) RATE=$(RATE) DURATION=$(DURATION)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/deploy/Makefile` around lines 19 - 21, run-dashboards currently brings up dashboards before validating PRESET and omits image rebuild; change the order to invoke the existing run target (so PRESET/RATE/DURATION validation and any setup in the run target happens first) and then start the dashboards with compose including --build; specifically, call $(MAKE) run PRESET=$(PRESET) RATE=$(RATE) DURATION=$(DURATION) first, then use $(COMPOSE) --profile dashboards up --build -d to start dashboards so you get validation and fresh images (references: Makefile target run-dashboards, variables PRESET/RATE/DURATION, target run, and $(COMPOSE) --profile dashboards up --build -d).

tools/loadgen/integration_test.go (2)
Line 101: Use `loadgen_test` (or a similar service-scoped) DB name per the integration-test guideline.

The test connects to `db := client.Database("chat")`, but the repo convention is `<service>_test` for integration tests to avoid collisions with any locally running service. Rename to e.g. `loadgen_test`.

As per coding guidelines: "Use `<service>_test` as database name in integration tests to avoid collisions".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/integration_test.go` at line 101, the test currently sets db := client.Database("chat") which can collide with local services; update the Database call to use the service-scoped test DB name, e.g. db := client.Database("loadgen_test"), so the db variable and any setup/teardown in this integration test use "loadgen_test" instead of "chat".
69-72: Silently-discarded errors on `nc.Drain()`/`sub.Unsubscribe()`.

Per the repo's Go guideline, discarded errors should either be explicitly assigned with a comment or wrapped. The simplest fix is `defer func() { _ = nc.Drain() }()` (and similar for `Unsubscribe`) so the intent is explicit and linters stay quiet.

As per coding guidelines: "Never ignore errors silently — comment if intentionally discarded".

Also applies to: 131-132, 151-152

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/integration_test.go` around lines 69 - 72, several deferred calls are silently discarding errors (nc.Drain() and sub.Unsubscribe()); update each defer to explicitly ignore the error with a deliberate assignment/comment such as wrapping in a short func that does `_ = nc.Drain()` (and `_ = sub.Unsubscribe()` for the subscription), so linters know the discard is intentional. Locate the places where `nc.Drain()` is deferred (after `nats.Connect`/`nc`) and where `sub.Unsubscribe()` is deferred (after subscription creation) and replace the plain `defer nc.Drain()` / `defer sub.Unsubscribe()` with `defer func() { _ = nc.Drain() }()` / `defer func() { _ = sub.Unsubscribe() }()` (or equivalent with a brief comment) to satisfy the repository error-handling guideline.

tools/loadgen/consumerlag.go (1)
20-26: Optional: make `Snapshot()` unconditionally safe with a small mutex.

The current contract that `Snapshot()` may only be called after `Run`'s goroutine has exited is documented, and the existing call site uses `sync.WaitGroup.Wait()`, which does establish the necessary happens-before. However, it's easy for a future caller (or a future unit test) to invoke `Snapshot()` against a still-running sampler and silently produce a data race. Adding a `sync.Mutex` guarding the six observation fields would remove the footgun at negligible cost — `sampleOnce` already has a natural critical section where it mutates min/peak/final.

Also applies to: 86-99

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/consumerlag.go` around lines 20 - 26, add a sync.Mutex (e.g., mu) to the sampler struct and use it to protect the six observation fields (minPending, peakPending, finalPending, peakAckPending, finalRedelivered, hasSample): hold mu while mutating them in sampleOnce and hold mu in Snapshot() while reading them so Snapshot() is safe to call concurrently with Run; leave Run's goroutine coordination (WaitGroup) as-is but ensure the new mu is used around the existing critical section in sampleOnce and around the read in Snapshot().
tools/loadgen/report_test.go (1)

116-135: Test name/comment is misleading — this doesn't actually exercise a mid-stream row-write failure.

Since `pw.Close()` happens on Line 120 before `WriteCSV` is called, every subsequent write on the pipe fails with `io.ErrClosedPipe`. And because `csv.Writer` buffers internally, header and rows are not written one-by-one — they're all flushed together, and the error surfaces through `cw.Error()` on `Flush`. So this is effectively a duplicate of `TestWriteCSV_WriterError`.

If you want a genuine "header succeeds, row fails" path, you'd need a writer that allows the first `Write` to succeed but fails on a later one (e.g., bump the threshold on `failWriter` or close the pipe after observing the first flushed byte).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/report_test.go` around lines 116 - 135, the test TestWriteCSV_RowWriteError currently closes the pipe before calling WriteCSV so header and rows all fail together; change it to simulate a mid-stream row write failure by using a writer that allows the first Write to succeed but fails on subsequent writes (e.g., reuse or extend the existing failWriter to succeed once then return an error, or open the pipe and close pw from a goroutine after the header is flushed), then call WriteCSV(rows) and assert an error; ensure you reference the test name TestWriteCSV_RowWriteError and the WriteCSV function when implementing the writer/failure trigger so the header write succeeds while a later row write fails.
tools/loadgen/main_test.go (1)

99-109: Tests overlap — `TestNewNatsCorePublisher_FieldWiring` already covers the two scenarios asserted separately above.

`TestNewNatsCorePublisher_FieldWiring` already verifies both the `InjectCanonical` and `InjectFrontdoor` paths (including the `useJetStream` flag and nil `nc`/`js`). The preceding `TestNewNatsCorePublisher_CanonicalSetsUseJetStream` and `TestNewNatsCorePublisher_FrontdoorDoesNotSetUseJetStream` tests are redundant subsets. Consider collapsing to a single table-driven test to tighten the suite.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/main_test.go` around lines 99 - 109, the three tests are redundant; remove TestNewNatsCorePublisher_CanonicalSetsUseJetStream and TestNewNatsCorePublisher_FrontdoorDoesNotSetUseJetStream and consolidate their assertions into a single table-driven test (keep or refactor TestNewNatsCorePublisher_FieldWiring) that iterates cases for InjectCanonical and InjectFrontdoor, calls newNatsCorePublisher(nil, <inject>, nil) and asserts p.nc == nil, p.js == nil and p.useJetStream equals the expected value for each case; reference the newNatsCorePublisher constructor and the existing TestNewNatsCorePublisher_FieldWiring test name when making the change.
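A possible consolidated shape (constructor signature, constants, and field names taken from the prompt above; the `InjectMode` type name is an assumption):

```go
func TestNewNatsCorePublisher_FieldWiring(t *testing.T) {
	cases := []struct {
		name         string
		mode         InjectMode // assumed type of the inject flag
		useJetStream bool
	}{
		{"canonical", InjectCanonical, true},
		{"frontdoor", InjectFrontdoor, false},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			p := newNatsCorePublisher(nil, tc.mode, nil)
			require.Nil(t, p.nc)
			require.Nil(t, p.js)
			require.Equal(t, tc.useJetStream, p.useJetStream)
		})
	}
}
```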
tools/loadgen/preset.go (1)

138-147: DM branch panics if `len(users) < 2`.

`r.Intn(len(users))` panics on zero, and `r.Intn(len(users) - 1)` panics when `len(users) == 1`. All built-in presets have `Users >= 10` today, but since `BuildFixtures` is exported and the unit tests already exercise synthetic presets, a future preset with only 1 user plus `RoomSizeDist: DistMixed` would crash. A one-line guard avoids the footgun:

```go
if room.Type == model.RoomTypeDM {
	if len(users) < 2 {
		return users // or simply skip — DM rooms require >=2
	}
	// ... existing code
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/preset.go` around lines 138 - 147, in pickMembers, the DM branch can panic when len(users) < 2 due to r.Intn; add a guard at the start of the model.RoomTypeDM branch (e.g., if len(users) < 2) and return early (return users or an empty slice as appropriate for your fixture semantics) before calling r.Intn, keeping the existing selection logic (variables i and j) unchanged otherwise.

tools/loadgen/main.go (1)
268-272: Trailing-drain `time.Sleep` ignores context cancellation.

`time.Sleep(2 * time.Second)` blocks unconditionally — if the user hit SIGINT to abort the run, the CLI still stalls 2s before printing the summary. It also violates the repo's "never use `time.Sleep` for synchronization" rule, since this is effectively synchronizing with trailing NATS deliveries. Use a select on a timer + `ctx.Done()` so Ctrl-C is responsive, and prefer draining the connection (which flushes/processes pending messages) before finalizing.

♻️ Proposed fix

```diff
-	genErr := gen.Run(runCtx)
-	// Wait up to 2 seconds for trailing replies and broadcasts to arrive.
-	time.Sleep(2 * time.Second)
+	genErr := gen.Run(runCtx)
+	// Wait up to 2 seconds for trailing replies and broadcasts to arrive,
+	// but respect base-ctx cancellation so Ctrl-C is responsive.
+	trailer := time.NewTimer(2 * time.Second)
+	select {
+	case <-trailer.C:
+	case <-ctx.Done():
+		trailer.Stop()
+	}
 	collector.DiscardBefore(warmupDeadline)
```

As per coding guidelines: "Never use `time.Sleep` for goroutine synchronization — use proper sync primitives (channels, `sync.WaitGroup`, `sync.Mutex`)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/main.go` around lines 268 - 272, replace the unconditional time.Sleep after gen.Run(runCtx) with a context-aware wait: start a timer (time.After) for 2s and use select to return early on runCtx.Done() so Ctrl-C is responsive; then, before calling collector.DiscardBefore(warmupDeadline) and collector.Finalize(), drain any NATS connection used by the tool (call the connection's Drain/Flush/Close method as appropriate) to let pending messages be processed rather than relying on Sleep; ensure the new logic uses runCtx for cancellation and only falls back to the 2s timer if not cancelled.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed. The per-comment prompts are shown inline with each comment; the two below have no inline copy.
In `@tools/loadgen/generator_test.go`:
- Around line 210-235: The test is flaky because it relies on a statistical 10% mention rate; make it deterministic by forcing mentions in the preset before creating the generator: after p is returned from BuiltinPreset("realistic") set the preset's mention probability to 1 (e.g., p.MentionRate = 1.0 or p.MentionProbability = 1.0 depending on the field name) so NewGenerator(GeneartorConfig{Preset: &p, ...}) will always inject mentions; keep the rest of the test unchanged.
In `@tools/loadgen/seed.go`:
- Around line 38-45: The Seed function returns bare errors from insertDocs for each collection; wrap each returned error with operation-level context using fmt.Errorf so callers know which fixture phase failed. Specifically, change the three returns that propagate errors from insertDocs(ctx, db.Collection("users"), f.Users), insertDocs(ctx, db.Collection("rooms"), f.Rooms), and insertDocs(ctx, db.Collection("subscriptions"), f.Subscriptions) to return fmt.Errorf("seed users: %w", err), fmt.Errorf("seed rooms: %w", err), and fmt.Errorf("seed subscriptions: %w", err) respectively (or similar descriptive messages) to include collection/phase context.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b7edc3aa-6b7d-4f7e-9220-be26f13841ae
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (27)
- docs/superpowers/plans/2026-04-21-load-test-messaging-workers.md
- docs/superpowers/specs/2026-04-21-load-test-messaging-workers-design.md
- go.mod
- pkg/subject/subject.go
- tools/loadgen/README.md
- tools/loadgen/collector.go
- tools/loadgen/collector_test.go
- tools/loadgen/consumerlag.go
- tools/loadgen/consumerlag_test.go
- tools/loadgen/deploy/Dockerfile
- tools/loadgen/deploy/Makefile
- tools/loadgen/deploy/docker-compose.loadtest.yml
- tools/loadgen/deploy/grafana/dashboards/loadtest.json
- tools/loadgen/deploy/grafana/provisioning/dashboards/loadtest.yaml
- tools/loadgen/deploy/grafana/provisioning/datasources/prometheus.yaml
- tools/loadgen/deploy/prometheus/prometheus.yml
- tools/loadgen/generator.go
- tools/loadgen/generator_test.go
- tools/loadgen/integration_test.go
- tools/loadgen/main.go
- tools/loadgen/main_test.go
- tools/loadgen/metrics.go
- tools/loadgen/preset.go
- tools/loadgen/preset_test.go
- tools/loadgen/report.go
- tools/loadgen/report_test.go
- tools/loadgen/seed.go
### Grafana dashboard (opt-in)

Activated with `docker compose --profile dashboards up` (or `make run-dashboards`). Prometheus is provisioned to scrape:

- The loadgen's `/metrics` endpoint.
- The NATS server's monitoring endpoint (`/varz` and `/jsz`) via the community `prometheus-nats-exporter`, or directly via NATS's own Prometheus output if configured.

A pre-baked dashboard JSON at `tools/loadgen/deploy/grafana/dashboards/loadtest.json` is provisioned via Grafana's file provisioner and includes these panels:

1. **Throughput** — `rate(loadgen_published_total[10s])` vs target rate.
2. **E1 gatekeeper ack latency** — P50/P95/P99 histogram quantiles over time.
3. **E2 broadcast latency** — P50/P95/P99 histogram quantiles over time.
4. **Consumer pending** — `loadgen_consumer_pending` stacked by durable.
5. **Ack pending** — `loadgen_consumer_ack_pending` by durable.
6. **Error rate** — `rate(loadgen_publish_errors_total[10s])` by reason.
7. **NATS health** — connections, slow consumers, JetStream bytes.
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Verify whether the loadgen deploy stack contains NATS Prometheus exporter wiring.
rg -n -C3 'prometheus-nats-exporter|nats-exporter|/varz|/jsz|8222|nats.*metrics|metrics.*nats' tools/loadgen/deploy docs/superpowers/specs/2026-04-21-load-test-messaging-workers-design.md
```

Repository: hmchangw/chat
Length of output: 2888

🏁 Script executed:

```bash
cat tools/loadgen/deploy/docker-compose.loadtest.yml
```

Repository: hmchangw/chat
Length of output: 3168
Remove or update the NATS metrics promise in the dashboard section, or add the missing prometheus-nats-exporter service to the compose stack.
The spec promises Prometheus scraping of NATS monitoring endpoints (/varz, /jsz) via prometheus-nats-exporter or native NATS Prometheus output, but the docker-compose.loadtest.yml does not include either. The prometheus.yml config explicitly comments that "NATS monitoring on :8222 serves JSON — not Prometheus" and notes to "Add prometheus-nats-exporter as a sidecar if NATS metrics are needed." Either add the exporter service to the compose stack or remove the NATS-health dashboard promise from this section.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docs/superpowers/specs/2026-04-21-load-test-messaging-workers-design.md`
around lines 452 - 472, The docs claim Prometheus will scrape NATS via
prometheus-nats-exporter or native NATS Prometheus output but the compose and
prometheus config lack that exporter; either add the prometheus-nats-exporter
service to the load test compose stack and wire it into prometheus.yml (so /varz
and /jsz are translated to Prometheus endpoints) or remove/update the "NATS
health" dashboard promise and the sentence referencing prometheus-nats-exporter;
ensure changes reference the prometheus-nats-exporter service name and the "NATS
health" dashboard panel so reviewers can verify consistency.
```go
func UserResponseWildcard() string {
	return "chat.user.*.response.>"
}

func RoomEventWildcard() string {
	return "chat.room.*.event"
}

func UserRoomEventWildcard() string {
	return "chat.user.*.event.room"
}
```
🛠️ Refactor suggestion | 🟠 Major
Add unit tests for the new exported helpers.
As per coding guidelines: "Every exported function in pkg/ must have corresponding test cases." Please add tests for UserResponseWildcard, RoomEventWildcard, and UserRoomEventWildcard in pkg/subject/subject_test.go asserting the exact returned strings.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/subject/subject.go` around lines 180 - 190, Add unit tests for the three
new exported helpers: create tests in the package test file that call
UserResponseWildcard, RoomEventWildcard, and UserRoomEventWildcard and assert
they return exactly "chat.user.*.response.>", "chat.room.*.event", and
"chat.user.*.event.room" respectively; implement either three small Test...
functions or a table-driven TestWildcards that uses testing.T and
t.Fatalf/t.Errorf to fail on mismatches, referencing the functions by name so
the test imports the same package and verifies the exact string values.
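A minimal table-driven sketch of such a test (function names and expected strings come straight from the snippet above):

```go
package subject

import "testing"

func TestWildcards(t *testing.T) {
	cases := []struct {
		name string
		fn   func() string
		want string
	}{
		{"UserResponseWildcard", UserResponseWildcard, "chat.user.*.response.>"},
		{"RoomEventWildcard", RoomEventWildcard, "chat.room.*.event"},
		{"UserRoomEventWildcard", UserRoomEventWildcard, "chat.user.*.event.room"},
	}
	for _, tc := range cases {
		if got := tc.fn(); got != tc.want {
			t.Errorf("%s() = %q, want %q", tc.name, got, tc.want)
		}
	}
}
```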
```yaml
mongodb:
  image: mongo:8
  ports:
    - "27017:27017"
  networks: [loadtest]
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Inspect readiness gating in the loadgen compose file.
python - <<'PY'
from pathlib import Path
p = Path("tools/loadgen/deploy/docker-compose.loadtest.yml")
text = p.read_text()
for svc in ("nats", "mongodb"):
    start = text.index(f"  {svc}:")
    following = text.find("\n  ", start + 1)
    next_following = text.find("\n  ", following + 1) if following != -1 else -1
    block_end = next_following if next_following != -1 else len(text)
    block = text[start:block_end]
    print(f"{svc} has healthcheck:", "healthcheck:" in block)
print("service_healthy references:")
for i, line in enumerate(text.splitlines(), 1):
    if "service_healthy" in line:
        print(f"{i}: {line}")
PY
```

Repository: hmchangw/chat
Length of output: 182

🏁 Script executed:

```bash
cat -n tools/loadgen/deploy/docker-compose.loadtest.yml
```

Repository: hmchangw/chat
Length of output: 4064
🏁 Script executed:

```bash
# Check if any main.go implements fail-fast on connection errors
# Look at message-gatekeeper as an example
find . -path "*/message-gatekeeper/main.go" -type f | head -1 | xargs -r head -80
```

Repository: hmchangw/chat
Length of output: 2219
🌐 Web query:
NATS Docker healthcheck /healthz endpoint js-enabled-only
💡 Result:
The NATS server provides a /healthz HTTP endpoint on the monitoring port (default 8222) for health checks, commonly used in Docker containers. A plain GET request to http://localhost:8222/healthz returns HTTP 200 with {"status": "ok"} if the server can accept client connections on port 4222, or 503 if unhealthy. The parameter ?js-enabled-only=true makes the endpoint return 503 if JetStream is not enabled or has been disabled (e.g., due to insufficient resources), exiting early without further checks. This is useful for JetStream deployments to verify the subsystem is operational without validating all streams/consumers. Other parameters include ?js-server-only=true (skips asset checks, verifies server/meta leader sync) and checks for specific streams/accounts. In Docker, enable monitoring with -m 8222 or http_port: 8222 in config. Official nats Docker image exposes 8222. Common Docker healthcheck: healthcheck: test: ["CMD", "wget", "-qO-", "http://localhost:8222/healthz"] interval: 10s timeout: 5s retries: 3 For JetStream-only verification, use /healthz?js-enabled-only=true in the test command. Official Docker image lacks built-in HEALTHCHECK; users add it in docker-compose or Kubernetes manifests (e.g., nats-io/k8s Helm charts use variants like /healthz?js-server-only=true for probes). No JavaScript requirement; endpoints are plain HTTP/JSON.
Citations:
- 1: https://mintlify.com/nats-io/nats-server/operations/monitoring-endpoints
- 2: https://docs.nats.io/running-a-nats-service/nats_admin/monitoring
- 3: Fix /healthz?js-enabled=true behavior nats-io/nats-server#3704
- 4: https://oneuptime.com/blog/post/2026-02-02-nats-docker/view
- 5: Fix `/healthz` check for `js-enabled=true` nats-io/nats-server#3703
- 6: Added param options to /healthz endpoint nats-io/nats-server#3326
- 7: Startup probe should use js-server-only=true to prevent client stream/consumer issues from poisoning cluster health nats-io/k8s#1140
🌐 Web query:
MongoDB Docker healthcheck mongosh ping command syntax
💡 Result:
The standard syntax for a MongoDB Docker healthcheck using mongosh and the ping command is: In Docker Compose: healthcheck: test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping').ok", "--quiet"] interval: 10s timeout: 5s retries: 5 start_period: 40s In Dockerfile: HEALTHCHECK --interval=10s --timeout=5s --start-period=30s --retries=5 \ CMD mongosh --eval "db.adminCommand('ping').ok" --quiet || exit 1 For authenticated setups, include credentials: test: ["CMD", "mongosh", "--username", "admin", "--password", "secret", "--authenticationDatabase", "admin", "--eval", "db.adminCommand('ping').ok", "--quiet"] The db.adminCommand('ping').ok expression returns 1 if healthy (exit 0), else fails (exit 1). Use --quiet to suppress output. The ping command is a lightweight no-op test confirmed by official MongoDB docs. This is the community standard for MongoDB 6.0+ Docker images where mongosh replaces mongo shell. Official MongoDB Docker image does not include built-in HEALTHCHECK; users must add it.
Citations:
- 1: https://oneuptime.com/blog/post/2026-03-31-mongodb-docker-health-checks/view
- 2: https://stackoverflow.com/questions/54384042/why-does-the-docker-compose-healthcheck-of-my-mongo-container-always-fail
- 3: https://stackoverflow.com/questions/37839365/simple-http-tcp-health-check-for-mongodb
- 4: https://cms.nucleusnetwork.com/urban-beat/mongodb-docker-image-implementing-a-health-check-1764815265
- 5: https://www.mongodb.com/docs/v6.2/reference/command/ping/
Add healthchecks to NATS and MongoDB; gate dependent services on service_healthy instead of container startup.
The Go services in this compose file (message-gatekeeper, message-worker, broadcast-worker, loadgen) implement fail-fast startup—they exit immediately if NATS or MongoDB connections fail. Using depends_on: [nats, mongodb] (which defaults to service_started) or condition: service_started allows containers to start before they accept connections, causing make up to fail unpredictably.
Add healthchecks to NATS and MongoDB and update all dependent services to use condition: service_healthy. Cassandra already follows this pattern in the same file (lines 25–35), establishing the precedent.
Proposed fix

```diff
nats:
image: nats:2.11-alpine
command: ["-js", "-m", "8222"]
ports:
- "4222:4222"
- "8222:8222"
networks: [loadtest]
+ healthcheck:
+ test: ["CMD", "wget", "-qO-", "http://localhost:8222/healthz?js-enabled-only=true"]
+ interval: 5s
+ timeout: 3s
+ retries: 20
mongodb:
image: mongo:8
ports:
- "27017:27017"
networks: [loadtest]
+ healthcheck:
+ test: ["CMD", "mongosh", "--quiet", "--eval", "db.adminCommand('ping').ok"]
+ interval: 5s
+ timeout: 3s
+ retries: 20
cassandra:
image: cassandra:4.1
environment:
- CASSANDRA_CLUSTER_NAME=loadtest
ports:
- "9042:9042"
networks: [loadtest]
healthcheck:
test: ["CMD-SHELL", "nodetool status | grep -q '^UN'"]
interval: 10s
timeout: 5s
retries: 30
cassandra-init:
image: cassandra:4.1
depends_on:
cassandra:
condition: service_healthy
entrypoint:
- sh
- -c
- |
cqlsh cassandra -e "CREATE KEYSPACE IF NOT EXISTS chat WITH replication = {'class':'SimpleStrategy','replication_factor':1};"
networks: [loadtest]
restart: "no"
message-gatekeeper:
build:
context: ../../..
dockerfile: message-gatekeeper/deploy/Dockerfile
environment:
- NATS_URL=nats://nats:4222
- SITE_ID=site-local
- MONGO_URI=mongodb://mongodb:27017
- MONGO_DB=chat
- depends_on: [nats, mongodb]
+ depends_on:
+ nats:
+ condition: service_healthy
+ mongodb:
+ condition: service_healthy
networks: [loadtest]
message-worker:
build:
context: ../../..
dockerfile: message-worker/deploy/Dockerfile
environment:
- NATS_URL=nats://nats:4222
- SITE_ID=site-local
- MONGO_URI=mongodb://mongodb:27017
- MONGO_DB=chat
- CASSANDRA_HOSTS=cassandra
- CASSANDRA_KEYSPACE=chat
depends_on:
nats:
- condition: service_started
+ condition: service_healthy
mongodb:
- condition: service_started
+ condition: service_healthy
cassandra-init:
condition: service_completed_successfully
networks: [loadtest]
broadcast-worker:
build:
context: ../../..
dockerfile: broadcast-worker/deploy/Dockerfile
environment:
- NATS_URL=nats://nats:4222
- SITE_ID=site-local
- MONGO_URI=mongodb://mongodb:27017
- MONGO_DB=chat
- depends_on: [nats, mongodb]
+ depends_on:
+ nats:
+ condition: service_healthy
+ mongodb:
+ condition: service_healthy
networks: [loadtest]
loadgen:
build:
context: ../../..
dockerfile: tools/loadgen/deploy/Dockerfile
environment:
- NATS_URL=nats://nats:4222
- SITE_ID=site-local
- MONGO_URI=mongodb://mongodb:27017
- MONGO_DB=chat
- METRICS_ADDR=:9099
ports:
- "9099:9099"
- depends_on: [nats, mongodb, message-gatekeeper, message-worker, broadcast-worker]
+ depends_on:
+ nats:
+ condition: service_healthy
+ mongodb:
+ condition: service_healthy
+ message-gatekeeper:
+ condition: service_started
+ message-worker:
+ condition: service_started
+ broadcast-worker:
+ condition: service_started
entrypoint: ["sleep", "infinity"]
networks: [loadtest]
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```yaml
mongodb:
  image: mongo:8
  ports:
    - "27017:27017"
  networks: [loadtest]
  healthcheck:
    test: ["CMD", "mongosh", "--quiet", "--eval", "db.adminCommand('ping').ok"]
    interval: 5s
    timeout: 3s
    retries: 20
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/loadgen/deploy/docker-compose.loadtest.yml` around lines 12 - 16, Add
proper Docker healthchecks for the mongodb and nats services (mirroring the
existing cassandra healthcheck pattern) and change all dependent services
(message-gatekeeper, message-worker, broadcast-worker, loadgen) to use
depends_on with condition: service_healthy instead of the default
service_started; ensure the mongodb service block includes a CMD/healthcheck
that verifies Mongo responds (e.g., mongo --eval or similar) and the nats
service block includes a healthcheck that verifies NATS is accepting connections
so the Go services only start once those healthchecks are healthy.
```dockerfile
FROM alpine:3.21
RUN apk add --no-cache ca-certificates
COPY --from=builder /loadgen /loadgen
ENTRYPOINT ["/loadgen"]
```
Run container as a non-root user.
Trivy DS-0002: image currently runs as root. For a load-test tool this is low severity, but a single-line fix hardens the image.
Proposed fix

```diff
FROM alpine:3.21
RUN apk add --no-cache ca-certificates
-COPY --from=builder /loadgen /loadgen
+RUN adduser -D -u 10001 loadgen
+COPY --from=builder /loadgen /loadgen
+USER loadgen
 ENTRYPOINT ["/loadgen"]
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/loadgen/deploy/Dockerfile` around lines 13 - 16, The Dockerfile runs
the binary as root; create a non-root user and switch to it: add steps in the
Dockerfile (before ENTRYPOINT) to create a group/user (e.g., addgroup/adduser or
addgroup -S and adduser -S on alpine), chown the /loadgen binary to that user,
and set USER to that non-root username so ENTRYPOINT ["/loadgen"] runs as the
non-root user; ensure permissions allow execution by that user.
```go
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
defer cancel()
_ = g.Run(ctx)
```
Check Run errors in these tests.
These calls currently discard g.Run(ctx) errors, so a generator failure can be hidden behind later assertions.
Proposed fix pattern

```diff
-	_ = g.Run(ctx)
+	require.NoError(t, g.Run(ctx))
```

As per coding guidelines, "Never ignore errors silently — comment if intentionally discarded".
Also applies to: 113-115, 141-143, 186-188, 222-224, 249-251
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/loadgen/generator_test.go` around lines 90 - 92, Tests in
generator_test.go are silently discarding errors from g.Run(ctx); update each
call (e.g., the g.Run(ctx) at the locations noted) to check the returned error
and fail the test if non-nil — for example, replace the ignored call with an
assertion like t.Fatalf or require.NoError(t, err) after capturing err :=
g.Run(ctx). Ensure you do this at every occurrence (including the calls near
lines mentioned) so generator failures surface instead of being hidden.
```go
func (g *Generator) content() string {
	r := g.cfg.Preset.ContentBytes
	size := r.Min
	if r.Max > r.Min {
		size = r.Min + g.rng.Intn(r.Max-r.Min+1)
	}
	if size <= 0 {
		size = 1
	}
	body := g.maxBody[:size]
	if g.cfg.Preset.MentionRate > 0 && g.rng.Float64() < g.cfg.Preset.MentionRate {
		target := g.rng.Intn(g.cfg.Preset.Users)
		body = fmt.Sprintf("@user-%d %s", target, body)
	}
	return body
}
```
Potential panic on pathological preset configurations.
Two edge cases can cause content() to panic:
- `g.maxBody[:size]` — `maxBody` length is clamped to `Preset.ContentBytes.Max` (or 1). But if `Preset.ContentBytes.Min > Preset.ContentBytes.Max` (or `Max <= 0` while `Min > 1`), then `size` starts at `Min`, is not reduced by the `r.Max > r.Min` branch, and exceeds `len(maxBody)` → slice-out-of-bounds panic.
- `g.rng.Intn(g.cfg.Preset.Users)` panics when `Users == 0` and `MentionRate > 0`.
Both are preset-misconfiguration hazards rather than expected inputs, but since presets are externally named and may grow, a defensive clamp is cheap.
🛡️ Proposed fix

```diff
func (g *Generator) content() string {
r := g.cfg.Preset.ContentBytes
size := r.Min
if r.Max > r.Min {
size = r.Min + g.rng.Intn(r.Max-r.Min+1)
}
if size <= 0 {
size = 1
}
+ if size > len(g.maxBody) {
+ size = len(g.maxBody)
+ }
body := g.maxBody[:size]
- if g.cfg.Preset.MentionRate > 0 && g.rng.Float64() < g.cfg.Preset.MentionRate {
+ if g.cfg.Preset.Users > 0 && g.cfg.Preset.MentionRate > 0 && g.rng.Float64() < g.cfg.Preset.MentionRate {
target := g.rng.Intn(g.cfg.Preset.Users)
body = fmt.Sprintf("@user-%d %s", target, body)
}
return body
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
func (g *Generator) content() string {
	r := g.cfg.Preset.ContentBytes
	size := r.Min
	if r.Max > r.Min {
		size = r.Min + g.rng.Intn(r.Max-r.Min+1)
	}
	if size <= 0 {
		size = 1
	}
	if size > len(g.maxBody) {
		size = len(g.maxBody)
	}
	body := g.maxBody[:size]
	if g.cfg.Preset.Users > 0 && g.cfg.Preset.MentionRate > 0 && g.rng.Float64() < g.cfg.Preset.MentionRate {
		target := g.rng.Intn(g.cfg.Preset.Users)
		body = fmt.Sprintf("@user-%d %s", target, body)
	}
	return body
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/loadgen/generator.go` around lines 135 - 149, In content(), guard
against malformed presets by clamping the computed size to the actual buffer
length and validating Users before calling Intn: after computing size from
g.cfg.Preset.ContentBytes.Min/Max, set size = max(1, min(size, len(g.maxBody)))
(reference: function content(), g.cfg.Preset.ContentBytes, g.maxBody, and size)
and only call g.rng.Intn(g.cfg.Preset.Users) when g.cfg.Preset.Users > 0
(reference: MentionRate and Users); if Users == 0 but MentionRate triggers, skip
mentioning or handle it deterministically to avoid rng.Intn(0) panics.
```go
// Allow trailing events to flow.
time.Sleep(2 * time.Second)

// Assert the canonical stream drained.
for _, durable := range []string{"message-worker", "broadcast-worker"} {
	cons, err := js.Consumer(ctx, canonical.Name, durable)
	require.NoError(t, err)
	info, err := cons.Info(ctx)
	require.NoError(t, err)
	require.Equal(t, uint64(0), info.NumPending, "durable %s still has pending", durable)
}
```
Replace sleep-based wait with a bounded poll on NumPending.
Using time.Sleep(2 * time.Second) before asserting drain violates the repo guideline against sleeping for synchronization and makes this test either slow (when 2s is excessive) or flaky (when 2s is too short under load). Poll cons.Info(ctx).NumPending with a short interval and a generous overall deadline instead — the assertion on Line 179 then just confirms the poll terminated on success, not on timeout.
♻️ Suggested approach

```go
deadline := time.Now().Add(15 * time.Second)
for _, durable := range []string{"message-worker", "broadcast-worker"} {
	cons, err := js.Consumer(ctx, canonical.Name, durable)
	require.NoError(t, err)
	for {
		info, err := cons.Info(ctx)
		require.NoError(t, err)
		if info.NumPending == 0 {
			break
		}
		require.False(t, time.Now().After(deadline),
			"durable %s still has %d pending", durable, info.NumPending)
		time.Sleep(100 * time.Millisecond)
	}
}
```

As per coding guidelines: "Never use time.Sleep for goroutine synchronization — use proper sync primitives".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/loadgen/integration_test.go` around lines 170 - 180, Replace the fixed
time.Sleep(2 * time.Second) with a bounded poll that repeatedly calls
js.Consumer(ctx, canonical.Name, durable) -> cons.Info(ctx) and checks
info.NumPending until it reaches 0 or a deadline is exceeded; for each durable
in the existing loop ("message-worker","broadcast-worker") create a deadline
(e.g. time.Now().Add(15*time.Second)), poll every short interval (e.g. 100ms)
and require.NoError on cons.Info errors, breaking when info.NumPending==0 and
failing with require.False(time.Now().After(deadline), "durable %s still has %d
pending", durable, info.NumPending) if the deadline passes.
```go
nc, err := natsutil.Connect(cfg.NatsURL, cfg.NatsCredsFile)
if err != nil {
	slog.Error("nats connect", "error", err)
	return 1
}
js, err := jetstream.New(nc.NatsConn())
if err != nil {
	slog.Error("jetstream init", "error", err)
	return 1
}
```
NATS connection leak on jetstream.New failure.
If jetstream.New returns an error at line 164, runRun returns 1 without draining or closing nc, leaking the connection and associated goroutines. Add a cleanup on this error path (or defer an unconditional nc.Drain() right after the successful connect).
🛠️ Proposed fix

```diff
nc, err := natsutil.Connect(cfg.NatsURL, cfg.NatsCredsFile)
if err != nil {
slog.Error("nats connect", "error", err)
return 1
}
js, err := jetstream.New(nc.NatsConn())
if err != nil {
slog.Error("jetstream init", "error", err)
+ _ = nc.Drain()
return 1
 }
```

Similarly, if any of the three `Subscribe` calls (lines 186, 219, 229) fail, the function returns without draining `nc` or shutting down `metricsSrv` — consider restructuring the error paths so cleanup is guaranteed.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
nc, err := natsutil.Connect(cfg.NatsURL, cfg.NatsCredsFile)
if err != nil {
	slog.Error("nats connect", "error", err)
	return 1
}
js, err := jetstream.New(nc.NatsConn())
if err != nil {
	slog.Error("jetstream init", "error", err)
	_ = nc.Drain()
	return 1
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/loadgen/main.go` around lines 159 - 168, The NATS connection opened by
natsutil.Connect is not cleaned up if jetstream.New or subsequent Subscribe
calls fail; update runRun to guarantee cleanup by draining/closing nc and
shutting down metricsSrv on all error paths — either call nc.Drain() (and
nc.Close() if needed) and stop metricsSrv immediately before returning on
jetstream.New error and any Subscribe failures, or defer an unconditional
cleanup (e.g., defer func(){ if nc!=nil { nc.Drain(); nc.Close() } ; if
metricsSrv!=nil { metricsSrv.Shutdown(ctx) } }) immediately after a successful
natsutil.Connect so nc and metricsSrv are always cleaned up even on early
returns; reference natsutil.Connect, jetstream.New, nc.Drain (and Subscribe
calls) when applying the change.
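The defer-based variant from the prompt, sketched against the snippet above (names from the review text; surrounding function omitted):

```go
nc, err := natsutil.Connect(cfg.NatsURL, cfg.NatsCredsFile)
if err != nil {
	slog.Error("nats connect", "error", err)
	return 1
}
// Guarantee cleanup on every subsequent early return, including the
// jetstream.New and Subscribe error paths.
defer func() { _ = nc.Drain() }()

js, err := jetstream.New(nc.NatsConn())
if err != nil {
	slog.Error("jetstream init", "error", err)
	return 1
}
```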
```go
runCtx, cancelRun := context.WithTimeout(ctx, *duration)
defer cancelRun()
warmupDeadline := time.Now().Add(*warmup)
genErr := gen.Run(runCtx)
// Wait up to 2 seconds for trailing replies and broadcasts to arrive.
time.Sleep(2 * time.Second)
collector.DiscardBefore(warmupDeadline)
missingReplies, missingBroadcasts := collector.Finalize()

cancelSamplers()
samplerWG.Wait()

shutCtx, cancelShut := context.WithTimeout(context.Background(), 5*time.Second)
_ = metricsSrv.Shutdown(shutCtx)
cancelShut()
_ = nc.Drain()

if genErr != nil {
	slog.Error("generator error", "error", genErr)
}

mfs, gerr := metrics.Registry.Gather()
if gerr != nil {
	slog.Warn("metrics gather", "error", gerr)
	mfs = nil
}
publishErrs := gatheredCounterValue(mfs, "loadgen_publish_errors_total", "", "")
gkErrs := gatheredCounterValue(mfs, "loadgen_publish_errors_total", "reason", "gatekeeper")
sent := int(gatheredCounterValue(mfs, "loadgen_published_total", "preset", p.Name))
measured := *duration - *warmup
actualRate := 0.0
if measured > 0 {
	// In canonical mode, byReqID is never populated, so E1Count/missingReplies
	// are both 0. Fall back to `sent` to compute the true publish rate.
	switch injectMode {
	case InjectCanonical:
		actualRate = float64(sent) / measured.Seconds()
	default:
		actualRate = float64(collector.E1Count()+missingReplies) / measured.Seconds()
	}
```
actualRate uses nominal duration, not elapsed wall-clock.
measured := *duration - *warmup uses the configured duration, but runCtx is a child of the signal-cancelled base ctx (line 265), so a SIGINT aborts gen.Run early. In that case sent reflects a short actual run but is divided by the full nominal measured, underreporting the achieved rate.
Capture the generator start time and compute measured against time.Since(...) - warmup (clamped).
🛠️ Suggested change

```diff
 runCtx, cancelRun := context.WithTimeout(ctx, *duration)
 defer cancelRun()
 warmupDeadline := time.Now().Add(*warmup)
+genStart := time.Now()
 genErr := gen.Run(runCtx)
+elapsed := time.Since(genStart)
 ...
-measured := *duration - *warmup
+measured := elapsed - *warmup
+if measured < 0 {
+	measured = 0
+}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/loadgen/main.go` around lines 265 - 304, The measured elapsed time uses
the configured duration instead of actual wall-clock runtime, so when runCtx is
cancelled early (runCtx, gen.Run) the computed measured := *duration - *warmup
underreports rate; fix by recording the generator start time (e.g., before
calling gen.Run), after gen.Run returns compute elapsed := time.Since(start) and
compute measured := elapsed - *warmup (clamped to >=0) before using it in
actualRate calculation, leaving the InjectCanonical switch and actualRate
assignment logic unchanged.
| if err := insertDocs(ctx, db.Collection("users"), f.Users); err != nil { | ||
| return err | ||
| } | ||
| if err := insertDocs(ctx, db.Collection("rooms"), f.Rooms); err != nil { | ||
| return err | ||
| } | ||
| if err := insertDocs(ctx, db.Collection("subscriptions"), f.Subscriptions); err != nil { | ||
| return err |
Wrap the seeding step that failed.
insertDocs already adds collection context, but Seed still returns bare errors here. Add operation-level context so callers can distinguish which fixture phase failed.
Proposed fix

```diff
 if err := insertDocs(ctx, db.Collection("users"), f.Users); err != nil {
-	return err
+	return fmt.Errorf("seed users: %w", err)
 }
 if err := insertDocs(ctx, db.Collection("rooms"), f.Rooms); err != nil {
-	return err
+	return fmt.Errorf("seed rooms: %w", err)
 }
 if err := insertDocs(ctx, db.Collection("subscriptions"), f.Subscriptions); err != nil {
-	return err
+	return fmt.Errorf("seed subscriptions: %w", err)
 }
```

As per coding guidelines, "Always wrap errors with context using fmt.Errorf("short description: %w", err) … Never return bare err".
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
if err := insertDocs(ctx, db.Collection("users"), f.Users); err != nil {
	return fmt.Errorf("seed users: %w", err)
}
if err := insertDocs(ctx, db.Collection("rooms"), f.Rooms); err != nil {
	return fmt.Errorf("seed rooms: %w", err)
}
if err := insertDocs(ctx, db.Collection("subscriptions"), f.Subscriptions); err != nil {
	return fmt.Errorf("seed subscriptions: %w", err)
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/loadgen/seed.go` around lines 38 - 45, The Seed function returns bare
errors from insertDocs for each collection; wrap each returned error with
operation-level context using fmt.Errorf so callers know which fixture phase
failed. Specifically, change the three returns that propagate errors from
insertDocs(ctx, db.Collection("users"), f.Users), insertDocs(ctx,
db.Collection("rooms"), f.Rooms), and insertDocs(ctx,
db.Collection("subscriptions"), f.Subscriptions) to return fmt.Errorf("seed
users: %w", err), fmt.Errorf("seed rooms: %w", err), and fmt.Errorf("seed
subscriptions: %w", err) respectively (or similar descriptive messages) to
include collection/phase context.
Actionable comments posted: 1
🧹 Nitpick comments (2)
tools/loadgen/main.go (1)
270-276: Hard-coded 2 s drain wait.

`time.Sleep(2 * time.Second)` is a fixed grace window for trailing E1 replies and E2 broadcasts. At high rates or under degraded conditions, 2 s may not be enough, producing inflated `missingReplies`/`missingBroadcasts` counts (which in turn affect `DetermineExitCode`); at low rates it needlessly delays finalization. Consider making it configurable via a `--drain` flag (default 2 s) and/or exiting early once `E1Count+E2Count` has stopped advancing for one sample interval; a sketch follows below.

As per coding guidelines: "Never use `time.Sleep` for goroutine synchronization — use proper sync primitives". This isn't strict goroutine sync, but a deterministic drain loop against the collector avoids the same class of flakiness.
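A sketch of such a loop, assuming a hypothetical `*drain` flag value and the collector's `E1Count`/`E2Count` accessors; the 100 ms sample interval is illustrative:

```go
drainDeadline := time.Now().Add(*drain) // *drain: proposed flag, default 2s
prev := -1
for time.Now().Before(drainDeadline) {
	cur := collector.E1Count() + collector.E2Count()
	if cur == prev {
		break // nothing new arrived in a full interval; the pipeline is quiet
	}
	prev = cur
	time.Sleep(100 * time.Millisecond) // polling interval, not goroutine sync
}
```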
🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/main.go` around lines 270 - 276, Replace the hard-coded time.Sleep(2 * time.Second) drain with a configurable drain and a deterministic wait loop: add a --drain flag (default 2s) to parse into a time.Duration, then repeatedly poll the collector (using collector.DiscardBefore(warmupDeadline) / collector.Finalize() or a dedicated collector.Metrics() call) until either the elapsed drain time reaches the configured duration or the sum of E1Count+E2Count stops increasing for one sample interval; after the loop call cancelSamplers() and samplerWG.Wait() as before. Reference symbols: time.Sleep, --drain flag/value, warmupDeadline, collector.DiscardBefore, collector.Finalize, cancelSamplers, samplerWG.Wait.

tools/loadgen/report_test.go (1)
116-135: Misleading comment in `TestWriteCSV_RowWriteError`.

The comment claims the writer "succeeds the first write (header) but then a pipe that we close, so the row write fails" — but `pw.Close()` is called before any write, so the header write is what fails, not the row write. The test still validates error propagation, but either update the comment to reflect the actual scenario, or delay `pw.Close()` until after `WriteCSV` has flushed its header if you actually want to exercise the row-write-after-header-succeeds path.
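If the goal is genuinely the row-write path, an alternative to pipe timing games is a writer that fails after its first `Write`. A sketch: `failAfterWriter` and the test name are hypothetical, `WriteCSV` is assumed to take an `io.Writer` (as `writeCSVFile` suggests), and this only isolates the row write if `WriteCSV` flushes the header before emitting rows; if everything is buffered into one flush, it degenerates to the same header-failure case:

```go
// failAfterWriter accepts the first Write call and errors on every
// subsequent one (hypothetical helper, not in report_test.go).
type failAfterWriter struct{ calls int }

func (w *failAfterWriter) Write(p []byte) (int, error) {
	w.calls++
	if w.calls > 1 {
		return 0, errors.New("simulated row write failure")
	}
	return len(p), nil
}

func TestWriteCSV_RowWriteErrorAfterHeader(t *testing.T) {
	rows := []CSVSample{{Metric: "E1", LatencyNs: 1}}
	if err := WriteCSV(&failAfterWriter{}, rows); err == nil {
		t.Fatal("expected row write error")
	}
}
```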
🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/report_test.go` around lines 116 - 135, The test TestWriteCSV_RowWriteError currently closes pw before any writes, so the header write fails (not the row write) — update the comment to accurately state "header write fails because pw is closed before WriteCSV" or, if you intended to exercise a successful header write followed by a failing row write, move the pw.Close() so it occurs after the header is flushed (e.g., ensure WriteCSV has written the header or read the header from pr first, then close pw) while keeping the test assertion that WriteCSV returns an error; update references to pw.Close(), WriteCSV, and the header/row write behavior accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 632386f2-4c64-4820-8617-58bed0d4d946
📒 Files selected for processing (6)
- tools/loadgen/generator.go
- tools/loadgen/main.go
- tools/loadgen/main_test.go
- tools/loadgen/metrics.go
- tools/loadgen/report.go
- tools/loadgen/report_test.go
✅ Files skipped from review due to trivial changes (1)
- tools/loadgen/main_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- tools/loadgen/metrics.go
- tools/loadgen/report.go
```go
func writeCSVFile(path string, c *Collector) error {
	f, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("create csv: %w", err)
	}
	defer func() { _ = f.Close() }()
	var rows []CSVSample
	for i, d := range c.E1Samples() {
		rows = append(rows, CSVSample{TimestampNs: int64(i), Metric: "E1", LatencyNs: d.Nanoseconds()})
	}
	for i, d := range c.E2Samples() {
		rows = append(rows, CSVSample{TimestampNs: int64(i), Metric: "E2", LatencyNs: d.Nanoseconds()})
	}
	return WriteCSV(f, rows)
}
```
writeCSVFile emits incorrect timestamp_ns and always-empty request_id.
The CSV header declared in report.go is timestamp_ns,request_id,metric,latency_ns, but here TimestampNs is populated with the loop index (int64(i)) and RequestID is never set. Consumers parsing the exported CSV will get a monotonic 0..N counter labeled as nanoseconds and always-empty request IDs, which makes per-sample CSV export misleading for analysis and plotting.
Either (a) extend Collector to expose per-sample (publishedAt, requestID, latency) tuples and wire them through here, or (b) drop/rename the timestamp_ns/request_id columns so the CSV matches what is actually available.
🛠️ Sketch

```diff
-	var rows []CSVSample
-	for i, d := range c.E1Samples() {
-		rows = append(rows, CSVSample{TimestampNs: int64(i), Metric: "E1", LatencyNs: d.Nanoseconds()})
-	}
-	for i, d := range c.E2Samples() {
-		rows = append(rows, CSVSample{TimestampNs: int64(i), Metric: "E2", LatencyNs: d.Nanoseconds()})
-	}
+	var rows []CSVSample
+	for _, s := range c.E1SamplesDetailed() { // returns publishedAt, requestID, latency
+		rows = append(rows, CSVSample{
+			TimestampNs: s.PublishedAt.UnixNano(),
+			RequestID:   s.RequestID,
+			Metric:      "E1",
+			LatencyNs:   s.Latency.Nanoseconds(),
+		})
+	}
+	// ...same for E2 via MsgID
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/loadgen/main.go` around lines 377 - 391, writeCSVFile currently fills
CSVSample.TimestampNs with the loop index and never sets RequestID; update the
data flow so the CSV matches the header: change the Collector API
(E1Samples/E2Samples) to return per-sample structs including PublishedAt
(time.Time), RequestID (string) and Latency (time.Duration) (or add new methods
that return these tuples), update CSVSample to include RequestID and set
TimestampNs = PublishedAt.UnixNano(), RequestID = sample.RequestID, LatencyNs =
sample.Latency.Nanoseconds(), and then modify writeCSVFile to iterate those
samples (instead of relying on int index) and call WriteCSV with the populated
rows; alternatively, if you prefer not to change Collector, remove/rename the
timestamp_ns and request_id columns in report.go to reflect only metric and
latency and adjust CSVSample and WriteCSV accordingly.
…t a COLLSCAN

broadcast-worker (and gatekeeper) enrich messages via userstore.FindUsersByAccounts, which filters on the account field. Without an index, each enrichment at the large preset COLLSCANs 10k users (~10 ms/msg), saturating the pipeline at rate=500. This was the cause of the 170x E2 latency gap between the small and large presets. https://claude.ai/code/session_01XjBvf9fek9i4DYnTdQzPqF
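For reference, the fix amounts to a single-field index created at seed time. A sketch with the mongo-go-driver; the exact key layout in seed.go may differ from the `account` field the commit message cites:

```go
// Index users.account so FindUsersByAccounts stops COLLSCANing at enrichment.
_, err := db.Collection("users").Indexes().CreateOne(ctx, mongo.IndexModel{
	Keys: bson.D{{Key: "account", Value: 1}},
})
if err != nil {
	return fmt.Errorf("create users account index: %w", err)
}
```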
🧹 Nitpick comments (1)
tools/loadgen/seed.go (1)
28-36: Optional: collapse the drop sequence via the same loop used in `Teardown`.

The three drops here mirror `Teardown` exactly. Reusing a small helper (or an inline loop over `[]string{"users","rooms","subscriptions"}`) removes the duplication and keeps the two code paths in sync if collections are added or renamed later.

Proposed refactor

```diff
-	if err := db.Collection("users").Drop(ctx); err != nil {
-		return fmt.Errorf("drop users: %w", err)
-	}
-	if err := db.Collection("rooms").Drop(ctx); err != nil {
-		return fmt.Errorf("drop rooms: %w", err)
-	}
-	if err := db.Collection("subscriptions").Drop(ctx); err != nil {
-		return fmt.Errorf("drop subscriptions: %w", err)
-	}
+	for _, c := range []string{"users", "rooms", "subscriptions"} {
+		if err := db.Collection(c).Drop(ctx); err != nil {
+			return fmt.Errorf("drop %s: %w", c, err)
+		}
+	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/seed.go` around lines 28 - 36, The three explicit collection Drop calls duplicate Teardown; replace them with a single loop or helper call so they stay in sync—either call the existing Teardown helper or iterate over []string{"users","rooms","subscriptions"} and call db.Collection(name).Drop(ctx) for each; update the function in seed.go containing those drops and remove the three individual Drop blocks to avoid duplication.
At high target rates (1000 msg/s), actual delivery was falling well below
target (~775/1000) because publishOne ran serially on the ticker goroutine
and time.Ticker drops missed ticks. Under any per-publish stall above the
1ms budget, a tick was silently lost.
Changes:
- Generator dispatches each tick's publish to a bounded goroutine pool
sized by the new MAX_IN_FLIGHT env var (default 200). Pool saturation
(pool full when a tick fires) is recorded as
loadgen_publish_errors_total{reason="saturated"} rather than silently
dropped, so throttling is observable in the summary / Grafana panel.
- MAX_IN_FLIGHT=0 preserves the legacy serial behavior for bisection.
- Generator.rng is protected by a mutex so publishOne is safe under
concurrent dispatch. Helper methods g.intn / g.float64 wrap access.
- On ctx cancel, Run waits for in-flight publishes to drain (5s grace).
- New opt-in pprof HTTP server gated on PPROF_ADDR (default empty; off).
Served on a dedicated addr, NOT the metrics port, so Prometheus scrapes
don't inadvertently expose profiling handlers.
- Docker-compose documents both new env vars.
New tests:
- TestGenerator_MaxInFlightZeroRunsSerially — legacy path still works.
- TestGenerator_PoolSaturationCountedAsError — blocking publisher + pool
of 1 forces saturation; counter increments.
Spec: docs/superpowers/specs/2026-04-24-loadgen-worker-pool-design.md
https://claude.ai/code/session_01XjBvf9fek9i4DYnTdQzPqF
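The dispatch pattern described above, as a minimal sketch of the generator's run loop; `sem`, `publishOne`, and `saturatedCounter` are illustrative names, not the actual identifiers in generator.go:

```go
sem := make(chan struct{}, maxInFlight) // bounded pool: MAX_IN_FLIGHT slots
var wg sync.WaitGroup
for {
	select {
	case <-ctx.Done():
		done := make(chan struct{})
		go func() { wg.Wait(); close(done) }()
		select { // drain in-flight publishes, bounded by the 5s grace
		case <-done:
		case <-time.After(5 * time.Second):
		}
		return nil
	case <-ticker.C:
		select {
		case sem <- struct{}{}: // slot free: publish off the ticker goroutine
			wg.Add(1)
			go func() {
				defer wg.Done()
				defer func() { <-sem }()
				publishOne(ctx)
			}()
		default: // pool full at tick time: observable, not silently dropped
			saturatedCounter.Inc()
		}
	}
}
```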
🧹 Nitpick comments (5)
tools/loadgen/generator_test.go (1)
300-338: LGTM on saturation coverage; consider asserting the saturated count is bounded by ticks.

The test correctly drives the worker pool to saturation by combining `MaxInFlight: 1` with a publisher that blocks until the run ends. One small hardening idea: with `Rate: 500` over `120ms` only ~60 ticks fire; bounding the assertion (`saturated >= someFloor && saturated <= ~60`) would catch a regression where the saturation counter starts double-counting ticks. Optional — the lower bound `> 0` already validates the primary contract; a sketch follows below.
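Concretely, the bound could look like this (variable names follow the test's existing setup and are illustrative):

```go
maxTicks := int(500*(120*time.Millisecond).Seconds()) + 1 // Rate × window, +1 slack ≈ 61
require.Greater(t, saturated, 0)
require.LessOrEqual(t, saturated, maxTicks)
```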
🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/generator_test.go` around lines 300 - 338, Update TestGenerator_PoolSaturationCountedAsError to assert an upper bound (and optional lower bound) on the saturated counter to catch double-counting: compute the maximum possible ticks from the configured Rate and test duration (Rate=500 over 120ms → ~60 ticks) and add a small allowance (e.g. +1) then assert saturated <= maxTicks and still assert saturated > 0; reference the test name TestGenerator_PoolSaturationCountedAsError, the Generator config fields Rate and MaxInFlight, the test timeout (120*time.Millisecond) and the saturated variable when adding these bounds.

tools/loadgen/main.go (2)
188-201: Document/restrict pprof bind address.
PPROF_ADDR=":6060"(the example in the spec) binds on all interfaces. In any environment where the loadgen container is reachable from outside the test network, that exposes goroutine dumps, allocation profiles, and/debug/pprof/cmdline(which leaks args). Two low-cost mitigations worth considering:
- Document in the compose file / spec that operators should prefer
127.0.0.1:6060(or a dedicated internal-only network) and only expose the port when actively profiling.- Optionally validate
cfg.PProfAddrand warn if it parses to a non-loopback host.Not a blocker — the env var is opt-in and defaults empty — but worth calling out so it lands in the runbook.
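The warning could be a small host check at startup; a sketch, assuming `cfg.PProfAddr` is a standard host:port string:

```go
if host, _, err := net.SplitHostPort(cfg.PProfAddr); err == nil {
	ip := net.ParseIP(host)
	loopback := host == "localhost" || (ip != nil && ip.IsLoopback())
	if !loopback { // also covers ":6060", which binds all interfaces
		slog.Warn("pprof listening on non-loopback address", "addr", cfg.PProfAddr)
	}
}
```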
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/main.go` around lines 188 - 201, The pprof server currently binds to cfg.PProfAddr (pprofSrv) which may be set to a non-loopback address like ":6060"; update the runbook/compose docs to recommend using "127.0.0.1:6060" or an internal-only network and only exposing the port when actively profiling, and add a runtime validation in main.go that parses cfg.PProfAddr and logs a warning via slog.Warn if the resolved host is not loopback before calling pprofSrv.ListenAndServe; reference cfg.PProfAddr, pprofSrv and the ListenAndServe goroutine for where to add the check and the documentation note.
188-305: pprof shutdown shares the 5 s metrics timeout — spec says 2 s.

Lines 300‑305 use one `shutCtx` (5 s) for both `metricsSrv.Shutdown` and `pprofSrv.Shutdown`. The design spec (line 125: "On `ctx.Done()`: gracefully shut down the pprof server with a 2 s timeout.") asks for an independent, shorter pprof timeout. Functionally minor, but worth aligning so the spec stays the source of truth, and so a hung pprof handler can't eat into the metrics-server shutdown budget.

📝 Proposed tweak

```diff
 shutCtx, cancelShut := context.WithTimeout(context.Background(), 5*time.Second)
 _ = metricsSrv.Shutdown(shutCtx)
+cancelShut()
 if pprofSrv != nil {
-	_ = pprofSrv.Shutdown(shutCtx)
+	pprofShutCtx, cancelPprof := context.WithTimeout(context.Background(), 2*time.Second)
+	_ = pprofSrv.Shutdown(pprofShutCtx)
+	cancelPprof()
 }
-cancelShut()
 _ = nc.Drain()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/main.go` around lines 188 - 305, The pprof shutdown is using the same 5s shutdown context as metricsSrv; change to use an independent 2s timeout context for pprofSrv.Shutdown so pprof cannot consume the metrics shutdown budget: keep the existing 5s shutCtx for metricsSrv.Shutdown (the call using metricsSrv.Shutdown) but create a new context (e.g., pprofShutCtx with context.WithTimeout(context.Background(), 2*time.Second)) and call pprofSrv.Shutdown(pprofShutCtx) (then cancel/close it); update the code paths referencing pprofSrv and the existing shutCtx accordingly.docs/superpowers/specs/2026-04-24-loadgen-worker-pool-design.md (1)
150-166: Use unique subheadings to silence MD024.

`markdownlint` flags duplicate `### New unit test` headings at lines 150 and 162 (and the same will apply if any other top-level headings repeat). Consider giving each test its own descriptive subheading so the doc TOC and lint stay clean.

📝 Proposed tweak

```diff
-### New unit test
+### New unit test — `TestGenerator_MaxInFlightZeroRunsSerially`

 `TestGenerator_MaxInFlightZeroRunsSerially` — with `MaxInFlight=0`, the generator's behavior is unchanged from today. Reuses the existing `TestGenerator_SendsExpectedCount` assertion style.

-### Adjusted unit test
+### Adjusted unit test — `TestGenerator_SendsExpectedCount`

 `TestGenerator_SendsExpectedCount` — still valid with `MaxInFlight > 0`, but the count may be closer to the theoretical target since the ticker is no longer blocked.

-### New unit test
+### New unit test — `TestGenerator_PoolSaturationCountedAsError`
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/superpowers/specs/2026-04-24-loadgen-worker-pool-design.md` around lines 150 - 166, Replace the repeated "### New unit test" headings with unique, descriptive subheadings to satisfy MD024; for example rename the first to "### New unit test: TestGenerator_MaxInFlightZeroRunsSerially", the second occurrence to "### New unit test: TestGenerator_PoolSaturationCountedAsError" (and if keeping the adjusted test, use "### Adjusted unit test: TestGenerator_SendsExpectedCount"), updating those headings in the spec so each test name (TestGenerator_MaxInFlightZeroRunsSerially, TestGenerator_SendsExpectedCount, TestGenerator_PoolSaturationCountedAsError) is included in its subheading to avoid duplicate top-level headings.tools/loadgen/generator.go (1)
105-113: Drain-timeout log entry is missing — spec calls for it.

The design spec (lines 89‑91) states: "On `ctx.Done()`: stop the ticker, then `wg.Wait()` with a bounded grace period (5 s). If the grace expires, log and return". The implementation silently falls through when `drainGracePeriod` elapses, so an operator who hits this path has no signal that publishes were still in flight when `Run` returned. A `slog.Warn` on the timeout branch makes the situation observable and lines up with the rest of the service's structured-logging conventions.

📝 Proposed change

```diff
 case <-ctx.Done():
 	done := make(chan struct{})
 	go func() { wg.Wait(); close(done) }()
 	select {
 	case <-done:
 	case <-time.After(drainGracePeriod):
+		slog.Warn("generator drain grace expired with in-flight publishes",
+			"preset", g.cfg.Preset.Name,
+			"grace", drainGracePeriod)
 	}
 	return nil
```

As per coding guidelines: "Always use `log/slog` with JSON format … Use structured fields as key-value pairs in logging".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/loadgen/generator.go` around lines 105 - 113, The drain timeout branch in Run (on ctx.Done()) currently returns silently when the wg.Wait() grace period (drainGracePeriod) elapses; update the select waiting for done vs time.After to log a structured warning via slog.Warn when the time.After branch fires (include keys like "component":"loadgen", "event":"drain_timeout", "drainGracePeriod":drainGracePeriod, and optionally "pending":<true>), ensure the ticker is stopped before waiting, and then return nil as before so operators can observe that publishes were still in flight.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b8649a5c-23c6-448c-bfdb-6b4adeb10bab
📒 Files selected for processing (5)
- docs/superpowers/specs/2026-04-24-loadgen-worker-pool-design.md
- tools/loadgen/deploy/docker-compose.loadtest.yml
- tools/loadgen/generator.go
- tools/loadgen/generator_test.go
- tools/loadgen/main.go
🚧 Files skipped from review as they are similar to previous changes (1)
- tools/loadgen/deploy/docker-compose.loadtest.yml
Summary
Implements a complete load-testing harness for the single-site messaging pipeline (message-gatekeeper → MESSAGES_CANONICAL → message-worker + broadcast-worker). The harness consists of a Go-based CLI tool with three subcommands (`seed`, `run`, `teardown`), a docker-compose deployment stack, and comprehensive metrics collection via Prometheus and optional Grafana dashboards.

Key Changes

Core loadgen tool (`tools/loadgen/`):

- `main.go`: Config parsing, subcommand dispatch, graceful shutdown via signal handling
- `preset.go`: Four built-in presets (small, medium, large, realistic) with deterministic fixture generation
- `generator.go`: Open-loop message publisher with configurable rate and injection mode (frontdoor or canonical)
- `collector.go`: Dual-path latency correlation (E1: gatekeeper ack, E2: broadcast visibility) via reply/broadcast subject subscribers
- `consumerlag.go`: JetStream consumer info polling for backlog measurement
- `report.go`: Terminal summary with percentile computation (see the sketch after this summary), CSV export, and exit-code logic
- `metrics.go`: Prometheus registry with histograms, counters, and gauges
- `seed.go`: MongoDB seeding/teardown for users, rooms, and subscriptions

Deployment (`tools/loadgen/deploy/`):

- `docker-compose.loadtest.yml`: Full single-site stack (NATS+JetStream, MongoDB, Cassandra, gatekeeper, workers, loadgen, optional prometheus+grafana)
- `Dockerfile`: Multi-stage Alpine build
- `Makefile`: Scoped targets for `up`, `seed`, `run`, `run-dashboards`, `down`

Tests:

- `preset_test.go`: Determinism and fixture generation validation
- `generator_test.go`: Rate-pacing and subject format verification
- `collector_test.go`: E1/E2 correlation logic
- `report_test.go`: Percentile math and CSV formatting
- `main_test.go`: Utility function tests
- `consumerlag_test.go`: Consumer sampler state tracking
- `integration_test.go`: End-to-end wiring with testcontainers (NATS, MongoDB, Cassandra)

Subject helpers (`pkg/subject/subject.go`):

- `UserResponseWildcard()` and `RoomEventWildcard()` for collector subscriptions

Implementation Details

- `sync.Map` for thread-safe tracking
- `ConsumerInfo` polled every 1s to track pending messages and redeliveries
- `signal.NotifyContext` to propagate SIGINT/SIGTERM as context cancellation, allowing finalizers (summary, metrics drain) to run
- Metrics endpoint on `:9099`; optional Grafana dashboard for real-time visualization
- `mongoutil.Connect` and raw collection API to keep the tool focused

Testing Strategy

- `testify` assertions and `testcontainers-go` for container lifecycle management

https://claude.ai/code/session_01XjBvf9fek9i4DYnTdQzPqF
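The percentile step in `report.go` is not shown in this view; nearest-rank over sorted samples is the usual shape. A sketch only; the actual implementation may interpolate differently:

```go
// percentile returns the nearest-rank p-th percentile (0 < p <= 100)
// of an ascending-sorted sample slice.
func percentile(sorted []time.Duration, p float64) time.Duration {
	if len(sorted) == 0 {
		return 0
	}
	idx := int(math.Ceil(p/100*float64(len(sorted)))) - 1
	if idx < 0 {
		idx = 0
	}
	return sorted[idx]
}
```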
Summary by CodeRabbit
New Features
Documentation