Skip to content

Commit 5a94422

Browse files
authored
perf(parquet/compress): set zstd pool encoder concurrency to 1 (#717)
The zstdEncoderPool is used exclusively by EncodeAll(), which is a single-shot synchronous call that uses exactly one inner block encoder. However, zstd.NewWriter defaults concurrency to runtime.GOMAXPROCS, pre-allocating that many inner block encoders — each with its own ~1 MiB history buffer (ensureHist). On a 10-core machine, each pooled Encoder allocates 10 inner encoders when only 1 is ever used by EncodeAll. With WithEncoderConcurrency(1), each pooled encoder creates a single inner encoder, matching actual usage. The streaming Write/Close path is unaffected — it does not use the pool. Benchmark results (Apple M4 Pro, arm64, 256 KiB semi-random data): BenchmarkZstdPooledEncodeAll/Default-14 11000 B/op 5250 MB/s BenchmarkZstdPooledEncodeAll/Concurrency1-14 810 B/op 5500 MB/s 14x less memory per operation, ~5% higher throughput from reduced GC pressure. In a parquet write workload (1 GiB Arrow data, ZSTD level 3), this reduced ensureHist allocations from 22 GiB to 7 GiB and madvise kernel CPU from 4.6s to 2.3s (10% wall-time improvement). ### Rationale for this change High memory churn during parquet encoding ### What changes are included in this PR? Change to zstd encoder concurrency, a benchmark to reproduce results. ### Are these changes tested? Yes ### Are there any user-facing changes? No
1 parent 7ae2e33 commit 5a94422

2 files changed

Lines changed: 69 additions & 1 deletion

File tree

parquet/compress/compress_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ import (
2020
"bytes"
2121
"io"
2222
"math/rand"
23+
"sync"
2324
"testing"
2425

2526
"github.com/apache/arrow-go/v18/parquet/compress"
27+
"github.com/klauspost/compress/zstd"
2628
"github.com/stretchr/testify/assert"
2729
)
2830

@@ -179,3 +181,69 @@ func TestUnmarshalTextError(t *testing.T) {
179181
err := compression.UnmarshalText([]byte("NO SUCH CODEC"))
180182
assert.EqualError(t, err, "not a valid CompressionCodec string")
181183
}
184+
185+
// BenchmarkZstdPooledEncodeAll compares zstd EncodeAll throughput and allocation
186+
// overhead for pooled encoders created with the default concurrency (GOMAXPROCS
187+
// inner block encoders) vs concurrency=1 (single inner block encoder).
188+
//
189+
// Each inner block encoder carries a ~1 MiB history buffer allocated on first
190+
// use (ensureHist). With the default, a pooled encoder pre-allocates GOMAXPROCS
191+
// of these buffers even though EncodeAll only ever uses one at a time. Setting
192+
// concurrency=1 eliminates the wasted allocations.
193+
//
194+
// The benchmark uses semi-random data (seeded random blocks mixed with repeated
195+
// patterns) to exercise the encoder's history window realistically — matching
196+
// typical parquet page payloads that contain a mix of unique and repeated values.
197+
func BenchmarkZstdPooledEncodeAll(b *testing.B) {
198+
// 256 KiB of semi-random data — typical parquet page size.
199+
// Mix random and repeated segments so the encoder exercises its full
200+
// match-finding and history-window code paths.
201+
data := make([]byte, 256*1024)
202+
r := rand.New(rand.NewSource(42))
203+
r.Read(data)
204+
// Overwrite ~25% with repeated pattern to give the encoder something to match.
205+
pattern := []byte("parquet-page-data-pattern-0123456789abcdef")
206+
for i := 0; i < len(data)/4; i += len(pattern) {
207+
copy(data[i:], pattern)
208+
}
209+
210+
for _, tc := range []struct {
211+
name string
212+
concurrency int
213+
}{
214+
{"Default", 0}, // GOMAXPROCS inner encoders
215+
{"Concurrency1", 1}, // single inner encoder
216+
} {
217+
b.Run(tc.name, func(b *testing.B) {
218+
pool := &sync.Pool{
219+
New: func() interface{} {
220+
opts := []zstd.EOption{
221+
zstd.WithZeroFrames(true),
222+
zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(3)),
223+
}
224+
if tc.concurrency > 0 {
225+
opts = append(opts, zstd.WithEncoderConcurrency(tc.concurrency))
226+
}
227+
enc, _ := zstd.NewWriter(nil, opts...)
228+
return enc
229+
},
230+
}
231+
codec, err := compress.GetCodec(compress.Codecs.Zstd)
232+
if err != nil {
233+
b.Fatal(err)
234+
}
235+
dst := make([]byte, codec.CompressBound(int64(len(data))))
236+
237+
b.SetBytes(int64(len(data)))
238+
b.ReportAllocs()
239+
b.ResetTimer()
240+
241+
for i := 0; i < b.N; i++ {
242+
enc := pool.Get().(*zstd.Encoder)
243+
enc.EncodeAll(data, dst[:0])
244+
enc.Reset(nil)
245+
pool.Put(enc)
246+
}
247+
})
248+
}
249+
}

parquet/compress/zstd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (p *zstdEncoderPool) getEncoderFromPool(level zstd.EncoderLevel) *zstd.Enco
6464
if !ok {
6565
pool = &sync.Pool{
6666
New: func() interface{} {
67-
enc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true), zstd.WithEncoderLevel(level))
67+
enc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true), zstd.WithEncoderLevel(level), zstd.WithEncoderConcurrency(1))
6868
return enc
6969
},
7070
}

0 commit comments

Comments
 (0)