Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions parquet/compress/compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"bytes"
"io"
"math/rand"
"sync"
"testing"

"github.com/apache/arrow-go/v18/parquet/compress"
"github.com/klauspost/compress/zstd"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -179,3 +181,69 @@ func TestUnmarshalTextError(t *testing.T) {
err := compression.UnmarshalText([]byte("NO SUCH CODEC"))
assert.EqualError(t, err, "not a valid CompressionCodec string")
}

// BenchmarkZstdPooledEncodeAll measures zstd EncodeAll throughput and
// allocation behavior for pooled encoders built with the library default
// concurrency (GOMAXPROCS inner block encoders) against encoders built with
// concurrency=1 (a single inner block encoder).
//
// Each inner block encoder lazily allocates a ~1 MiB history buffer
// (ensureHist). Under the default setting a pooled encoder therefore ends up
// holding GOMAXPROCS such buffers even though EncodeAll only ever touches one
// at a time; concurrency=1 avoids that waste.
//
// The input is semi-random (seeded random bytes with a repeating pattern mixed
// in) so the encoder's match-finding and history-window paths are exercised
// the way a typical parquet page — part unique, part repeated values — would.
func BenchmarkZstdPooledEncodeAll(b *testing.B) {
	// Build a 256 KiB payload (a typical parquet page size): seeded random
	// bytes, with the leading quarter overwritten by a repeating pattern so
	// the encoder sees both incompressible and highly matchable input.
	const payloadSize = 256 * 1024
	data := make([]byte, payloadSize)
	rng := rand.New(rand.NewSource(42))
	rng.Read(data)
	pattern := []byte("parquet-page-data-pattern-0123456789abcdef")
	for off := 0; off < len(data)/4; off += len(pattern) {
		copy(data[off:], pattern)
	}

	cases := []struct {
		name        string
		concurrency int
	}{
		{"Default", 0},      // GOMAXPROCS inner encoders
		{"Concurrency1", 1}, // single inner encoder
	}
	for _, bc := range cases {
		bc := bc
		b.Run(bc.name, func(b *testing.B) {
			// Encoder factory mirroring the production pool: zero frames,
			// zstd level 3, and the per-case concurrency override.
			newEncoder := func() interface{} {
				opts := []zstd.EOption{
					zstd.WithZeroFrames(true),
					zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(3)),
				}
				if bc.concurrency > 0 {
					opts = append(opts, zstd.WithEncoderConcurrency(bc.concurrency))
				}
				enc, _ := zstd.NewWriter(nil, opts...)
				return enc
			}
			pool := &sync.Pool{New: newEncoder}

			codec, err := compress.GetCodec(compress.Codecs.Zstd)
			if err != nil {
				b.Fatal(err)
			}
			// Pre-size the destination so the timed loop never grows it.
			dst := make([]byte, codec.CompressBound(int64(len(data))))

			b.SetBytes(int64(len(data)))
			b.ReportAllocs()
			b.ResetTimer()

			for i := 0; i < b.N; i++ {
				enc := pool.Get().(*zstd.Encoder)
				enc.EncodeAll(data, dst[:0])
				enc.Reset(nil)
				pool.Put(enc)
			}
		})
	}
}
2 changes: 1 addition & 1 deletion parquet/compress/zstd.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (p *zstdEncoderPool) getEncoderFromPool(level zstd.EncoderLevel) *zstd.Enco
if !ok {
pool = &sync.Pool{
New: func() interface{} {
enc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true), zstd.WithEncoderLevel(level))
enc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true), zstd.WithEncoderLevel(level), zstd.WithEncoderConcurrency(1))
return enc
},
}
Expand Down