Skip to content

Commit 361041b

Browse files
Correctly unencrypt V2 data pages
1 parent 38b8fac commit 361041b

30 files changed

Lines changed: 519 additions & 136 deletions

parquet/file/file_reader_test.go

Lines changed: 324 additions & 109 deletions
Large diffs are not rendered by default.

parquet/file/page_reader.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,36 @@ func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf [
515515
return p.codec.Decode(buf, data), nil
516516
}
517517

518+
func (p *serializedPageReader) decompressV2(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) ([]byte, error) {
519+
// if encrypted, we need to decrypt before decompressing
520+
p.decompressBuffer.ResizeNoShrink(lenCompressed)
521+
b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
522+
if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil {
523+
return nil, err
524+
}
525+
data := p.decompressBuffer.Bytes()
526+
if p.cryptoCtx.DataDecryptor != nil {
527+
data = p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes())
528+
// encrypted + uncompressed -> just copy the decrypted data to output buffer
529+
if !compressed {
530+
copy(buf, data)
531+
return buf, nil
532+
}
533+
} else {
534+
// unencryped + uncompressed -> nothing to be done, data is already read into the buffer
535+
if !compressed {
536+
return buf, nil
537+
}
538+
}
539+
540+
// definition + repetition levels are always uncompressed
541+
if levelsBytelen > 0 {
542+
copy(buf, data[:levelsBytelen])
543+
data = data[levelsBytelen:]
544+
}
545+
return p.codec.Decode(buf[levelsBytelen:], data), nil
546+
}
547+
518548
type dataheader interface {
519549
IsSetStatistics() bool
520550
GetStatistics() *format.Statistics
@@ -628,7 +658,6 @@ func (p *serializedPageReader) readPageHeader(rd parquet.BufferedReader, hdr *fo
628658
}
629659
continue
630660
}
631-
632661
rd.Discard(len(view) - int(remaining) + extra)
633662
break
634663
}
@@ -812,15 +841,9 @@ func (p *serializedPageReader) Next() bool {
812841
return false
813842
}
814843

815-
if compressed {
816-
if levelsBytelen > 0 {
817-
io.ReadFull(p.r, buf.Bytes()[:levelsBytelen])
818-
}
819-
if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
820-
return false
821-
}
822-
} else {
823-
io.ReadFull(p.r, buf.Bytes())
844+
if _, err := p.decompressV2(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil {
845+
p.err = err
846+
return false
824847
}
825848

826849
if buf.Len() != lenUncompressed {
2.61 KB
Binary file not shown.
2.6 KB
Binary file not shown.
2.61 KB
Binary file not shown.
2.61 KB
Binary file not shown.
2.48 KB
Binary file not shown.
2.47 KB
Binary file not shown.
2.49 KB
Binary file not shown.
2.48 KB
Binary file not shown.

0 commit comments

Comments
 (0)