Implements the second part of sourcing deduplication#7975
Implements the second part of sourcing deduplication#7975jnmoyne wants to merge 2 commits intonats-io:mainfrom
Conversation
… discard new per subject. Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
|
@claude please review. |
|
I'll analyze this and get back to you. |
|
@codex please review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d39d149c29
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
… kicking in the SourcingDeduplication tests. Signed-off-by: Jean-Noel Moyne <jnmoyne@gmail.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 92a687d64d
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| // Retry in all type of errors we do not want to skip if we are still leader. | ||
| if mset.isLeader() { | ||
| if !errors.Is(err, errMsgIdDuplicate) { | ||
| if !errors.Is(err, errMsgIdDuplicate) && !errors.Is(err, ErrMaxMsgsPerSubject) { |
There was a problem hiding this comment.
Retry
ErrMaxMsgsPerSubject instead of advancing the source
When a sourced stream with DiscardNewPer temporarily rejects a message because the subject is full, this new branch treats ErrMaxMsgsPerSubject like a duplicate and returns true after si.sseq has already been advanced. That permanently drops the blocked source sequence: if the destination later frees that subject slot (for example via purge, delete, or MaxAge expiration), sourcing resumes from the newer sequence and the missed update is never replayed. The previous retrySourceConsumerAtSeq(iName, si.sseq) path was what allowed these temporary per-subject overflows to recover.
Useful? React with 👍 / 👎.

Implements the second part of sourcing deduplication, extending #7896 to discard new per subject.
Signed-off-by: Jean-Noël Moyne jnmoyne@gmail.com