Skip to content

Commit 1c2dfea

Browse files
fix: correct decryption of V2 data pages
Extract separate decryptV2 and decompressV2 methods to properly handle decryption and decompression of V2 data pages. Previously, V2 encrypted pages were not being decrypted before decompression, leading to failures. The fix ensures that: - Encrypted V2 pages are decrypted before decompression - Uncompressed encrypted V2 pages are properly handled - Level bytes (definition/repetition) are correctly preserved in both paths Expand encryption test coverage to validate the fix across: - Multiple compression types (snappy, uncompressed) - Both data page versions (V1, V2) - All encryption configurations (uniform, column-only, column+footer)
1 parent 4acb3d1 commit 1c2dfea

5 files changed

Lines changed: 141 additions & 53 deletions

File tree

parquet/encryption_read_config_test.go

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,7 @@ func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum int)
295295
// Read all rows in column
296296
i = 0
297297
for int96reader.HasNext() {
298-
var (
299-
val [1]parquet.Int96
300-
)
298+
var val [1]parquet.Int96
301299

302300
// read one value at a time. the number of rows read is returned. values
303301
// read contains the number of non-null rows
@@ -553,15 +551,34 @@ func (d *TestDecryptionSuite) checkResults(fileName string, decryptionConfig, en
553551
// once the file is read and the second exists in parquet-testing/data folder
554552
func (d *TestDecryptionSuite) TestDecryption() {
555553
tests := []struct {
556-
file string
557-
config uint
554+
file string
555+
config uint
556+
isInDataStorage bool
558557
}{
559-
{"uniform_encryption.parquet.encrypted", 1},
560-
{"encrypt_columns_and_footer.parquet.encrypted", 2},
561-
{"encrypt_columns_plaintext_footer.parquet.encrypted", 3},
562-
{"encrypt_columns_and_footer_aad.parquet.encrypted", 4},
563-
{"encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted", 5},
564-
{"encrypt_columns_and_footer_ctr.parquet.encrypted", 6},
558+
{"uniform_encryption.parquet.encrypted", 1, true},
559+
{"uniform_encryption.parquet.uncompressed.encrypted", 1, false},
560+
{"uniform_encryption.parquet.v2.encrypted", 1, false},
561+
{"uniform_encryption.parquet.v2.uncompressed.encrypted", 1, false},
562+
{"encrypt_columns_and_footer.parquet.encrypted", 2, true},
563+
{"encrypt_columns_and_footer.parquet.uncompressed.encrypted", 2, false},
564+
{"encrypt_columns_and_footer.parquet.v2.encrypted", 2, false},
565+
{"encrypt_columns_and_footer.parquet.v2.uncompressed.encrypted", 2, false},
566+
{"encrypt_columns_plaintext_footer.parquet.encrypted", 3, true},
567+
{"encrypt_columns_plaintext_footer.parquet.uncompressed.encrypted", 3, false},
568+
{"encrypt_columns_plaintext_footer.parquet.v2.encrypted", 3, false},
569+
{"encrypt_columns_plaintext_footer.parquet.v2.uncompressed.encrypted", 3, false},
570+
{"encrypt_columns_and_footer_aad.parquet.encrypted", 4, true},
571+
{"encrypt_columns_and_footer_aad.parquet.uncompressed.encrypted", 4, false},
572+
{"encrypt_columns_and_footer_aad.parquet.v2.encrypted", 4, false},
573+
{"encrypt_columns_and_footer_aad.parquet.v2.uncompressed.encrypted", 4, false},
574+
{"encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted", 5, true},
575+
{"encrypt_columns_and_footer_disable_aad_storage.parquet.uncompressed.encrypted", 5, false},
576+
{"encrypt_columns_and_footer_disable_aad_storage.parquet.v2.encrypted", 5, false},
577+
{"encrypt_columns_and_footer_disable_aad_storage.parquet.v2.uncompressed.encrypted", 5, false},
578+
{"encrypt_columns_and_footer_ctr.parquet.encrypted", 6, true},
579+
{"encrypt_columns_and_footer_ctr.parquet.uncompressed.encrypted", 6, false},
580+
{"encrypt_columns_and_footer_ctr.parquet.v2.encrypted", 6, false},
581+
{"encrypt_columns_and_footer_ctr.parquet.v2.uncompressed.encrypted", 6, false},
565582
}
566583
for _, tt := range tests {
567584
d.Run(tt.file, func() {
@@ -576,14 +593,16 @@ func (d *TestDecryptionSuite) TestDecryption() {
576593
}
577594
os.Remove(tmpFile)
578595

579-
file := path.Join(getDataDir(), tt.file)
580-
d.Require().FileExists(file)
596+
if tt.isInDataStorage {
597+
file := path.Join(getDataDir(), tt.file)
598+
d.Require().FileExists(file)
581599

582-
for idx := range d.decryptionConfigs {
583-
decConfig := idx + 1
584-
d.Run(fmt.Sprintf("config %d", decConfig), func() {
585-
d.checkResults(file, uint(decConfig), tt.config)
586-
})
600+
for idx := range d.decryptionConfigs {
601+
decConfig := idx + 1
602+
d.Run(fmt.Sprintf("config %d", decConfig), func() {
603+
d.checkResults(file, uint(decConfig), tt.config)
604+
})
605+
}
587606
}
588607
})
589608
}

parquet/encryption_write_config_test.go

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ import (
6161
* keys. Use the alternative (AES_GCM_CTR_V1) algorithm.
6262
*/
6363

64-
var (
65-
tempdir string
66-
)
64+
var tempdir string
6765

6866
type EncryptionConfigTestSuite struct {
6967
suite.Suite
@@ -79,13 +77,16 @@ type EncryptionConfigTestSuite struct {
7977
columnEncryptionKey2 string
8078
}
8179

82-
func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryptionProperties, filename string) {
80+
func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryptionProperties, filename string, writerOpts ...parquet.WriterProperty) {
8381
filename = filepath.Join(tempdir, filename)
8482

85-
props := parquet.NewWriterProperties(
83+
opts := []parquet.WriterProperty{
8684
parquet.WithPageIndexEnabled(true),
8785
parquet.WithCompression(compress.Codecs.Snappy),
88-
parquet.WithEncryptionProperties(configs))
86+
parquet.WithEncryptionProperties(configs),
87+
}
88+
opts = append(opts, writerOpts...)
89+
props := parquet.NewWriterProperties(opts...)
8990
outFile, err := os.Create(filename)
9091
en.Require().NoError(err)
9192
en.Require().NotNil(outFile)
@@ -135,20 +136,18 @@ func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryption
135136

136137
// write the int64 column, each row repeats twice
137138
int64Writer := nextColumn().(*file.Int64ColumnChunkWriter)
138-
for i := 0; i < 2*en.rowsPerRG; i++ {
139+
for i := 0; i < en.rowsPerRG; i++ {
139140
var (
140-
defLevel = [1]int16{1}
141-
repLevel = [1]int16{0}
142-
value = int64(i) * 1000 * 1000 * 1000 * 1000
141+
defLevels = []int16{1, 1}
142+
repLevels = []int16{0, 1}
143+
values = []int64{
144+
int64(i*2) * 1000 * 1000 * 1000 * 1000,
145+
int64(i*2+1) * 1000 * 1000 * 1000 * 1000,
146+
}
143147
)
144-
if i%2 == 0 {
145-
repLevel[0] = 0
146-
} else {
147-
repLevel[0] = 1
148-
}
149148

150-
n, err := int64Writer.WriteBatch([]int64{value}, defLevel[:], repLevel[:])
151-
en.EqualValues(1, n)
149+
n, err := int64Writer.WriteBatch(values, defLevels, repLevels)
150+
en.EqualValues(2, n)
152151
en.Require().NoError(err)
153152
}
154153

@@ -263,7 +262,11 @@ func (en *EncryptionConfigTestSuite) SetupSuite() {
263262
// (uniform encryption)
264263
func (en *EncryptionConfigTestSuite) TestUniformEncryption() {
265264
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"))
266-
en.encryptFile(props, "tmp_uniform_encryption.parquet.encrypted")
265+
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.encrypted")
266+
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
267+
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
268+
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
269+
parquet.WithDataPageVersion(parquet.DataPageV2))
267270
}
268271

269272
// Encryption config 2: Encrypt Two Columns and the Footer, with different keys
@@ -273,7 +276,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooter() {
273276
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
274277

275278
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols))
276-
en.encryptFile(props, "tmp_encrypt_columns_and_footer.parquet.encrypted")
279+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.encrypted")
280+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
281+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
282+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
283+
parquet.WithDataPageVersion(parquet.DataPageV2))
277284
}
278285

279286
// Encryption Config 3: encrypt two columns, with different keys.
@@ -285,7 +292,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsPlaintextFooter() {
285292
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
286293

287294
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithPlaintextFooter())
288-
en.encryptFile(props, "tmp_encrypt_columns_plaintext_footer.parquet.encrypted")
295+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.encrypted")
296+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
297+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
298+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
299+
parquet.WithDataPageVersion(parquet.DataPageV2))
289300
}
290301

291302
// Encryption Config 4: Encrypt two columns and the footer, with different keys
@@ -296,7 +307,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterWithAadPrefix
296307
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
297308

298309
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithAadPrefix(en.fileName))
299-
en.encryptFile(props, "tmp_encrypt_columns_and_footer_aad.parquet.encrypted")
310+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.encrypted")
311+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
312+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
313+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
314+
parquet.WithDataPageVersion(parquet.DataPageV2))
300315
}
301316

302317
// Encryption Config 5: Encrypt Two columns and the footer, with different keys
@@ -307,7 +322,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterWithAadPrefix
307322
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
308323

309324
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithAadPrefix(en.fileName), parquet.DisableAadPrefixStorage())
310-
en.encryptFile(props, "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted")
325+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted")
326+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
327+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
328+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
329+
parquet.WithDataPageVersion(parquet.DataPageV2))
311330
}
312331

313332
// Encryption Config 6: Encrypt two columns and the footer, with different keys.
@@ -318,7 +337,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterAesGcmCtr() {
318337
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
319338

320339
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithAlg(parquet.AesCtr))
321-
en.encryptFile(props, "tmp_encrypt_columns_and_footer_ctr.parquet.encrypted")
340+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.encrypted")
341+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
342+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
343+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
344+
parquet.WithDataPageVersion(parquet.DataPageV2))
322345
}
323346

324347
func TestFileEncryption(t *testing.T) {

parquet/file/column_reader_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -551,9 +551,11 @@ func (p *PrimitiveReaderSuite) TestRepetitionLvlBytesWithMaxRepZero() {
551551
// bytes: the page header reports 1 byte for repetition levels even
552552
// though the max rep level is 0. If that byte isn't skipped then
553553
// we get def levels of [1, 1, 0, 0] instead of the correct [1, 1, 1, 0].
554-
pageData := [...]byte{0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3,
554+
pageData := [...]byte{
555+
0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3,
555556
0x18, 0x1, 0x2, 0x0, 0x0, 0x0, 0xc,
556-
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
557+
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
558+
}
557559

558560
p.pages = append(p.pages, file.NewDataPageV2(memory.NewBufferBytes(pageData[:]), batchSize, 1, batchSize,
559561
parquet.Encodings.DeltaBinaryPacked, 2, 1, int32(len(pageData)), false))
@@ -733,7 +735,6 @@ func TestFullSeekRow(t *testing.T) {
733735

734736
for _, dataPageVersion := range []parquet.DataPageVersion{parquet.DataPageV2, parquet.DataPageV1} {
735737
t.Run(fmt.Sprintf("DataPageVersion=%v", dataPageVersion+1), func(t *testing.T) {
736-
737738
props := parquet.NewWriterProperties(parquet.WithAllocator(mem),
738739
parquet.WithDataPageVersion(dataPageVersion), parquet.WithDataPageSize(1),
739740
parquet.WithPageIndexEnabled(true))

parquet/file/file_reader_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func (p *PageSerdeSuite) TestDataPageV2() {
207207

208208
p.dataPageHdrV2.Statistics = getDummyStats(statsSize, true)
209209
p.dataPageHdrV2.NumValues = nrows
210-
p.WriteDataPageHeaderV2(1024, 20, 10)
210+
p.WriteDataPageHeaderV2(1024, 0, 0)
211211
p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
212212
p.True(p.pageReader.Next())
213213
p.CheckDataPageHeaderV2(p.dataPageHdrV2, p.pageReader.Page())
@@ -310,7 +310,8 @@ func (p *PageSerdeSuite) TestCompression() {
310310

311311
func TestWithEOFReader(t *testing.T) {
312312
root, _ := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, schema.FieldList{
313-
schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1)}, -1)
313+
schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1),
314+
}, -1)
314315
props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_LATEST))
315316

316317
var buf bytes.Buffer

parquet/file/page_reader.go

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,50 @@ func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf [
515515
return p.codec.Decode(buf, data), nil
516516
}
517517

518+
func (p *serializedPageReader) decryptV2(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error {
519+
// if encrypted, we need to decrypt before decompressing
520+
p.decompressBuffer.ResizeNoShrink(lenCompressed)
521+
b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
522+
if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil {
523+
return err
524+
}
525+
data := p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes())
526+
// encrypted + uncompressed -> just copy the decrypted data to output buffer
527+
if !compressed {
528+
copy(buf, data)
529+
return nil
530+
}
531+
532+
// definition + repetition levels are always uncompressed
533+
if levelsBytelen > 0 {
534+
copy(buf, data[:levelsBytelen])
535+
data = data[levelsBytelen:]
536+
}
537+
p.codec.Decode(buf[levelsBytelen:], data)
538+
return nil
539+
}
540+
541+
func (p *serializedPageReader) decompressV2(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error {
542+
if !compressed {
543+
// uncompressed, just read into the buffer
544+
if _, err := io.ReadFull(rd, buf); err != nil {
545+
return err
546+
}
547+
return nil
548+
}
549+
550+
// definition + repetition levels are always uncompressed
551+
if levelsBytelen > 0 {
552+
if _, err := io.ReadFull(rd, buf[:levelsBytelen]); err != nil {
553+
return err
554+
}
555+
}
556+
if _, err := p.decompress(p.r, lenCompressed-levelsBytelen, buf[levelsBytelen:]); err != nil {
557+
return err
558+
}
559+
return nil
560+
}
561+
518562
type dataheader interface {
519563
IsSetStatistics() bool
520564
GetStatistics() *format.Statistics
@@ -628,7 +672,6 @@ func (p *serializedPageReader) readPageHeader(rd parquet.BufferedReader, hdr *fo
628672
}
629673
continue
630674
}
631-
632675
rd.Discard(len(view) - int(remaining) + extra)
633676
break
634677
}
@@ -812,15 +855,16 @@ func (p *serializedPageReader) Next() bool {
812855
return false
813856
}
814857

815-
if compressed {
816-
if levelsBytelen > 0 {
817-
io.ReadFull(p.r, buf.Bytes()[:levelsBytelen])
818-
}
819-
if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
858+
if p.cryptoCtx.DataDecryptor != nil {
859+
if err := p.decryptV2(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil {
860+
p.err = err
820861
return false
821862
}
822863
} else {
823-
io.ReadFull(p.r, buf.Bytes())
864+
if err := p.decompressV2(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil {
865+
p.err = err
866+
return false
867+
}
824868
}
825869

826870
if buf.Len() != lenUncompressed {

0 commit comments

Comments
 (0)