JetStream: Implement MaxAckPendingPerSubject #7910
zenozaga wants to merge 6 commits into nats-io:main
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 4066b49fb4
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
server/consumer.go
```go
if o.ptc == nil {
	o.ptc = make(map[string]int)
}
for seq, p := range o.pending {
```
Reset per-subject counters before rebuilding restored state
applyState() rebuilds ptc by incrementing counts from o.pending, but it never clears existing entries first. Since applyState() is reached from both stored-state load and snapshot/state updates, repeated application can double-count the same pending entries; with small MaxAckPendingPerSubject, acknowledgments then leave stale counts behind and the subject stays blocked even when there are no real pending messages.
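The reset-before-rebuild fix can be sketched outside the server as a pure function. The `Pending` shape and the `rebuildPerSubjectCounts` name below are illustrative assumptions, not the actual `applyState()` code:

```go
package main

import "fmt"

// Pending mirrors the shape discussed in the review: a pending entry
// carries the subject it was delivered on. Illustrative type only.
type Pending struct {
	Subject string
}

// rebuildPerSubjectCounts builds the counter map from scratch, so
// applying restored state repeatedly stays idempotent instead of
// double-counting the same pending entries.
func rebuildPerSubjectCounts(pending map[uint64]*Pending) map[string]int {
	ptc := make(map[string]int, len(pending))
	for _, p := range pending {
		if p != nil && p.Subject != "" {
			ptc[p.Subject]++
		}
	}
	return ptc
}

func main() {
	pending := map[uint64]*Pending{
		1: {Subject: "orders.a"},
		2: {Subject: "orders.a"},
		3: {Subject: "orders.b"},
	}
	// Applying twice yields the same counts, not doubled ones.
	ptc := rebuildPerSubjectCounts(pending)
	ptc = rebuildPerSubjectCounts(pending)
	fmt.Println(ptc["orders.a"], ptc["orders.b"]) // 2 1
}
```

The key point is that the function's output depends only on the current `pending` map, never on a previously accumulated counter.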
Now resetting the ptc map in applyState to ensure idempotency during state recovery
💡 Codex Review
Reviewed commit: db627ad515
server/consumer.go
```go
if config.MaxAckPendingPerSubject == 0 {
	if pedantic && streamCfg.ConsumerLimits.MaxAckPendingPerSubject > 0 {
		return NewJSPedanticError(errors.New("max_ack_pending_per_subject must be set if it's configured in stream limits"))
	}
	config.MaxAckPendingPerSubject = streamCfg.ConsumerLimits.MaxAckPendingPerSubject
```
Enforce per-subject limits when the consumer leaves it unset
setConsumerConfigDefaults() only fills MaxAckPendingPerSubject from stream-level limits when the field is zero, but it never applies server/account limits the way MaxAckPending does. As a result, creating an AckExplicit/AckAll consumer with max_ack_pending_per_subject omitted under a positive account/server cap leaves the value at 0, and checkConsumerCfg() accepts it because it only rejects values > the cap; this bypasses the configured policy and allows unlimited per-subject pending acks.
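The default cascade the reviewer asks for (consumer value wins if set, else the stream limit, else the account/server cap, mirroring how `MaxAckPending` is handled) can be sketched as a small helper. `resolvePerSubjectCap` is a hypothetical name for illustration, not the server's API:

```go
package main

import "fmt"

// resolvePerSubjectCap applies the fallback chain: an explicit consumer
// value takes precedence, then the stream-level limit, then the
// account/server cap. A zero result means the limit stays disabled.
func resolvePerSubjectCap(consumerVal, streamLimit, accountCap int) int {
	if consumerVal > 0 {
		return consumerVal
	}
	if streamLimit > 0 {
		return streamLimit
	}
	return accountCap
}

func main() {
	// Consumer left the field unset; the account cap must still apply.
	fmt.Println(resolvePerSubjectCap(0, 0, 100)) // 100
}
```

Without the final fallback, the bug described above appears: an omitted field stays at 0 and bypasses the configured policy entirely.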
server/consumer.go
```go
if o.cfg.MaxAckPendingPerSubject > 0 && subj != _EMPTY_ {
	o.ptc[subj]++
```
Keep per-subject counters consistent when pending is removed
This increments o.ptc for new pending entries, but pending entries are also removed through non-ack paths (for example stream purge/expiry/truncation reconciliation) that delete from o.pending without decrementing o.ptc. Once that happens, getNextMsg() still sees o.ptc[subj] >= MaxAckPendingPerSubject and can keep blocking deliveries for that subject even though no real pending message remains.
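A single removal path that keeps the counters consistent might look like the following sketch. The `consumer` struct and field names are illustrative stand-ins for the server's internals, not the actual types:

```go
package main

import "fmt"

type pendingEntry struct{ Subject string }

type consumer struct {
	maxPerSubject int
	pending       map[uint64]*pendingEntry
	ptc           map[string]int
}

// removeFromPending is the unified removal path the review asks for:
// every delete from pending also decrements the per-subject counter.
// It reports whether the subject just dropped from at-capacity to
// below-capacity, so callers can signal blocked deliveries.
func (c *consumer) removeFromPending(seq uint64) bool {
	p, ok := c.pending[seq]
	if !ok || p == nil {
		return false
	}
	delete(c.pending, seq)
	if c.ptc == nil || p.Subject == "" {
		return false
	}
	wasBlocked := c.maxPerSubject > 0 && c.ptc[p.Subject] >= c.maxPerSubject
	c.ptc[p.Subject]--
	if c.ptc[p.Subject] <= 0 {
		delete(c.ptc, p.Subject)
	}
	return wasBlocked && c.ptc[p.Subject] < c.maxPerSubject
}

func main() {
	c := &consumer{
		maxPerSubject: 1,
		pending:       map[uint64]*pendingEntry{7: {Subject: "orders.a"}},
		ptc:           map[string]int{"orders.a": 1},
	}
	fmt.Println(c.removeFromPending(7)) // true: the subject just freed up
}
```

Routing purge, expiry, and truncation paths through one helper like this is what prevents the `ptc` drift described above.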
Force-pushed from db627ad to 4c9bf1e
💡 Codex Review
Reviewed commit: 4c9bf1edfa
server/consumer.go
```go
if o.pendingSubjectHitLimit(pmsg.subj) {
	pmsg.returnToPool()
	return nil, 0, errMaxAckPending
```
Skip blocked subjects instead of stalling delivery cursor
When pendingSubjectHitLimit is true, this branch returns errMaxAckPending before advancing o.sseq, so the consumer keeps retrying the same blocked stream sequence. With a sequence like A1, A2, B1 and MaxAckPendingPerSubject=1, once A1 is pending the consumer repeatedly stops at A2 and never reaches B1 until A1 is acked, which lets one hot subject head-of-line block unrelated subjects.
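The suggested behavior, skipping a capped subject and continuing the scan instead of stalling, can be sketched as a pure function. `nextDeliverable` and the `msg` type are illustrative; the real `getNextMsg()` must also remember the skipped sequences so they can be redelivered later:

```go
package main

import "fmt"

// msg is an illustrative stand-in for a stream message.
type msg struct {
	seq  uint64
	subj string
}

// nextDeliverable scans from the cursor and returns the first message
// whose subject is not at its per-subject cap, collecting blocked
// sequences instead of stalling on them.
func nextDeliverable(msgs []msg, ptc map[string]int, max int) (deliver *msg, skipped []uint64) {
	for i := range msgs {
		m := &msgs[i]
		if max > 0 && ptc[m.subj] >= max {
			skipped = append(skipped, m.seq)
			continue
		}
		return m, skipped
	}
	return nil, skipped
}

func main() {
	// A1 (seq 1) is already pending, so A2 is blocked; B1 should still flow.
	msgs := []msg{{2, "orders.A"}, {3, "orders.B"}}
	ptc := map[string]int{"orders.A": 1}
	m, skipped := nextDeliverable(msgs, ptc, 1)
	fmt.Println(m.subj, skipped) // orders.B [2]
}
```

This matches the review's A1, A2, B1 scenario: B1 is delivered immediately, and sequence 2 is recorded for revisiting once orders.A is acked.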
```diff
 // Add to pending.
-o.state.Pending[sseq] = &Pending{dseq, ts}
+o.state.Pending[sseq] = &Pending{dseq, ts, subj}
```
Preserve original pending sequence on file-store redelivery
This path now always rewrites state.Pending[sseq] even when sseq was already delivered, replacing the original consumer delivery sequence with the redelivery sequence. UpdateAcks() depends on p.Sequence staying original to advance ack floors correctly, so file-backed consumers can end up with incorrect ack-floor progression and persisted pending state after redeliveries.
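The fix amounts to writing the `Pending` entry only on first delivery, so `p.Sequence` keeps the original consumer delivery sequence across redeliveries. A standalone sketch with illustrative names:

```go
package main

import "fmt"

// Pending mirrors the three-field shape from the diff above.
type Pending struct {
	Sequence  uint64 // consumer delivery sequence of the FIRST delivery
	Timestamp int64
	Subject   string
}

// recordDelivery only creates the entry when sseq is not yet pending;
// a redelivery must not overwrite the original entry, because ack-floor
// math depends on p.Sequence staying at the first delivery sequence.
func recordDelivery(pending map[uint64]*Pending, sseq, dseq uint64, ts int64, subj string) {
	if _, exists := pending[sseq]; !exists {
		pending[sseq] = &Pending{dseq, ts, subj}
	}
	// On redelivery the entry already exists; leave it alone.
}

func main() {
	pending := map[uint64]*Pending{}
	recordDelivery(pending, 10, 1, 100, "orders.a") // first delivery, dseq 1
	recordDelivery(pending, 10, 5, 200, "orders.a") // redelivery, dseq 5
	fmt.Println(pending[10].Sequence) // 1
}
```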
```diff
 }
 // Store in pending.
-state.Pending[sseq] = &Pending{dseq, ts}
+state.Pending[sseq] = &Pending{dseq, ts, _EMPTY_}
```
Persist pending subjects in recovered consumer state
Recovered pending entries are forced to Subject: _EMPTY_, so restart/snapshot restore drops subject identity for all in-flight acks. Since per-subject throttling relies on pending subjects, recovered consumers cannot accurately reapply MaxAckPendingPerSubject for already-pending messages and may over-deliver until those entries drain.
…t in MaxAckPendingPerSubject

This commit addresses the following issues identified in the code review and audit:

- Fixed head-of-line blocking in getNextMsg by skipping blocked subjects without losing messages.
- Fixed pending sequence corruption in filestore UpdateDelivered by adding missing else branch.
- Fixed 5 cases of ptc drift where delete(o.pending) bypassed decPendingSubject.
- Fixed ineffassign lint error.

Signed-off-by: Randy stiven Valentin <zenozaga@gmail.com>
Force-pushed from f9419e3 to b11c5a4
💡 Codex Review
Reviewed commit: b11c5a47de
```go
		o.updateSkipped(sseq + 1)
	}
	o.sseq = sseq + 1
}
```
Preserve blocked sequences when advancing delivery cursor
When getNextMsg() skips a blocked subject and later finds an unblocked message, this branch still advances o.sseq to sseq+1, which moves the cursor past earlier blocked sequences that were never delivered. With a stream like A1, A2, B1 and MaxAckPendingPerSubject=1, delivering B1 sets o.sseq past A2; after A1 is acked, A2 is no longer revisited and can be permanently skipped.
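One way to avoid losing skipped sequences is to track the lowest blocked sequence and rewind the cursor when its subject frees up. This is a hypothetical bookkeeping sketch, not the PR's actual implementation:

```go
package main

import "fmt"

// cursor tracks the delivery position plus the lowest sequence that
// was skipped because its subject was at the per-subject cap.
type cursor struct {
	sseq       uint64
	minSkipped uint64 // 0 means nothing is currently skipped
}

// skip records a blocked sequence so it is not lost when the cursor
// advances past it to deliver later, unblocked messages.
func (c *cursor) skip(seq uint64) {
	if c.minSkipped == 0 || seq < c.minSkipped {
		c.minSkipped = seq
	}
}

// onSubjectFreed rewinds the cursor so skipped sequences are revisited
// once capacity opens up (e.g. after an ack).
func (c *cursor) onSubjectFreed() {
	if c.minSkipped != 0 && c.minSkipped < c.sseq {
		c.sseq = c.minSkipped
	}
	c.minSkipped = 0
}

func main() {
	// The review's scenario: A2 (seq 2) is blocked, B1 (seq 3) delivers,
	// and the cursor moves to 4. After A1 is acked, rewind to 2.
	c := &cursor{sseq: 2}
	c.skip(2)
	c.sseq = 4
	c.onSubjectFreed()
	fmt.Println(c.sseq) // 2
}
```

The real server would then rely on redelivery-count bookkeeping to avoid double-counting revisited sequences; this sketch only shows the cursor rewind.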
```diff
 if p, ok := o.pending[seq]; ok && p != nil {
-	delete(o.pending, seq)
-	o.updateDelivered(p.Sequence, seq, dc, p.Timestamp)
+	o.removeFromPending(seq)
+	o.updateDelivered(p.Sequence, seq, dc, p.Timestamp, p.Subject)
```
Signal after max-delivery removal frees subject capacity
hasMaxDeliveries() now removes pending entries through removeFromPending(), but it discards the boolean that indicates a subject limit transition and only signals on global MaxAckPending. If an expired message hits MaxDeliver and was the only pending message for a subject at MaxAckPendingPerSubject, the consumer is not woken up to retry blocked messages on that subject, so push consumers can remain stalled until unrelated activity triggers another wakeup.
This fixes CI build failures when JetStream tests are explicitly skipped.

Signed-off-by: Randy stiven Valentin <zenozaga@gmail.com>
💡 Codex Review
Reviewed commit: 5f45fae339
```go
if o.cfg.MaxAckPendingPerSubject <= 0 || subj == _EMPTY_ || o.ptc == nil {
	return false
```
Decrement subject counters when toggling per-subject limits
decPendingSubject exits early whenever MaxAckPendingPerSubject <= 0, so if this limit is disabled on a live consumer, pending entries are removed/acked, and then the limit is enabled again, o.ptc can retain stale counts from before disablement. pendingSubjectHitLimit will then treat those subjects as still saturated and block fresh deliveries even though no real pending entries remain.
server/consumer.go
```diff
 // Check if these are no longer valid.
 if seq < fseq || seq <= o.asflr {
-	delete(o.pending, seq)
+	o.removeFromPending(seq)
```
Signal after checkPending removes subject-limited pending entries
This path removes invalid pending entries with removeFromPending(seq) but drops the returned transition flag, so when that removal is what frees a subject from MaxAckPendingPerSubject, no wakeup is sent to resume blocked deliveries. For push consumers already waiting after a subject-limit stall, this can leave delivery paused until unrelated traffic triggers another signal.
This commit:

- Fixes a bug where o.sseq would not skip stream holes (purges/compactions) due to unconditional savedSseq restoration.
- Adds //go:build !skip_js_tests to the new test file to fix CI build errors.
- Verified with TestJetStreamConsumerPendingLowerThanStreamFirstSeq.

Signed-off-by: Randy stiven Valentin <zenozaga@gmail.com>
💡 Codex Review
Reviewed commit: e91e20dc9a
server/consumer.go
```diff
 	}
 }
-delete(o.pending, seq)
+o.removeFromPending(seq)
```
Signal after purge removes a subject-limited pending entry
removeFromPending() returns whether a subject just transitioned from blocked to available, but this purge path ignores that signal. When MaxAckPendingPerSubject is hit and a pending message is removed by stream purge (or wider purge reconciliation), the consumer may not be woken to retry delivery for that subject, so push/pull delivery can stay stalled until unrelated traffic triggers another wakeup.
This commit addresses several P2/P3 type issues identified during a full audit:

- Fixes missing signals in purge, reconcile, and cleanup paths to avoid consumer stalls on blocked subjects.
- Implements subject reconstruction from the stream store during consumer recovery to maintain limit enforcement across restarts.
- Unifies all pending removals through removeFromPending to prevent ptc drift.

Signed-off-by: Randy stiven Valentin <zenozaga@gmail.com>
💡 Codex Review
Reviewed commit: b2209783a2
```go
if o.cfg.MaxAckPendingPerSubject <= 0 || subj == _EMPTY_ {
	return
```
Rebuild subject pending counts after enabling per-subject caps
incPendingSubject skips tracking whenever MaxAckPendingPerSubject is unset, but existing pending entries are still kept in o.pending. If a live consumer is later updated from 0/unset to a positive max_ack_pending_per_subject, those already-pending messages are never backfilled into o.ptc, so pendingSubjectHitLimit enforces the cap only for new deliveries and can over-deliver on subjects that were already in flight before the update.
When MaxAckPendingPerSubject is enabled during a consumer update, we now correctly backfill the per-subject tracking from currently pending messages. This ensures the limit is enforced for messages already in flight. Added TestJetStreamMaxAckPendingPerSubjectConfigUpdate to verify.

Signed-off-by: Randy stiven Valentin <zenozaga@gmail.com>
💡 Codex Review
Reviewed commit: c0558821e6
```go
if cfg.MaxAckPendingPerSubject != o.cfg.MaxAckPendingPerSubject {
	if cfg.MaxAckPendingPerSubject <= 0 {
		o.ptc = nil
	}
}
```
Wake stalled deliveries after per-subject limit increase
updateConfig() handles MaxAckPendingPerSubject changes by mutating o.ptc/o.cfg, but it never calls signalNewMessages() when the cap is raised or disabled. In the delivery loop (loopAndGatherMsgs), a push consumer that previously went to waitForMsgs because all available messages were blocked by the old per-subject cap will keep waiting on mch and not retry getNextMsg() until unrelated traffic (new publish/ack) arrives, so config updates that should immediately unblock delivery can appear ineffective.
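A config-update path that covers both this finding and the earlier backfill one could look like the following sketch. The types are illustrative, and `signalNewMessages` stands in for the server's delivery-loop wakeup:

```go
package main

import "fmt"

type pendingEntry struct{ Subject string }

type consumer struct {
	maxPerSubject int
	pending       map[uint64]*pendingEntry
	ptc           map[string]int
	signaled      bool
}

// signalNewMessages stands in for waking the delivery loop.
func (c *consumer) signalNewMessages() { c.signaled = true }

// applyPerSubjectLimit backfills counters when the cap is enabled on a
// live consumer, drops them when disabled, and always wakes the
// delivery loop on a change, since a raised or removed cap may unblock
// messages immediately.
func (c *consumer) applyPerSubjectLimit(newMax int) {
	old := c.maxPerSubject
	c.maxPerSubject = newMax
	switch {
	case newMax <= 0:
		c.ptc = nil
	case c.ptc == nil:
		// Enabled on a live consumer: backfill from in-flight messages.
		c.ptc = make(map[string]int, len(c.pending))
		for _, p := range c.pending {
			if p != nil && p.Subject != "" {
				c.ptc[p.Subject]++
			}
		}
	}
	if newMax != old {
		c.signalNewMessages()
	}
}

func main() {
	c := &consumer{pending: map[uint64]*pendingEntry{1: {"a"}, 2: {"a"}}}
	c.applyPerSubjectLimit(1)
	fmt.Println(c.ptc["a"], c.signaled) // 2 true
}
```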
We have previously considered this feature and decided not to implement it. In NATS we very often use more complex subjects than you suggest, so let's take the usual ORDERS examples you might have. But what you need is ordered consumption on ID, not the whole subject. So in our most typical use case this just would not achieve the desired result. We're on-and-off working on the consumer groups function that would address your need, but that's parked for the moment. I am not sure we're ready to accept this feature given the above notes, but of course happy for other input from others.
Thanks, that makes sense and I appreciate the explanation. I agree that in many cases the key is really the ID, not the full subject. I also thought about the option of using something like …; the more natural layout for me ends up being … I was able to work around this on my side with pull/fetch and client-side scheduling, so I'm okay for now. I mainly wanted to clarify that the use case is a bit broader than ordering by ID alone. Thanks, I appreciate your time, and I'm glad to hear the groups feature is in the works.
This PR perfectly addresses a major architectural challenge we are currently facing!

Our Use Case

The Problem

Why Current Workarounds Fall Short

Having the ability to automatically suspend the delivery of new messages on a per-subject basis until the active message is ACKed is the exact missing piece for our architecture. Huge +1 for this feature!
Yeah, exactly. The noisy neighbor problem keeps becoming more common. At first I thought this might need something like a nested consumer. Then I started thinking that something like … But after the response I got from the team, it looks like they may be thinking about something more robust.

So I started thinking about an idea that came to mind and that could maybe be even better, something like this:

```go
// messaging
ConsumerConfig{
	ConcurrencyPerSubject: []SubjectConcurrencyRule{
		{
			Match: "*.meta",
			Max:   5,
		},
		{
			Match: "*.messenger",
			Max:   10,
		},
	},
}

// plans
ConsumerConfig{
	ConcurrencyPerSubject: []SubjectConcurrencyRule{
		{
			Match: "*.premium",
			Max:   5,
		},
		{
			Match: "*.basic",
			Max:   1,
		},
	},
}
```

That's more or less the kind of idea I feel could work better. It may sound like a bit of a strange idea, but it could solve the same problem as MaxAckPendingPerSubject in a more flexible way.

For now, I guess all we can do is be patient and trust that the team will eventually move forward with the group feature.
Adds MaxAckPendingPerSubject to ConsumerConfig to limit outstanding ACKs on a per-subject basis. This allows for granular flow control and prevents a single subject from exhausting the consumer's total MaxAckPending capacity.
This implementation resolves #4273.
Example
Consider a consumer subscribed to `messaging.group.>` with `MaxAckPending: 10` and `MaxAckPendingPerSubject: 1`. This configuration allows the consumer to process messages for up to 10 different groups simultaneously, while restricting each specific child subject to only one message at a time.

If multiple messages arrive for `messaging.group.A`, the server delivers the first one and holds the rest, ensuring that the remaining global concurrency slots stay available for other subjects like `messaging.group.B`. As soon as a subject is acknowledged, the server proactively resumes delivery for the next available message of that literal subject.

Signed-off-by: Randy stiven Valentin <zenozaga@gmail.com>