Skip to content

Commit 68ebe43

Browse files
Add BufferedReaderV2 interface for advanced buffer management
- Introduce the BufferedReaderV2 interface, which extends BufferedReader with Buffered() and Free() methods.
- Enable explicit resource management and buffer introspection for readers that use custom allocators.
- Update GetStreamV2 to return BufferedReaderV2, allowing callers to free allocated buffers when done.
- Keep BufferedReader and GetStream for backwards compatibility.
1 parent ee5ac32 commit 68ebe43

6 files changed

Lines changed: 82 additions & 44 deletions

File tree

internal/utils/buf_reader.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type byteReader struct {
4040

4141
// NewByteReader creates a new ByteReader instance from the given byte slice.
4242
// It wraps bytes.NewReader to implement the BufferedReader interface.
43-
// It is considered not to own the underlying byte slice, so the Free method is a no-op.
43+
// It is considered not to own the underlying byte slice.
4444
func NewByteReader(buf []byte) *byteReader {
4545
r := bytes.NewReader(buf)
4646
return &byteReader{
@@ -113,7 +113,11 @@ func (r *byteReader) BufferSize() int { return len(r.buf) }
113113

114114
func (r *byteReader) Buffered() int { return len(r.buf) - r.pos }
115115

116-
func (r *byteReader) Free() {}
116+
func (r *byteReader) Free() {
117+
r.r = nil
118+
r.buf = nil
119+
r.pos = 0
120+
}
117121

118122
// bytesBufferReader is a byte slice with a bytes reader wrapped around it.
119123
// It uses an allocator to allocate and free the underlying byte slice.

parquet/file/page_reader.go

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ func (d *DictionaryPage) Release() {
349349
func (d *DictionaryPage) IsSorted() bool { return d.sorted }
350350

351351
type serializedPageReader struct {
352-
r parquet.BufferedReader
352+
r parquet.BufferedReaderV2
353353
chunk *metadata.ColumnChunkMetaData
354354
colIdx int
355355
pgIndexReader *metadata.RowGroupPageIndexReader
@@ -421,6 +421,25 @@ func (p *serializedPageReader) init(compressType compress.Compression, ctx *Cryp
421421
return nil
422422
}
423423

424+
type bufferedReaderV2Adapter struct {
425+
parquet.BufferedReader
426+
}
427+
428+
func (b *bufferedReaderV2Adapter) Buffered() int {
429+
return 0
430+
}
431+
432+
func (b *bufferedReaderV2Adapter) Free() {
433+
// no-op
434+
}
435+
436+
func getBufferedReaderV2(r parquet.BufferedReader) parquet.BufferedReaderV2 {
437+
if brV2, ok := r.(parquet.BufferedReaderV2); ok {
438+
return brV2
439+
}
440+
return &bufferedReaderV2Adapter{BufferedReader: r}
441+
}
442+
424443
// NewPageReader returns a page reader for the data which can be read from the provided reader and compression.
425444
//
426445
// Deprecated: This function isn't properly safe for public API use and should not be utilized
@@ -436,7 +455,7 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.
436455
}
437456

438457
rdr := &serializedPageReader{
439-
r: r,
458+
r: getBufferedReaderV2(r),
440459
maxPageHeaderSize: defaultMaxPageHeaderSize,
441460
nrows: nrows,
442461
mem: mem,
@@ -458,7 +477,10 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.
458477
func (p *serializedPageReader) Reset(r parquet.BufferedReader, nrows int64, compressType compress.Compression, ctx *CryptoContext) {
459478
p.rowsSeen, p.pageOrd, p.nrows = 0, 0, nrows
460479
p.curPageHdr, p.curPage, p.err = nil, nil, nil
461-
p.r = r
480+
if p.r != nil && p.r != r {
481+
p.r.Free()
482+
}
483+
p.r = getBufferedReaderV2(r)
462484

463485
p.codec, p.err = compress.GetCodec(compressType)
464486
if p.err != nil {
@@ -508,26 +530,6 @@ func (p *serializedPageReader) Page() Page {
508530
return p.curPage
509531
}
510532

511-
func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf []byte) ([]byte, error) {
512-
p.decompressBuffer.ResizeNoShrink(lenCompressed)
513-
514-
// Read directly into the memory.Buffer's backing slice
515-
n, err := io.ReadFull(rd, p.decompressBuffer.Bytes()[:lenCompressed])
516-
if err != nil {
517-
return nil, err
518-
}
519-
if n != lenCompressed {
520-
return nil, fmt.Errorf("parquet: expected to read %d compressed bytes, got %d", lenCompressed, n)
521-
}
522-
523-
data := p.decompressBuffer.Bytes()[:lenCompressed]
524-
if p.cryptoCtx.DataDecryptor != nil {
525-
data = p.cryptoCtx.DataDecryptor.Decrypt(data)
526-
}
527-
528-
return p.codec.Decode(buf, data), nil
529-
}
530-
531533
type dataheader interface {
532534
IsSetStatistics() bool
533535
GetStatistics() *format.Statistics
@@ -613,7 +615,7 @@ func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) {
613615
return nil, nil
614616
}
615617

616-
func (p *serializedPageReader) readPageHeader(rd parquet.BufferedReader, hdr *format.PageHeader) error {
618+
func (p *serializedPageReader) readPageHeader(rd parquet.BufferedReaderV2, hdr *format.PageHeader) error {
617619
allowedPgSz := defaultPageHeaderSize
618620
for {
619621
view, err := rd.Peek(allowedPgSz)
@@ -699,7 +701,7 @@ func (p *serializedPageReader) SeekToPageWithRow(rowIdx int64) error {
699701

700702
// readOrStealData attempts to steal data from the buffered reader if enough is buffered,
701703
// otherwise reads from the underlying reader into the provided buffer.
702-
func (p *serializedPageReader) readOrStealData(r parquet.BufferedReader, lenCompressed int, buffer *memory.Buffer) ([]byte, error) {
704+
func (p *serializedPageReader) readOrStealData(r parquet.BufferedReaderV2, lenCompressed int, buffer *memory.Buffer) ([]byte, error) {
703705
// if enough data is buffered, steal it to avoid an extra copy
704706
if r.Buffered() >= lenCompressed {
705707
data, err := r.Peek(lenCompressed)
@@ -727,7 +729,7 @@ func (p *serializedPageReader) readOrStealData(r parquet.BufferedReader, lenComp
727729
}
728730

729731
func (p *serializedPageReader) getPageBytesV1(
730-
r parquet.BufferedReader, isCompressed bool, lenCompressed, lenUncompressed int, buffer *memory.Buffer,
732+
r parquet.BufferedReaderV2, isCompressed bool, lenCompressed, lenUncompressed int, buffer *memory.Buffer,
731733
) ([]byte, error) {
732734
// 8 possible cases:
733735
// 1. enough data buffered (r.Buffered() >= lenCompressed)
@@ -773,7 +775,7 @@ func (p *serializedPageReader) getPageBytesV1(
773775
return data, nil
774776
}
775777

776-
func (p *serializedPageReader) readV2UnencryptedCompressedWithLevels(r parquet.BufferedReader, lenCompressed, lenUncompressed, levelsBytelen int, buffer *memory.Buffer) ([]byte, error) {
778+
func (p *serializedPageReader) readV2UnencryptedCompressedWithLevels(r parquet.BufferedReaderV2, lenCompressed, lenUncompressed, levelsBytelen int, buffer *memory.Buffer) ([]byte, error) {
777779
// Special case: unencrypted + compressed + has levels
778780
// Read levels directly into output buffer, compressed data into decompress buffer
779781
buffer.ResizeNoShrink(lenUncompressed)
@@ -800,7 +802,7 @@ func (p *serializedPageReader) readV2UnencryptedCompressedWithLevels(r parquet.B
800802
}
801803

802804
func (p *serializedPageReader) getPageBytesV2(
803-
r parquet.BufferedReader, isCompressed bool, lenCompressed, lenUncompressed, levelsBytelen int, buffer *memory.Buffer,
805+
r parquet.BufferedReaderV2, isCompressed bool, lenCompressed, lenUncompressed, levelsBytelen int, buffer *memory.Buffer,
804806
) ([]byte, error) {
805807
// Special case: unencrypted + compressed + has levels - read levels and compressed data separately
806808
if r.Buffered() < lenCompressed && p.cryptoCtx.DataDecryptor == nil && isCompressed && levelsBytelen > 0 {

parquet/file/row_group_reader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
114114
colLen += padding
115115
}
116116

117-
stream, err := r.props.GetStream(r.r, colStart, colLen)
117+
stream, err := r.props.GetStreamV2(r.r, colStart, colLen)
118118
if err != nil {
119119
return nil, err
120120
}

parquet/internal/testutils/pagebuilder.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ type DictionaryPageBuilder struct {
136136
func NewDictionaryPageBuilder(d *schema.Column) *DictionaryPageBuilder {
137137
return &DictionaryPageBuilder{
138138
encoding.NewEncoder(d.PhysicalType(), parquet.Encodings.Plain, true, d, mem).(encoding.DictEncoder),
139-
0, false}
139+
0, false,
140+
}
140141
}
141142

142143
func (d *DictionaryPageBuilder) AppendValues(values interface{}) encoding.Buffer {
@@ -243,9 +244,12 @@ func (m *MockPageReader) Err() error {
243244
}
244245

245246
func (m *MockPageReader) Reset(parquet.BufferedReader, int64, compress.Compression, *file.CryptoContext) {
247+
// no-op
246248
}
247249

248-
func (m *MockPageReader) SetMaxPageHeaderSize(int) {}
250+
func (m *MockPageReader) SetMaxPageHeaderSize(int) {
251+
// no-op
252+
}
249253

250254
func (m *MockPageReader) Page() file.Page {
251255
return m.TestData().Get("pages").Data().([]file.Page)[m.curpage-1]
@@ -278,8 +282,8 @@ func (m *MockPageReader) Next() bool {
278282
}
279283

280284
func PaginatePlain(version parquet.DataPageVersion, d *schema.Column, values reflect.Value, defLevels, repLevels []int16,
281-
maxDef, maxRep int16, lvlsPerPage int, valuesPerPage []int, enc parquet.Encoding) []file.Page {
282-
285+
maxDef, maxRep int16, lvlsPerPage int, valuesPerPage []int, enc parquet.Encoding,
286+
) []file.Page {
283287
var (
284288
npages = len(valuesPerPage)
285289
defLvlStart = 0

parquet/reader_properties.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,20 @@ type ReaderProperties struct {
4646
BufferedStreamEnabled bool
4747
}
4848

49+
type BufferedReaderV2 interface {
50+
BufferedReader
51+
// Buffered returns the number of bytes already read and stored in the buffer
52+
Buffered() int
53+
// Free releases any resources held by the BufferedReader
54+
Free()
55+
}
56+
4957
type BufferedReader interface {
5058
Peek(int) ([]byte, error)
5159
Discard(int) (int, error)
5260
Outer() utils.Reader
53-
// Buffered returns the number of bytes already read and stored in the buffer
54-
Buffered() int
5561
BufferSize() int
5662
Reset(utils.Reader)
57-
Free()
5863
io.Reader
5964
}
6065

@@ -76,6 +81,29 @@ func (r *ReaderProperties) Allocator() memory.Allocator { return r.alloc }
7681
// If BufferedStreamEnabled is true, it creates an io.SectionReader, otherwise it will read the entire section
7782
// into a buffer in memory and return a bytes.NewReader for that buffer.
7883
func (r *ReaderProperties) GetStream(source io.ReaderAt, start, nbytes int64) (BufferedReader, error) {
84+
if r.BufferedStreamEnabled {
85+
return utils.NewBufferedReader(io.NewSectionReader(source, start, nbytes), int(r.BufferSize), nil), nil
86+
}
87+
88+
data := make([]byte, nbytes)
89+
n, err := source.ReadAt(data, start)
90+
if err != nil {
91+
return nil, fmt.Errorf("parquet: tried reading from file, but got error: %w", err)
92+
}
93+
if n != int(nbytes) {
94+
return nil, fmt.Errorf("parquet: tried reading %d bytes starting at position %d from file but only got %d", nbytes, start, n)
95+
}
96+
97+
return utils.NewByteReader(data), nil
98+
}
99+
100+
// GetStreamV2 returns a section of the underlying reader based on whether or not BufferedStream is enabled.
101+
//
102+
// If BufferedStreamEnabled is true, it creates an io.SectionReader, otherwise it will read the entire section
103+
// into a buffer in memory and return a bytes.NewReader for that buffer.
104+
// Unlike GetStream, this version uses r.alloc to allocate the buffer for reading data and returns a BufferedReaderV2,
105+
// so that callers can release the allocated buffer with the Free() method once it is no longer needed.
106+
func (r *ReaderProperties) GetStreamV2(source io.ReaderAt, start, nbytes int64) (BufferedReaderV2, error) {
79107
if r.BufferedStreamEnabled {
80108
return utils.NewBufferedReader(io.NewSectionReader(source, start, nbytes), int(r.BufferSize), r.alloc), nil
81109
}

parquet/reader_writer_properties_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,16 +89,16 @@ func TestReaderPropsGetStreamWithAllocator(t *testing.T) {
8989

9090
// no leak on success
9191
props := parquet.NewReaderProperties(pool)
92-
bufRdr, err := props.GetStream(rdr, 0, int64(len(data)))
92+
bufRdr, err := props.GetStreamV2(rdr, 0, int64(len(data)))
9393
assert.NoError(t, err)
9494
bufRdr.Free()
9595

9696
// no leak on reader error
97-
_, err = props.GetStream(&mockReaderAt{}, 0, 10)
97+
_, err = props.GetStreamV2(&mockReaderAt{}, 0, 10)
9898
assert.Error(t, err)
9999

100100
// no leak on insufficient read
101-
_, err = props.GetStream(rdr, 0, int64(len(data)+10))
101+
_, err = props.GetStreamV2(rdr, 0, int64(len(data)+10))
102102
assert.Error(t, err)
103103
}
104104

@@ -113,19 +113,19 @@ func TestReaderPropsGetStreamBufferedWithAllocator(t *testing.T) {
113113
props.BufferedStreamEnabled = true
114114

115115
buf := make([]byte, len(data))
116-
bufRdr, err := props.GetStream(rdr, 0, int64(len(data)))
116+
bufRdr, err := props.GetStreamV2(rdr, 0, int64(len(data)))
117117
assert.NoError(t, err)
118118
_, err = bufRdr.Read(buf)
119119
assert.NoError(t, err)
120120
bufRdr.Free()
121121

122-
bufRdr, err = props.GetStream(&mockReaderAt{}, 0, 10)
122+
bufRdr, err = props.GetStreamV2(&mockReaderAt{}, 0, 10)
123123
assert.NoError(t, err)
124124
_, err = bufRdr.Read(buf)
125125
assert.Error(t, err)
126126
bufRdr.Free()
127127

128-
bufRdr, err = props.GetStream(rdr, 0, int64(len(data)+10))
128+
bufRdr, err = props.GetStreamV2(rdr, 0, int64(len(data)+10))
129129
assert.NoError(t, err)
130130
n, err := bufRdr.Read(buf)
131131
assert.NoError(t, err)

0 commit comments

Comments
 (0)