Pool compression buffer in encodeStreamMsgAllowCompressAndBatch #7871

Draft

wallyqs wants to merge 1 commit into main from wq/encode-stream-msg-allocs

Conversation


@wallyqs wallyqs commented Feb 24, 2026

Try to avoid the double allocation that happens for messages over 8KB by using a sync.Pool, releasing the buffer once compression is done.

```
go test -run='^$' -bench='BenchmarkEncodeStreamMsg|BenchmarkEncodeDecodeStreamMsg' -benchmem ./server/
```

Signed-off-by: Waldemar Quevedo <wally@nats.io>

Reduce GC / memory churn by avoiding the double allocation
that happens for messages over 8KB.

```
go test -run='^$' -bench='BenchmarkEncodeStreamMsg|BenchmarkEncodeDecodeStreamMsg' -benchmem ./server/
```

Signed-off-by: Waldemar Quevedo <wally@nats.io>
@wallyqs wallyqs force-pushed the wq/encode-stream-msg-allocs branch from 8107cca to 773f900 on February 24, 2026 12:42

@neilalexander neilalexander left a comment


Would it be better to reuse an existing pool, e.g. nbPoolGet and nbPoolPut, or do you have a strong preference for a new one?

@neilalexander

@codex and @claude please review.


synadia-claude-reviewer bot commented Mar 10, 2026

Claude finished @neilalexander's task in 4m 55s.


PR Review: Pool compression buffer in encodeStreamMsgAllowCompressAndBatch


Overview

The PR avoids a double allocation during S2 compression by pooling the temporary scratch buffer (nbuf) used inside encodeStreamMsgAllowCompressAndBatch. The approach is broadly correct: pooling makes sense here, since the buffer is only needed transiently. However, there are a few issues worth discussing.


Critical Bug: Potential panic in batch message path

The key change in the compression block is:

```go
// jetstream_cluster.go:9430-9442
if shouldCompress {
    nbuf := getCompressBuf(s2.MaxEncodedLen(elen))
    nbuf[0] = byte(compressedStreamMsgOp)
    ebuf := s2.Encode(nbuf[1:], buf[opIndex+1:])
    if len(ebuf) < len(buf) {                     // ← potentially too permissive
        buf[opIndex] = byte(compressedStreamMsgOp)
        copy(buf[opIndex+1:], ebuf)
        buf = buf[:len(ebuf)+opIndex+1]           // ← can panic
    }
    putCompressBuf(nbuf)
}
```

For non-batch messages (opIndex == 0) this is fine: len(ebuf) < len(buf) guarantees len(ebuf)+0+1 ≤ len(buf) = cap(buf).

For batch messages (opIndex > 0), buf contains:

  • buf[0..opIndex-1] — batch prefix (op + batch ID + batchSeq varint)
  • buf[opIndex] — stream msg op
  • buf[opIndex+1..] — the actual stream message payload

ebuf is the compressed form of buf[opIndex+1:]. The condition len(ebuf) < len(buf) can pass even when len(ebuf) >= len(buf) - opIndex - 1, i.e. when the compressed payload is actually larger than the uncompressed payload portion but still smaller than the total buffer (because the batch prefix bytes make up the difference). In that case:

  1. copy(buf[opIndex+1:], ebuf) silently truncates the copy (it copies only len(buf)-opIndex-1 bytes)
  2. buf = buf[:len(ebuf)+opIndex+1] panics because len(ebuf)+opIndex+1 > len(buf) == cap(buf)

The original code was safe here because it resliced the much larger nbuf (cap = s2.MaxEncodedLen(elen)), not buf.
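Both failure modes described above follow directly from Go's slice semantics. A minimal standalone sketch, independent of the server code, demonstrates them:

```go
package main

import "fmt"

func main() {
	// copy never writes past the destination: it silently truncates.
	dst := make([]byte, 4)
	n := copy(dst, []byte("hello!")) // 6-byte source, 4-byte destination
	fmt.Println(n)                   // 4

	// Reslicing past a slice's capacity panics at runtime.
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r)
		}
	}()
	buf := make([]byte, 8) // len == cap == 8
	_ = buf[:10]           // out of range: 10 > cap(buf)
}
```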

Fix: change the compression condition to:

```go
if len(ebuf)+opIndex+1 < len(buf) {
```

This ensures the final encoded message (batch prefix + compressed payload) is actually smaller than the original, which is the correct semantic check, and guarantees the reslice is in-bounds.
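To make the arithmetic concrete, here is a worked example with invented numbers (a hypothetical 10-byte batch prefix and a payload that grows slightly under compression) comparing the old and corrected guards:

```go
package main

import "fmt"

func main() {
	// Hypothetical batch message: 10-byte batch prefix, 1 op byte,
	// 100-byte payload, and incompressible data that "compresses"
	// to 105 bytes (S2 can emit slightly more than its input).
	opIndex, payload, ebufLen := 10, 100, 105
	bufLen := opIndex + 1 + payload // 111

	oldGuard := ebufLen < bufLen           // 105 < 111: passes, but unsafe
	newGuard := ebufLen+opIndex+1 < bufLen // 116 < 111: correctly rejects
	fmt.Println(oldGuard, newGuard)        // true false
}
```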



Minor: Dead code

```go
// jetstream_cluster.go:9432
nbuf[0] = byte(compressedStreamMsgOp)
```

This byte is never used. ebuf is a sub-slice of nbuf[1:] and the result is copied directly into buf. The byte at nbuf[0] isn't referenced anywhere. This is a leftover from the old approach where nbuf itself was the output buffer. Safe to remove.


Minor: putCompressBuf allocates on every call

```go
func putCompressBuf(buf []byte) {
    compressBufPool.Put(&buf)  // &buf escapes to heap
}
```

The &buf forces the local slice-header copy to escape to the heap on every call. With high message throughput this adds GC pressure — somewhat counterproductive for a change intended to reduce allocations. A cleaner approach that avoids the extra allocation is to expose the pointer through the API:

```go
func getCompressBuf(sz int) *[]byte {
    bp := compressBufPool.Get().(*[]byte)
    if cap(*bp) >= sz {
        *bp = (*bp)[:sz]
        return bp
    }
    *bp = make([]byte, sz)
    return bp
}

func putCompressBuf(bp *[]byte) {
    compressBufPool.Put(bp)
}
```

Then at the call sites, nbuf := getCompressBuf(...) becomes nbufp := getCompressBuf(...) with nbuf := *nbufp, and putCompressBuf(nbuf) becomes putCompressBuf(nbufp).


On nbPoolGet/nbPoolPut (re: @neilalexander's comment)

The existing nbPool family uses three fixed sizes: 512 B, 4 KB, and 64 KB (nbPoolSizeLarge). The compression buffer needs to be s2.MaxEncodedLen(elen), which for a 1 MB message is ~1.17 MB — well beyond 64 KB. nbPoolGet for any request > 64 KB returns a 64 KB buffer, which would cause s2.Encode to allocate a new buffer anyway. So reusing nbPool is not practical here; a dedicated pool with dynamic sizing (as implemented) is the right call.
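A self-contained sketch of such a dedicated, dynamically sized pool, with a usage example showing it serving a request well beyond any fixed tier. The names mirror the PR's getCompressBuf/putCompressBuf, but this is an illustrative standalone version, not the server's code:

```go
package main

import (
	"fmt"
	"sync"
)

// Dedicated pool whose buffers grow to whatever size is requested,
// rather than being capped at fixed tiers like nbPool's 64 KB.
var compressBufPool = sync.Pool{
	New: func() any { b := []byte(nil); return &b },
}

func getCompressBuf(sz int) *[]byte {
	bp := compressBufPool.Get().(*[]byte)
	if cap(*bp) >= sz {
		*bp = (*bp)[:sz] // reuse the existing backing array
		return bp
	}
	*bp = make([]byte, sz) // grow: pooled buffer was too small
	return bp
}

func putCompressBuf(bp *[]byte) { compressBufPool.Put(bp) }

func main() {
	// A 1 MB message needs a ~1.17 MB scratch buffer; a fixed
	// 64 KB tier could never serve this without reallocating.
	bp := getCompressBuf(1 << 20)
	fmt.Println(len(*bp)) // 1048576
	putCompressBuf(bp)

	bp2 := getCompressBuf(4096) // may reuse the 1 MB backing array
	fmt.Println(len(*bp2))      // 4096
}
```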


Tests

The benchmarks are well-structured — covering compressible vs. incompressible data, various sizes below/above the 8 KB threshold, batch and non-batch paths, and a full encode/decode roundtrip. However, there are no functional correctness tests for the batch compression path specifically. Given the bug above, it would be good to add a test that exercises encodeStreamMsgAllowCompressAndBatch with a batch ID and a payload just above the compression threshold to verify the output is a valid decodable message.


Summary

| Severity | Issue |
| --- | --- |
| 🔴 Bug | Potential panic when opIndex > 0 (batch messages) and the compressed payload is marginally smaller than len(buf) |
| 🟡 Minor | nbuf[0] = byte(compressedStreamMsgOp) is dead code |
| 🟡 Minor | putCompressBuf causes a heap allocation per call |
| ℹ️ Info | The new dedicated pool is the right choice over nbPool for variable-size compression buffers |


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 773f900347

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines 9436 to +9439:

```diff
 if len(ebuf) < len(buf) {
-	buf = nbuf[:len(ebuf)+opIndex+1]
 	buf[opIndex] = byte(compressedStreamMsgOp)
 	copy(buf[opIndex+1:], ebuf)
+	buf = buf[:len(ebuf)+opIndex+1]
```

P1: Compare compressed length to payload span before copying

For batched messages (opIndex > 0), s2.Encode can return output slightly larger than the uncompressed stream payload (for incompressible data this is typically src+5), and the current guard len(ebuf) < len(buf) still passes because len(buf) includes the batch prefix. In that case copy(buf[opIndex+1:], ebuf) truncates ebuf to the payload span, but the subsequent reslice keeps the larger length, producing a corrupted compressed frame (and potentially decode failures on replication) for large incompressible batched payloads.



@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Comment on lines 9436 to +9439:

P1: Compare compressed size to payload before in-place copy

In encodeStreamMsgAllowCompressAndBatch, the guard len(ebuf) < len(buf) is checking against the full batched frame size instead of the compressible payload size (len(buf)-opIndex-1). For batch entries (opIndex > 0), incompressible S2 output can be a few bytes larger than the original payload but still smaller than the full frame, so this branch is taken and copy(buf[opIndex+1:], ebuf) truncates the compressed bytes to the old payload length; the subsequent reslice then exposes invalid trailing bytes, producing corrupt compressed batch records (and can panic if the new length exceeds cap(buf)).



@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Comment on lines 9436 to +9439:

P1: Compare compressed size against payload window

For batched messages (opIndex > 0), this branch copies ebuf into buf[opIndex+1:], but the guard checks len(ebuf) < len(buf) instead of comparing against len(buf)-opIndex-1. When S2 output is slightly larger than the raw payload (which can happen for incompressible data) but still smaller than the full batch frame, copy truncates the compressed bytes and buf = buf[:len(ebuf)+opIndex+1] can extend past the written data (or panic if beyond capacity), producing corrupted compressed batch entries.


wallyqs pushed a commit to wallyqs/nats-server that referenced this pull request Mar 10, 2026
Cherry-picked from wq/encode-stream-msg-allocs branch (PR nats-io#7871).
Reduces GC pressure by pooling S2 compression scratch buffers.

https://claude.ai/code/session_01LU1gqVBKHyaQdkJ6K9Ur35
wallyqs pushed a commit to wallyqs/nats-server that referenced this pull request Mar 10, 2026
Apply feedback from Claude and Codex reviewers:

1. Fix critical bug: compressed length comparison `len(ebuf) < len(buf)`
   did not account for batch prefix offset. For batched messages where
   opIndex > 0, the comparison must be against the payload window size
   `len(buf)-opIndex-1` to avoid data truncation or corruption.

2. Fix heap escape in putCompressBuf: taking `&buf` of a slice parameter
   causes the slice header to escape to heap on every call. Instead,
   return the pool pointer from getCompressBuf and pass it back to
   putCompressBuf directly.

3. Remove dead code: `nbuf[0] = byte(compressedStreamMsgOp)` was never
   used since the op byte is written into buf, not nbuf.

https://claude.ai/code/session_01LU1gqVBKHyaQdkJ6K9Ur35