Skip to content

Commit 0f7b399

Browse files
Read uncompressed data directly into the page buffer
1 parent 045f2d1 commit 0f7b399

3 files changed

Lines changed: 61 additions & 16 deletions

File tree

internal/utils/buf_reader.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ func (r *byteReader) Reset(Reader) {}
111111

112112
// BufferSize reports the total length of r.buf, regardless of how much of it
// has already been consumed.
func (r *byteReader) BufferSize() int {
	size := len(r.buf)
	return size
}
113113

114+
// Buffered reports how many bytes of r.buf lie at or after the current
// position r.pos, i.e. len(r.buf) minus r.pos.
func (r *byteReader) Buffered() int {
	remaining := len(r.buf) - r.pos
	return remaining
}
115+
114116
// Free is a no-op for byteReader: the method body is empty.
func (r *byteReader) Free() {}
115117

116118
// bytesBufferReader is a byte slice with a bytes reader wrapped around it.

parquet/file/page_reader.go

Lines changed: 57 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,8 @@ type serializedPageReader struct {
374374
dataPageBuffer *memory.Buffer
375375
dictPageBuffer *memory.Buffer
376376
err error
377+
378+
isCompressed bool
377379
}
378380

379381
func (p *serializedPageReader) Close() error {
@@ -402,6 +404,7 @@ func (p *serializedPageReader) init(compressType compress.Compression, ctx *Cryp
402404
return err
403405
}
404406
p.codec = codec
407+
p.isCompressed = compressType != compress.Codecs.Uncompressed
405408

406409
if ctx != nil {
407410
p.cryptoCtx = *ctx
@@ -444,6 +447,7 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.
444447
dictPageBuffer: memory.NewResizableBuffer(mem),
445448
}
446449
rdr.decompressBuffer.ResizeNoShrink(defaultPageHeaderSize)
450+
rdr.isCompressed = compressType != compress.Codecs.Uncompressed
447451
if ctx != nil {
448452
rdr.cryptoCtx = *ctx
449453
rdr.initDecryption()
@@ -460,6 +464,8 @@ func (p *serializedPageReader) Reset(r parquet.BufferedReader, nrows int64, comp
460464
if p.err != nil {
461465
return
462466
}
467+
p.isCompressed = compressType != compress.Codecs.Uncompressed
468+
463469
if ctx != nil {
464470
p.cryptoCtx = *ctx
465471
p.initDecryption()
@@ -502,6 +508,36 @@ func (p *serializedPageReader) Page() Page {
502508
return p.curPage
503509
}
504510

511+
func (p *serializedPageReader) stealFromBuffer(br parquet.BufferedReader, lenUncompressed int) ([]byte, error) {
512+
data, err := br.Peek(lenUncompressed)
513+
if err != nil {
514+
return nil, err
515+
}
516+
if p.cryptoCtx.DataDecryptor != nil {
517+
data = p.cryptoCtx.DataDecryptor.Decrypt(data)
518+
}
519+
// advance the reader
520+
_, err = br.Discard(lenUncompressed)
521+
if err != nil && err != io.EOF {
522+
return nil, err
523+
}
524+
return data, nil
525+
}
526+
527+
func (p *serializedPageReader) readUncompressed(br parquet.BufferedReader, lenUncompressed int, buf []byte) ([]byte, error) {
528+
n, err := io.ReadFull(br, buf[:lenUncompressed])
529+
if err != nil {
530+
return nil, err
531+
}
532+
if n != lenUncompressed {
533+
return nil, fmt.Errorf("parquet: expected to read %d bytes but only read %d", lenUncompressed, n)
534+
}
535+
if p.cryptoCtx.DataDecryptor != nil {
536+
buf = p.cryptoCtx.DataDecryptor.Decrypt(buf)
537+
}
538+
return buf, nil
539+
}
540+
505541
func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf []byte) ([]byte, error) {
506542
p.decompressBuffer.ResizeNoShrink(lenCompressed)
507543
data := p.decompressBuffer.Bytes()
@@ -627,20 +663,17 @@ func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) {
627663
return nil, errors.New("parquet: invalid page header (negative number of values)")
628664
}
629665

630-
p.dictPageBuffer.ResizeNoShrink(lenUncompressed)
631-
buf := memory.NewBufferBytes(p.dictPageBuffer.Bytes())
632-
633-
data, err := p.decompress(rd, lenCompressed, buf.Bytes())
666+
data, err := p.getPageBytes(rd, p.isCompressed, lenCompressed, lenUncompressed, p.dictPageBuffer)
634667
if err != nil {
635-
return nil, err
668+
return nil, fmt.Errorf("parquet: could not read dictionary page data: %w", err)
636669
}
637670
if len(data) != lenUncompressed {
638671
return nil, fmt.Errorf("parquet: metadata said %d bytes uncompressed dictionary page, got %d bytes", lenUncompressed, len(data))
639672
}
640673

641674
return &DictionaryPage{
642675
page: page{
643-
buf: buf,
676+
buf: memory.NewBufferBytes(data),
644677
typ: hdr.Type,
645678
nvals: dictHeader.GetNumValues(),
646679
encoding: dictHeader.GetEncoding(),
@@ -736,6 +769,20 @@ func (p *serializedPageReader) SeekToPageWithRow(rowIdx int64) error {
736769
return p.err
737770
}
738771

772+
func (p *serializedPageReader) getPageBytes(
773+
r parquet.BufferedReader, isCompressed bool, lenCompressed, lenUncompressed int, buffer *memory.Buffer,
774+
) ([]byte, error) {
775+
if isCompressed {
776+
buffer.ResizeNoShrink(lenUncompressed)
777+
return p.decompress(r, lenCompressed, buffer.Bytes())
778+
}
779+
if r.Buffered() >= lenCompressed {
780+
return p.stealFromBuffer(r, lenCompressed)
781+
}
782+
buffer.ResizeNoShrink(lenUncompressed)
783+
return p.readUncompressed(r, lenCompressed, buffer.Bytes())
784+
}
785+
739786
func (p *serializedPageReader) Next() bool {
740787
// Loop here because there may be unhandled page types that we skip until
741788
// finding a page that we do know what to do with
@@ -775,10 +822,7 @@ func (p *serializedPageReader) Next() bool {
775822
return false
776823
}
777824

778-
p.dictPageBuffer.ResizeNoShrink(lenUncompressed)
779-
buf := memory.NewBufferBytes(p.dictPageBuffer.Bytes())
780-
781-
data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
825+
data, err := p.getPageBytes(p.r, p.isCompressed, lenCompressed, lenUncompressed, p.dictPageBuffer)
782826
if err != nil {
783827
p.err = err
784828
return false
@@ -791,7 +835,7 @@ func (p *serializedPageReader) Next() bool {
791835
// make dictionary page
792836
p.curPage = &DictionaryPage{
793837
page: page{
794-
buf: buf,
838+
buf: memory.NewBufferBytes(data),
795839
typ: p.curPageHdr.Type,
796840
nvals: dictHeader.GetNumValues(),
797841
encoding: dictHeader.GetEncoding(),
@@ -807,13 +851,10 @@ func (p *serializedPageReader) Next() bool {
807851
return false
808852
}
809853

810-
p.dataPageBuffer.ResizeNoShrink(lenUncompressed)
811-
buf := memory.NewBufferBytes(p.dataPageBuffer.Bytes())
812-
813854
firstRowIdx := p.rowsSeen
814855
p.rowsSeen += int64(dataHeader.GetNumValues())
815856

816-
data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
857+
data, err := p.getPageBytes(p.r, p.isCompressed, lenCompressed, lenUncompressed, p.dataPageBuffer)
817858
if err != nil {
818859
p.err = err
819860
return false
@@ -826,7 +867,7 @@ func (p *serializedPageReader) Next() bool {
826867
// make datapagev1
827868
p.curPage = &DataPageV1{
828869
page: page{
829-
buf: buf,
870+
buf: memory.NewBufferBytes(data),
830871
typ: p.curPageHdr.Type,
831872
nvals: dataHeader.GetNumValues(),
832873
encoding: dataHeader.GetEncoding(),

parquet/reader_properties.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ type BufferedReader interface {
5050
Peek(int) ([]byte, error)
5151
Discard(int) (int, error)
5252
Outer() utils.Reader
53+
// Buffered returns the number of bytes already read and stored in the buffer
54+
Buffered() int
5355
BufferSize() int
5456
Reset(utils.Reader)
5557
Free()

0 commit comments

Comments
 (0)