
Raft batching #7890

Draft
sciascid wants to merge 4 commits into main from raft-batching-opt

Conversation

Contributor

@sciascid sciascid commented Mar 2, 2026

Optimized Raft batching

Signed-off-by: Daniele Sciascia <daniele@nats.io>

For performance evaluation only.

Signed-off-by: Daniele Sciascia <daniele@nats.io>
// We can recover from the snapshot and the tail
// of the log.
fsCfg.SyncAlways = false
fsCfg.SyncOnFlush = true
Member

The stream can be scaled down and up at any time, would these need to be updated then as well?

Mentioned this also in another comment relating to fsyncing in FlushAllPending, but perhaps this improved fsyncing strategy should be in its own PR?

n.RUnlock()
return werr
}
n.RUnlock()
Member

ProposeMulti below is still holding the n.RLock() when pushing the proposal, should that also be the case here? Then it could just be a defer n.RUnlock() above.
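A minimal sketch of the suggested shape (illustrative `node`/`propose` names, with a buffered channel standing in for the internally synchronized proposal queue; not the actual raft.go code):

```go
package main

import (
	"fmt"
	"sync"
)

// Sketch of the suggested pattern: hold the read lock for the whole
// operation, including the queue push, and release it via a single defer
// instead of unlocking on each branch.
type node struct {
	mu   sync.RWMutex
	prop chan string // stand-in for the internally synchronized proposal queue
	werr error       // sticky write error, checked under the lock
}

func (n *node) propose(entry string) error {
	n.mu.RLock()
	defer n.mu.RUnlock() // one unlock path, as with ProposeMulti
	if n.werr != nil {
		return n.werr
	}
	// Push while still holding the read lock.
	n.prop <- entry
	return nil
}

func main() {
	n := &node{prop: make(chan string, 8)}
	if err := n.propose("entry-1"); err != nil {
		panic(err)
	}
	fmt.Println(len(n.prop)) // 1
}
```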

defer n.Unlock()
// Don't check if we're leader before sending and storing, this is used on scaleup.
n.sendAppendEntryLocked([]*Entry{{EntrySnapshot, data}}, false)
n.prop.push(newProposedEntry(newEntry(EntrySnapshot, data), _EMPTY_))
Member

Could this be reverted or was there a specific reason to potentially batch this?

SendSnapshot isn't used often enough to be eligible for batching, is what I'm thinking (it's only called once during a stream/consumer scaleup). Also, as the use of SendSnapshot has led to issues in the past, I'd rather not touch it unless we're sure about it.

Comment on lines +2957 to +2959
"index": index,
"reserved": reserved,
"membershipChangeIndex": n.membChangeIndex,
Member

Could you also add the account, group and ID here?

Suggested change
+ "n.accName": n.accName,
+ "n.group": n.group,
+ "n.id": n.id,
  "index": index,
  "reserved": reserved,
  "membershipChangeIndex": n.membChangeIndex,

Comment on lines +4435 to +4441
l := len(ae.entries)
if l == 0 {
return false
}
if l == 1 {
return ae.entries[0].Type != EntryLeaderTransfer
}
Member

Could optionally shorten:

Suggested change
- l := len(ae.entries)
- if l == 0 {
- 	return false
- }
- if l == 1 {
- 	return ae.entries[0].Type != EntryLeaderTransfer
- }
+ if l := len(ae.entries); l == 0 {
+ 	return false
+ } else if l == 1 {
+ 	return ae.entries[0].Type != EntryLeaderTransfer
+ }

// Specifically done while holding the lock to not race.
n.RLock()
if checkLeader && n.State() != Leader {
n.RUnlock()
Member

The lock needs to be held when calling n.debug below, as its value may otherwise race if the config is reloaded.
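As a toy illustration of the hazard (hypothetical `srv` type, not the server's actual fields), reading the flag only while the caller's read lock is held avoids the race with a config reload:

```go
package main

import (
	"fmt"
	"sync"
)

// Sketch of why the lock must be held across the debug call: if the flag
// can be rewritten on a config reload, reading it outside the lock races.
type srv struct {
	mu    sync.RWMutex
	debug bool
}

// reload models a config reload flipping the flag under the write lock.
func (s *srv) reload(debug bool) {
	s.mu.Lock()
	s.debug = debug
	s.mu.Unlock()
}

// logfLocked reads the flag under the read lock the caller already holds.
func (s *srv) logfLocked(msg string) {
	if s.debug {
		fmt.Println("DBG:", msg)
	}
}

func (s *srv) doWork() {
	s.mu.RLock()
	defer s.mu.RUnlock()
	s.logfLocked("working") // safe: flag read under the same RLock
}

func main() {
	s := &srv{debug: true}
	s.doWork() // prints "DBG: working"
}
```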

n.aflr = n.pindex
index, err := n.sendAppendEntry([]*Entry{peerState}, true)
if err != nil {
return
Member

The error is ignored here, should we log about this and step down?

fs.mu.Lock()
defer fs.mu.Unlock()
fs.checkAndFlushLastBlock()
if fs.fcfg.SyncOnFlush {
Member

I'm not sure this change should be made as part of this PR, or whether it's actually safe, although I do agree that fsync-ing for every append entry and then once again for every single message is quite slow.

Imagine 5 message blocks that were written to but not synced; then this should be fine. But if the OS did decide to sync some blocks or only the last block, we'd come up and not be able to recover the data written in the previous blocks? (As we check for the recovered last sequence and, when replaying from the snapshot during recovery, skip over any messages below this last sequence.)

A safer change, for the time being, would be to immediately sync only the last block when we flush and close the last message block to open the next in fs.newMsgBlockForWrite.

Contributor Author

We call FlushAllPending before a stream snapshot is created; if SyncOnFlush is set then we will call into syncBlocks, causing all message blocks that are marked as needSync to be synced to stable storage. My understanding is that if a snapshot was installed in Raft, then all indexes up to the last sequence in the snapshot will have been synced. Upon recovery, we seem to restore the last snapshot, and reset the raft state starting from the last sequence found in that snapshot. All subsequent sequences are then replayed.

A safer change, for the time being, would be to immediately sync only the last block when we flush and close the last message block to open the next in fs.newMsgBlockForWrite.

I don't think that would be enough. Suppose we have one full block b1 and are then creating a new block b2, which is only partially full. b1 was synced when b2 was created. Taking a snapshot at this point would include b1, but could possibly lose b2. Meaning that we still need to sync b2, i.e. the last block, before we take the snapshot.
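A toy model of this point (illustrative names, not the filestore API): syncing blocks only when they are rotated still leaves the open last block at risk, so a pre-snapshot sync of all pending blocks is needed:

```go
package main

import "fmt"

// Toy model: syncing full blocks only on rotation (the "sync in
// newMsgBlockForWrite" proposal) leaves the partially filled last block
// unsynced, so a snapshot taken at that moment could lose it.
type block struct {
	full   bool
	synced bool
}

// rotateSync models syncing blocks only when a new block is opened:
// every block except the currently open last one gets synced.
func rotateSync(blocks []block) {
	for i := range blocks[:len(blocks)-1] {
		blocks[i].synced = true
	}
}

// syncAllPending models the FlushAllPending/syncBlocks call before a snapshot.
func syncAllPending(blocks []block) {
	for i := range blocks {
		blocks[i].synced = true
	}
}

func unsynced(blocks []block) int {
	n := 0
	for _, b := range blocks {
		if !b.synced {
			n++
		}
	}
	return n
}

func main() {
	blocks := []block{{full: true}, {full: false}} // b1 full, b2 partial
	rotateSync(blocks)
	fmt.Println(unsynced(blocks)) // 1: b2 is still at risk
	syncAllPending(blocks)
	fmt.Println(unsynced(blocks)) // 0: safe to snapshot
}
```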

Member

@MauriceVanVeen MauriceVanVeen Mar 3, 2026

Upon recovery, we seem to restore the last snapshot, and reset the raft state starting from the last sequence found in that snapshot. All subsequent sequences are then replayed.

Indeed, but replaying for the stream does not mean we start off with the snapshot only and then try persisting the replayed messages again.

Imagine we have no snapshot and 100 messages spread across various append entries. Couldn't the OS choose to sync the last message block on its own before we tried to sync? In that case our last sequence of 100 survives the restart, but all prior messages don't.

The following lines would then see you skipping messages 1-99 since our last sequence is 100:

if lseq-clfs < last {
s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d",
mset.accountLocked(needLock), mset.nameLocked(needLock), lseq+1-clfs, last)
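The scenario can be sketched numerically (a simplified model of the quoted check, with clfs omitted):

```go
package main

import "fmt"

// Simplified model of the replay-skip check quoted above: any replayed
// sequence at or below the recovered last sequence is skipped. So if only
// the block holding seq 100 survived a crash, seqs 1-99 are never
// rewritten during replay.
func replay(recoveredLast uint64, replayed []uint64) (applied, skipped []uint64) {
	for _, seq := range replayed {
		if seq <= recoveredLast {
			skipped = append(skipped, seq)
		} else {
			applied = append(applied, seq)
		}
	}
	return applied, skipped
}

func main() {
	// 100 messages in the log, but recovery found last sequence 100
	// because only the final block was synced before the crash.
	seqs := make([]uint64, 0, 100)
	for s := uint64(1); s <= 100; s++ {
		seqs = append(seqs, s)
	}
	applied, skipped := replay(100, seqs)
	fmt.Println(len(applied), len(skipped)) // 0 100
}
```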

Member

@MauriceVanVeen MauriceVanVeen Mar 3, 2026

I don't think that would be enough. Suppose we have one full block b1 and are then creating a new block b2, which is only partially full. b1 was synced when b2 was created. Taking a snapshot at this point would include b1, but could possibly lose b2. Meaning that we still need to sync b2, i.e. the last block, before we take the snapshot.

Yep, would be okay by me to also sync b2 when we try to install the snapshot. I was mostly highlighting that we wouldn't get into a situation where the OS chose to sync b2 first before we got to b1, then being restarted and only recovering b2 but not b1 (since that wasn't synced yet). The logic outlined in my previous message would then see you replay but skip the lost messages since the last sequence is higher.

Contributor Author

Notice that the OS/filesystem could choose to reorder writes within a file. In that case, I believe this optimized syncing strategy, combined with the above skipping, wouldn't work nicely. If we want to optimize this, I think we need a way to replay all entries in the Raft WAL in an idempotent way.

Member

Yeah, I had been thinking about that as well. Instead of having replay "skip" messages that were already processed (due to a high last seq), truncate based on the snapshot instead. That would reliably clean up anything in the stream "after" the snapshot, and replaying will write all the messages in the same order again. But I'm still wondering whether that should be in its own PR, so this PR remains focused primarily on the Raft batching in raft.go.
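A rough sketch of that truncate-then-replay idea (hypothetical types, not the server's code): drop everything above the snapshot's last sequence, then re-apply the log in order, which repairs holes left by out-of-order syncs.

```go
package main

import "fmt"

// Sketch of truncate-then-replay recovery: instead of skipping
// already-seen sequences, truncate the stream to the snapshot's last
// sequence and rewrite everything after it from the log.
type stream struct {
	msgs map[uint64]string // seq -> payload
	last uint64
}

// truncateTo removes all messages above the snapshot's last sequence.
func (s *stream) truncateTo(snapLast uint64) {
	for seq := range s.msgs {
		if seq > snapLast {
			delete(s.msgs, seq)
		}
	}
	s.last = snapLast
}

// replay re-applies contiguous log entries in order after the truncate.
func (s *stream) replay(entries map[uint64]string) {
	for seq := s.last + 1; ; seq++ {
		payload, ok := entries[seq]
		if !ok {
			break
		}
		s.msgs[seq] = payload
		s.last = seq
	}
}

func main() {
	// Crash left a hole: seq 2 lost, seq 3 survived an out-of-order sync.
	s := &stream{msgs: map[uint64]string{1: "a", 3: "c"}, last: 3}
	log := map[uint64]string{2: "b", 3: "c", 4: "d"}
	s.truncateTo(1) // snapshot covered up to seq 1
	s.replay(log)   // rewrites 2, 3 and 4 in order, closing the hole
	fmt.Println(s.last, len(s.msgs)) // 4 4
}
```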

Contributor Author

Did some experiments with lazyfs. I can confirm pretty much everything that was discussed here. One thing to note, relevant to this discussion: if the filesystem reorders writes within a block file, leaving a hole, recovery will set the last sequence for that block file to whatever it found before the hole (if anything).
That's still problematic: we can end up with gaps between files, or entire blocks missing.
The truncate strategy is apparently working, though.

Previously with SyncAlways enabled we had one fd.Sync()
call per raft append entry, plus one for each message
batched in the append entry. This resulted in unnecessary
calls to fd.Sync().
A replicated stream can recover from the stream snapshot
and the Raft log. So we keep syncing the Raft log as
usual, and introduce syncing the stream's filestore prior
to taking a snapshot.

Signed-off-by: Daniele Sciascia <daniele@nats.io>
Reduce contention between Propose() and sendAppendEntry() by changing
Propose() to take a read lock and by avoiding holding raft.mu across
storeToWAL(), which may block on IO.

This requires stored append entries to be emitted only from the leader
goroutine. To enforce that:

* do not store EntryLeaderTransfer in the Raft log
* push EntryPeerState and EntrySnapshot through the proposal queue
* batch normal proposals in runAsLeader()
* keep EntrySnapshot and membership changes standalone so they are not
  batched with normal entries
* let EntrySnapshot skip the leader check when appended from the leader loop

Signed-off-by: Daniele Sciascia <daniele@nats.io>
Cap batches to the configured max payload size and to 65535
entries, the maximum representable in an AppendEntry message.

Signed-off-by: Daniele Sciascia <daniele@nats.io>
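A rough sketch of how such caps might be applied when draining the proposal queue (illustrative names and constants, not the actual implementation):

```go
package main

import "fmt"

// Sketch of the batch caps described in the commit message: gather queued
// entries into one append entry, stopping at the payload-size cap or at
// 65535 entries, the most that fits in a 16-bit entry count.
const maxBatchEntries = 65535

// batch returns how many of the queued entry sizes fit in one append
// entry under both caps, and the total payload size taken.
func batch(sizes []int, maxPayload int) (n, total int) {
	for _, sz := range sizes {
		if n == maxBatchEntries || total+sz > maxPayload {
			break
		}
		n++
		total += sz
	}
	return n, total
}

func main() {
	// Five queued entries of 400 bytes each with a 1 KiB payload cap:
	// only the first two fit in this batch.
	n, total := batch([]int{400, 400, 400, 400, 400}, 1024)
	fmt.Println(n, total) // 2 800
}
```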
@sciascid sciascid force-pushed the raft-batching-opt branch from 82a15b0 to 5a0b857 on March 10, 2026 09:02
