Skip to content

Commit e4a8c79

Browse files
authored
[ADDED] Option to customize write buffer size (#2057)
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
1 parent 9a88c4c commit e4a8c79

4 files changed

Lines changed: 120 additions & 2 deletions

File tree

nats.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ const (
6161
DefaultMaxPingOut = 2
6262
DefaultMaxChanLen = 64 * 1024 // 64k
6363
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
64+
DefaultWriteBufSize = defaultBufSize
6465
RequestChanLen = 8
6566
DefaultDrainTimeout = 30 * time.Second
6667
DefaultFlusherTimeout = time.Minute
@@ -574,6 +575,14 @@ type Options struct {
574575
// IgnoreDiscoveredServers will disable adding advertised server URLs
575576
// from INFO messages to the server pool.
576577
IgnoreDiscoveredServers bool
578+
579+
// WriteBufferSize is an advanced option that sets the flush threshold
580+
// of the write buffer used to batch outgoing data before writing to
581+
// the underlying connection. In most cases, the default value should
582+
// not be changed. A smaller buffer reduces the amount of data that
583+
// can be lost on blocked writes but may significantly reduce throughput.
584+
// Defaults to 32768 bytes (32KB).
585+
WriteBufferSize int
577586
}
578587

579588
const (
@@ -1172,6 +1181,19 @@ func ReconnectBufSize(size int) Option {
11721181
}
11731182
}
11741183

1184+
// WriteBufferSize is an advanced option that sets the flush threshold
1185+
// of the write buffer used to batch outgoing data before writing to
1186+
// the underlying connection. In most cases, the default value should
1187+
// not be changed. A smaller buffer reduces the amount of data that
1188+
// can be lost on blocked writes but may significantly reduce throughput.
1189+
// Defaults to 32768 bytes (32KB).
1190+
func WriteBufferSize(size int) Option {
1191+
return func(o *Options) error {
1192+
o.WriteBufferSize = size
1193+
return nil
1194+
}
1195+
}
1196+
11751197
// Timeout is an Option to set the timeout for Dial on a connection.
11761198
// Defaults to 2s.
11771199
func Timeout(t time.Duration) Option {
@@ -1741,6 +1763,10 @@ func (o Options) Connect() (*Conn, error) {
17411763
if nc.Opts.ReconnectBufSize == 0 {
17421764
nc.Opts.ReconnectBufSize = DefaultReconnectBufSize
17431765
}
1766+
// Default WriteBufferSize
1767+
if nc.Opts.WriteBufferSize <= 0 {
1768+
nc.Opts.WriteBufferSize = DefaultWriteBufSize
1769+
}
17441770
// Ensure that Timeout is not 0
17451771
if nc.Opts.Timeout == 0 {
17461772
nc.Opts.Timeout = DefaultTimeout
@@ -2080,7 +2106,7 @@ func (nc *Conn) newReaderWriter() {
20802106
off: -1,
20812107
}
20822108
nc.bw = &natsWriter{
2083-
limit: defaultBufSize,
2109+
limit: nc.Opts.WriteBufferSize,
20842110
plimit: nc.Opts.ReconnectBufSize,
20852111
}
20862112
}

nats_test.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2012-2023 The NATS Authors
1+
// Copyright 2012-2026 The NATS Authors
22
// Licensed under the Apache License, Version 2.0 (the "License");
33
// you may not use this file except in compliance with the License.
44
// You may obtain a copy of the License at
@@ -1915,3 +1915,30 @@ func TestValidateSubject(t *testing.T) {
19151915
})
19161916
}
19171917
}
1918+
1919+
func TestWriteBufferSize(t *testing.T) {
1920+
opts := GetDefaultOptions()
1921+
opts.WriteBufferSize = 64 * 1024
1922+
opts.Servers = []string{"nats://127.0.0.1:4222"}
1923+
nc := &Conn{Opts: opts}
1924+
nc.newReaderWriter()
1925+
1926+
if nc.bw.limit != 64*1024 {
1927+
t.Fatalf("Expected write buffer limit of %d, got %d", 64*1024, nc.bw.limit)
1928+
}
1929+
if len(nc.br.buf) != defaultBufSize {
1930+
t.Fatalf("Expected read buffer size of %d, got %d", defaultBufSize, len(nc.br.buf))
1931+
}
1932+
}
1933+
1934+
func TestWriteBufferSizeDefault(t *testing.T) {
1935+
opts := GetDefaultOptions()
1936+
opts.WriteBufferSize = DefaultWriteBufSize
1937+
opts.Servers = []string{"nats://127.0.0.1:4222"}
1938+
nc := &Conn{Opts: opts}
1939+
nc.newReaderWriter()
1940+
1941+
if nc.bw.limit != defaultBufSize {
1942+
t.Fatalf("Expected default write buffer limit of %d, got %d", defaultBufSize, nc.bw.limit)
1943+
}
1944+
}

test/bench_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package test
1515

1616
import (
17+
"fmt"
1718
"sync/atomic"
1819
"testing"
1920
"time"
@@ -255,3 +256,41 @@ func BenchmarkPublishValidation(b *testing.B) {
255256
b.StopTimer()
256257
})
257258
}
259+
260+
func BenchmarkPublishWithWriteBufferSize(b *testing.B) {
261+
payloads := []struct {
262+
name string
263+
size int
264+
}{
265+
{"16B", 16},
266+
{"128B", 128},
267+
{"512B", 512},
268+
}
269+
bufSizes := []int{512, 4096, 8192, 16384, 32768, 65536, 131072}
270+
271+
for _, p := range payloads {
272+
msg := make([]byte, p.size)
273+
for i := range msg {
274+
msg[i] = byte(i)
275+
}
276+
for _, sz := range bufSizes {
277+
b.Run(fmt.Sprintf("payload_%s/buf_%d", p.name, sz), func(b *testing.B) {
278+
s := RunDefaultServer()
279+
defer s.Shutdown()
280+
nc, err := nats.Connect(s.ClientURL(), nats.WriteBufferSize(sz))
281+
if err != nil {
282+
b.Fatalf("Failed to connect: %v", err)
283+
}
284+
defer nc.Close()
285+
b.ResetTimer()
286+
for range b.N {
287+
if err := nc.Publish("foo", msg); err != nil {
288+
b.Fatalf("Error publishing: %v", err)
289+
}
290+
}
291+
b.StopTimer()
292+
nc.Flush()
293+
})
294+
}
295+
}
296+
}

test/conn_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3563,3 +3563,29 @@ func TestTLSEOFAfterHandshakeBrokenPipe(t *testing.T) {
35633563
t.Fatalf("Expected error to wrap nats.ErrTLS, got: %v", err)
35643564
}
35653565
}
3566+
3567+
func TestWriteBufferSizeOption(t *testing.T) {
3568+
s := RunDefaultServer()
3569+
defer s.Shutdown()
3570+
3571+
nc, err := nats.Connect(s.ClientURL(), nats.WriteBufferSize(64*1024))
3572+
if err != nil {
3573+
t.Fatalf("Expected to connect, got: %v", err)
3574+
}
3575+
defer nc.Close()
3576+
3577+
if nc.Opts.WriteBufferSize != 64*1024 {
3578+
t.Fatalf("Expected WriteBufferSize 64KB, got %d", nc.Opts.WriteBufferSize)
3579+
}
3580+
3581+
sub, err := nc.SubscribeSync("foo")
3582+
if err != nil {
3583+
t.Fatalf("Error subscribing: %v", err)
3584+
}
3585+
if err := nc.Publish("foo", []byte("hello")); err != nil {
3586+
t.Fatalf("Error publishing: %v", err)
3587+
}
3588+
if _, err := sub.NextMsg(2 * time.Second); err != nil {
3589+
t.Fatalf("Error receiving message: %v", err)
3590+
}
3591+
}

0 commit comments

Comments
 (0)