(2.14) Asynchronous stream snapshots#7876

Open

sciascid wants to merge 1 commit into main from async-stream-snapshots

Conversation

@sciascid
Contributor

Commit 0033a15 extended Raft's interface with the ability to create snapshot checkpoints which can be used to install snapshots asynchronously.
This commit reuses the same interface to make stream snapshots asynchronous: writing of the snapshot file and Raft's log compaction steps are done in a separate goroutine.

Signed-off-by: Daniele Sciascia daniele@nats.io

@sciascid
Contributor Author

This pull request tries to reduce the tail latency of publishing to a stream.
Below are results of publishing to a single stream, on a 3 node cluster deployed on a single machine. The table shows throughput in msg/s and various latency measures (min, 50th percentile, 90th percentile and so on) in microseconds. In each experiment the client runs for 60 seconds.

Without the optimization in this pull request we have the following results:

| batch_size | throughput (msg/s) | min | p50 | p90 | p99 | p99.9 | max |
|-----------:|-------------------:|----:|----:|----:|-----:|------:|-------:|
| 1          | 10,243             | 61  | 94  | 108 | 130  | 293   | 28,937 |
| 500        | 282,402            | 967 | 1,492 | 2,128 | 12,355 | 20,522 | 45,815 |

Under low load (a single client publishing one message at a time), latency ranges from 61 microseconds up to ~29 milliseconds for unlucky messages. Under high load (a single client publishing 500 messages at a time), latency goes from ~1 ms all the way to ~46 ms.

One cause of the high tail latency is the periodic snapshotting of the stream. Taking a stream snapshot involves writing a file and syncing it to disk, and doing that safely requires more than one call to fsync (710407c).
During this time, the monitorStream goroutine is blocked and can't process incoming messages. This patch moves the writing and syncing to disk off the stream's main goroutine.

With the changes in this pull request, tail latency improves significantly:

| batch_size | throughput (msg/s) | min | p50 | p90 | p99 | p99.9 | max |
|-----------:|-------------------:|----:|----:|----:|----:|------:|------:|
| 1          | 10,411             | 61  | 94  | 107 | 132 | 253   | 7,377  |
| 500        | 309,047            | 1,052 | 1,484 | 2,091 | 3,707 | 5,419 | 14,415 |

I used the "lats" benchmark client: https://github.com/synadia-labs/lats

sciascid force-pushed the async-stream-snapshots branch from 6e05f0f to e32926e on February 25, 2026 11:52
sciascid marked this pull request as ready for review on February 25, 2026 12:41
sciascid requested a review from a team as a code owner on February 25, 2026 12:41
sciascid force-pushed the async-stream-snapshots branch from e32926e to 2f79166 on February 25, 2026 14:37
Member

@MauriceVanVeen left a comment


LGTM

@neilalexander
Member

@claude and @codex please review!

@synadia-claude-reviewer

synadia-claude-reviewer bot commented Mar 10, 2026

Claude finished @neilalexander's task in 4m 0s


Code Review: Asynchronous Stream Snapshots

  • Read the PR diff and changed files
  • Analyze the async implementation for correctness
  • Check for race conditions and error handling
  • Post review feedback

Summary

This PR moves the file write and Raft log compaction of stream snapshots off the monitorStream goroutine and into a separate background goroutine, using the existing RaftNodeCheckpoint interface introduced for meta-layer async snapshots. The approach is sound, the benchmarking shows meaningful tail-latency improvement, and the overall design mirrors the already-reviewed meta-layer async snapshot code.

Observations

Concurrency and mutex usage — looks correct

snapMu is used consistently to protect snapshotting, fallbackSnapshot, and failedSnapshots. The handleInstallResult closure is always called under snapMu, whether from the main goroutine (fallback path) or the async goroutine. The locally captured curState in doSnapshot is safe because only one snapshot can run at a time (guarded by snapshotting).

Behavior change: errCatchupsRunning at CreateSnapshotCheckpoint time

In the old code, errCatchupsRunning returned from n.InstallSnapshot was silently ignored — no warning, no failedSnapshots increment:

```go
// old
} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
    s.RateLimitWarnf(...)
    failedSnapshots++
}
```

In the new code, errCatchupsRunning can come from CreateSnapshotCheckpoint, and the guard only excludes errNoSnapAvailable and errNodeClosed:

```go
// new (server/jetstream_cluster.go ~L3020)
if err != errNoSnapAvailable && err != errNodeClosed {
    s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", ...)
    failedSnapshots++
}
```

This means that while a catchup is in progress, each snapshot tick will log a warning and increment failedSnapshots — eventually causing forceSnapshot = failedSnapshots > 4 to kick in and override the catchup check. The meta-layer snapshot code has the same pattern (line ~1554), so this appears to be an intentional alignment rather than a bug, but it is a change in observable behaviour worth confirming.

Shutdown with an in-flight async snapshot

On <-s.quitCh / <-mqch, the code sets fallbackSnapshot = true and then calls doSnapshot(false). If an async snapshot is already running, doSnapshot returns immediately (because snapshotting == true). The async goroutine is registered with s.grWG via s.startGoRoutine, so the server will wait for it to complete. This is correct and safe.

Redundant c.Abort() in handleInstallResult

When c.InstallSnapshot(snap) returns an error, handleInstallResult calls c.Abort() to reset n.snapshotting. However, checkpoint.InstallSnapshot already resets n.snapshotting unconditionally via defer func() { n.snapshotting = false }() (raft.go:1488). The c.Abort() call is a no-op but harmless. Worth a comment clarifying this is intentional defensive cleanup, or removing it.

Double lock/unlock in the timer case (minor)

```go
// server/jetstream_cluster.go ~L3366
case <-t.C:
    snapMu.Lock()
    forceIfFailed := failedSnapshots > 0
    snapMu.Unlock()
    doSnapshot(forceIfFailed) // acquires snapMu again
```

forceIfFailed is read under one critical section and then used in a second. The value could be stale, but doSnapshot re-checks the same condition internally under snapMu, so there's no correctness issue — just a slightly redundant lock cycle. Not worth changing.

startGoRoutine failure handling

If the server is already shutting down and startGoRoutine returns false, snapshotting is reset to false and c.Abort() is called. This correctly keeps the monitorStream and Raft states in sync. The subsequent snapshot attempt on shutdown (doSnapshot(false) from the quitCh/mqch cases) will see snapshotting == false and run the blocking fallback. Well handled.

Verdict

The implementation is correct and safe. The only item worth a second look is the errCatchupsRunning handling change — either it's intentional alignment with the meta-layer behaviour, or the condition should also exclude errCatchupsRunning to preserve the pre-existing silent-ignore semantics. Everything else looks good, and the latency numbers are compelling.


@chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 2f79166212

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +3020 to +3021
```go
if err != errNoSnapAvailable && err != errNodeClosed {
    s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v",
```


P2: Ignore catchup-in-progress on snapshot checkpoint creation

When CreateSnapshotCheckpoint fails because a peer catchup is active (errCatchupsRunning from raft.createSnapshotCheckpointLocked), this branch treats it as a hard failure, increments failedSnapshots, and shortens the compaction ticker. On busy clusters this turns normal catchup behavior into repeated failure accounting and eventually forced snapshots, which is a behavioral regression from the previous stream path that explicitly ignored errCatchupsRunning and can trigger unnecessary blocking snapshots/log churn.


Contributor Author


This would previously happen on the n.InstallSnapshot() call. This PR "splits" that call into n.CreateSnapshotCheckpoint() followed by InstallSnapshot on that checkpoint.
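For readers not familiar with the code, the split described here can be sketched as a mutex-guarded "one snapshot in flight" flag: checkpoint creation stays synchronous and cheap, while the install runs in a background goroutine that clears the flag when done. All names below are hypothetical, not the actual nats-server types:

```go
package main

import (
	"fmt"
	"sync"
)

// Snapshotter illustrates the checkpoint-then-async-install pattern: at most
// one snapshot may be in flight at a time, guarded by a mutex-protected flag.
type Snapshotter struct {
	mu           sync.Mutex
	snapshotting bool
	wg           sync.WaitGroup
}

// TryInstallAsync returns false if a snapshot is already in flight; otherwise
// it runs install in a goroutine and resets the flag when install returns.
func (s *Snapshotter) TryInstallAsync(install func()) bool {
	s.mu.Lock()
	if s.snapshotting {
		s.mu.Unlock()
		return false
	}
	s.snapshotting = true
	s.mu.Unlock()

	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		defer func() {
			s.mu.Lock()
			s.snapshotting = false
			s.mu.Unlock()
		}()
		install() // slow part: write snapshot file, fsync, compact log
	}()
	return true
}

func main() {
	var s Snapshotter
	started := make(chan struct{})
	release := make(chan struct{})
	s.TryInstallAsync(func() { close(started); <-release })
	<-started
	// A second attempt while one is in flight is rejected.
	fmt.Println(s.TryInstallAsync(func() {})) // prints "false"
	close(release)
	s.wg.Wait()
}
```

This mirrors the behaviour the reviews above describe: a snapshot tick that arrives while an install is running returns immediately instead of blocking the monitor loop.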

neilalexander changed the title from "Asynchronous stream snapshots" to "(2.14) Asynchronous stream snapshots" on Mar 10, 2026
@wallyqs
Member

wallyqs commented Mar 10, 2026

needs rebase?

sciascid force-pushed the async-stream-snapshots branch from 2f79166 to 5885726 on March 11, 2026 09:40
@sciascid
Contributor Author

> needs rebase?

done

sciascid force-pushed the async-stream-snapshots branch from 5885726 to e800634 on March 11, 2026 10:47

@chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e8006345c5


```go
// to force it through. We might have been catching up a peer for a long period, and this
// protects our log size from growing indefinitely.
forceSnapshot := failedSnapshots > 4
c, err := n.CreateSnapshotCheckpoint(forceSnapshot)
```


P2: Re-check catchup state before installing checkpoint

CreateSnapshotCheckpoint is invoked before flushAllPending() and stateSnapshot(), but the Raft catchup guard (len(n.progress) == 0) is only enforced at checkpoint creation time, not in checkpoint.InstallSnapshot. If a follower catchup starts in that window, this path will still install/compact a non-forced snapshot, which bypasses the previous safety behavior and can interrupt active catchups with avoidable retry/snapshot churn.

