Skip to content

Commit 38dc64b

Browse files
fix(parquet): decryption of V2 data pages (#596)
### Rationale for this change Fixing the decryption of V2 pages. ### What changes are included in this PR? This PR fixes the decryption of V2 pages. ### Are these changes tested? Yes, tests cover various scenarios for both V1 and V2 page encryption and decryption. ### Are there any user-facing changes? No.
1 parent 2b7d6c4 commit 38dc64b

5 files changed

Lines changed: 141 additions & 53 deletions

File tree

parquet/encryption_read_config_test.go

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,7 @@ func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum int)
295295
// Read all rows in column
296296
i = 0
297297
for int96reader.HasNext() {
298-
var (
299-
val [1]parquet.Int96
300-
)
298+
var val [1]parquet.Int96
301299

302300
// read one value at a time. the number of rows read is returned. values
303301
// read contains the number of non-null rows
@@ -553,15 +551,34 @@ func (d *TestDecryptionSuite) checkResults(fileName string, decryptionConfig, en
553551
// once the file is read and the second exists in parquet-testing/data folder
554552
func (d *TestDecryptionSuite) TestDecryption() {
555553
tests := []struct {
556-
file string
557-
config uint
554+
file string
555+
config uint
556+
isInDataStorage bool
558557
}{
559-
{"uniform_encryption.parquet.encrypted", 1},
560-
{"encrypt_columns_and_footer.parquet.encrypted", 2},
561-
{"encrypt_columns_plaintext_footer.parquet.encrypted", 3},
562-
{"encrypt_columns_and_footer_aad.parquet.encrypted", 4},
563-
{"encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted", 5},
564-
{"encrypt_columns_and_footer_ctr.parquet.encrypted", 6},
558+
{"uniform_encryption.parquet.encrypted", 1, true},
559+
{"uniform_encryption.parquet.uncompressed.encrypted", 1, false},
560+
{"uniform_encryption.parquet.v2.encrypted", 1, false},
561+
{"uniform_encryption.parquet.v2.uncompressed.encrypted", 1, false},
562+
{"encrypt_columns_and_footer.parquet.encrypted", 2, true},
563+
{"encrypt_columns_and_footer.parquet.uncompressed.encrypted", 2, false},
564+
{"encrypt_columns_and_footer.parquet.v2.encrypted", 2, false},
565+
{"encrypt_columns_and_footer.parquet.v2.uncompressed.encrypted", 2, false},
566+
{"encrypt_columns_plaintext_footer.parquet.encrypted", 3, true},
567+
{"encrypt_columns_plaintext_footer.parquet.uncompressed.encrypted", 3, false},
568+
{"encrypt_columns_plaintext_footer.parquet.v2.encrypted", 3, false},
569+
{"encrypt_columns_plaintext_footer.parquet.v2.uncompressed.encrypted", 3, false},
570+
{"encrypt_columns_and_footer_aad.parquet.encrypted", 4, true},
571+
{"encrypt_columns_and_footer_aad.parquet.uncompressed.encrypted", 4, false},
572+
{"encrypt_columns_and_footer_aad.parquet.v2.encrypted", 4, false},
573+
{"encrypt_columns_and_footer_aad.parquet.v2.uncompressed.encrypted", 4, false},
574+
{"encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted", 5, true},
575+
{"encrypt_columns_and_footer_disable_aad_storage.parquet.uncompressed.encrypted", 5, false},
576+
{"encrypt_columns_and_footer_disable_aad_storage.parquet.v2.encrypted", 5, false},
577+
{"encrypt_columns_and_footer_disable_aad_storage.parquet.v2.uncompressed.encrypted", 5, false},
578+
{"encrypt_columns_and_footer_ctr.parquet.encrypted", 6, true},
579+
{"encrypt_columns_and_footer_ctr.parquet.uncompressed.encrypted", 6, false},
580+
{"encrypt_columns_and_footer_ctr.parquet.v2.encrypted", 6, false},
581+
{"encrypt_columns_and_footer_ctr.parquet.v2.uncompressed.encrypted", 6, false},
565582
}
566583
for _, tt := range tests {
567584
d.Run(tt.file, func() {
@@ -576,14 +593,16 @@ func (d *TestDecryptionSuite) TestDecryption() {
576593
}
577594
os.Remove(tmpFile)
578595

579-
file := path.Join(getDataDir(), tt.file)
580-
d.Require().FileExists(file)
596+
if tt.isInDataStorage {
597+
file := path.Join(getDataDir(), tt.file)
598+
d.Require().FileExists(file)
581599

582-
for idx := range d.decryptionConfigs {
583-
decConfig := idx + 1
584-
d.Run(fmt.Sprintf("config %d", decConfig), func() {
585-
d.checkResults(file, uint(decConfig), tt.config)
586-
})
600+
for idx := range d.decryptionConfigs {
601+
decConfig := idx + 1
602+
d.Run(fmt.Sprintf("config %d", decConfig), func() {
603+
d.checkResults(file, uint(decConfig), tt.config)
604+
})
605+
}
587606
}
588607
})
589608
}

parquet/encryption_write_config_test.go

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ import (
6161
* keys. Use the alternative (AES_GCM_CTR_V1) algorithm.
6262
*/
6363

64-
var (
65-
tempdir string
66-
)
64+
var tempdir string
6765

6866
type EncryptionConfigTestSuite struct {
6967
suite.Suite
@@ -79,13 +77,16 @@ type EncryptionConfigTestSuite struct {
7977
columnEncryptionKey2 string
8078
}
8179

82-
func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryptionProperties, filename string) {
80+
func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryptionProperties, filename string, writerOpts ...parquet.WriterProperty) {
8381
filename = filepath.Join(tempdir, filename)
8482

85-
props := parquet.NewWriterProperties(
83+
opts := []parquet.WriterProperty{
8684
parquet.WithPageIndexEnabled(true),
8785
parquet.WithCompression(compress.Codecs.Snappy),
88-
parquet.WithEncryptionProperties(configs))
86+
parquet.WithEncryptionProperties(configs),
87+
}
88+
opts = append(opts, writerOpts...)
89+
props := parquet.NewWriterProperties(opts...)
8990
outFile, err := os.Create(filename)
9091
en.Require().NoError(err)
9192
en.Require().NotNil(outFile)
@@ -135,20 +136,18 @@ func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryption
135136

136137
// write the int64 column, each row repeats twice
137138
int64Writer := nextColumn().(*file.Int64ColumnChunkWriter)
138-
for i := 0; i < 2*en.rowsPerRG; i++ {
139+
for i := 0; i < en.rowsPerRG; i++ {
139140
var (
140-
defLevel = [1]int16{1}
141-
repLevel = [1]int16{0}
142-
value = int64(i) * 1000 * 1000 * 1000 * 1000
141+
defLevels = []int16{1, 1}
142+
repLevels = []int16{0, 1}
143+
values = []int64{
144+
int64(i*2) * 1000 * 1000 * 1000 * 1000,
145+
int64(i*2+1) * 1000 * 1000 * 1000 * 1000,
146+
}
143147
)
144-
if i%2 == 0 {
145-
repLevel[0] = 0
146-
} else {
147-
repLevel[0] = 1
148-
}
149148

150-
n, err := int64Writer.WriteBatch([]int64{value}, defLevel[:], repLevel[:])
151-
en.EqualValues(1, n)
149+
n, err := int64Writer.WriteBatch(values, defLevels, repLevels)
150+
en.EqualValues(2, n)
152151
en.Require().NoError(err)
153152
}
154153

@@ -263,7 +262,11 @@ func (en *EncryptionConfigTestSuite) SetupSuite() {
263262
// (uniform encryption)
264263
func (en *EncryptionConfigTestSuite) TestUniformEncryption() {
265264
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"))
266-
en.encryptFile(props, "tmp_uniform_encryption.parquet.encrypted")
265+
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.encrypted")
266+
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
267+
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
268+
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
269+
parquet.WithDataPageVersion(parquet.DataPageV2))
267270
}
268271

269272
// Encryption config 2: Encrypt Two Columns and the Footer, with different keys
@@ -273,7 +276,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooter() {
273276
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
274277

275278
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols))
276-
en.encryptFile(props, "tmp_encrypt_columns_and_footer.parquet.encrypted")
279+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.encrypted")
280+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
281+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
282+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
283+
parquet.WithDataPageVersion(parquet.DataPageV2))
277284
}
278285

279286
// Encryption Config 3: encrypt two columns, with different keys.
@@ -285,7 +292,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsPlaintextFooter() {
285292
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
286293

287294
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithPlaintextFooter())
288-
en.encryptFile(props, "tmp_encrypt_columns_plaintext_footer.parquet.encrypted")
295+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.encrypted")
296+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
297+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
298+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
299+
parquet.WithDataPageVersion(parquet.DataPageV2))
289300
}
290301

291302
// Encryption Config 4: Encrypt two columns and the footer, with different keys
@@ -296,7 +307,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterWithAadPrefix
296307
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
297308

298309
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithAadPrefix(en.fileName))
299-
en.encryptFile(props, "tmp_encrypt_columns_and_footer_aad.parquet.encrypted")
310+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.encrypted")
311+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
312+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
313+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
314+
parquet.WithDataPageVersion(parquet.DataPageV2))
300315
}
301316

302317
// Encryption Config 5: Encrypt Two columns and the footer, with different keys
@@ -307,7 +322,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterWithAadPrefix
307322
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
308323

309324
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithAadPrefix(en.fileName), parquet.DisableAadPrefixStorage())
310-
en.encryptFile(props, "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted")
325+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted")
326+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
327+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
328+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
329+
parquet.WithDataPageVersion(parquet.DataPageV2))
311330
}
312331

313332
// Encryption Config 6: Encrypt two columns and the footer, with different keys.
@@ -318,7 +337,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterAesGcmCtr() {
318337
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
319338

320339
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithAlg(parquet.AesCtr))
321-
en.encryptFile(props, "tmp_encrypt_columns_and_footer_ctr.parquet.encrypted")
340+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.encrypted")
341+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
342+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
343+
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
344+
parquet.WithDataPageVersion(parquet.DataPageV2))
322345
}
323346

324347
func TestFileEncryption(t *testing.T) {

parquet/file/column_reader_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -551,9 +551,11 @@ func (p *PrimitiveReaderSuite) TestRepetitionLvlBytesWithMaxRepZero() {
551551
// bytes: the page header reports 1 byte for repetition levels even
552552
// though the max rep level is 0. If that byte isn't skipped then
553553
// we get def levels of [1, 1, 0, 0] instead of the correct [1, 1, 1, 0].
554-
pageData := [...]byte{0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3,
554+
pageData := [...]byte{
555+
0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3,
555556
0x18, 0x1, 0x2, 0x0, 0x0, 0x0, 0xc,
556-
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
557+
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
558+
}
557559

558560
p.pages = append(p.pages, file.NewDataPageV2(memory.NewBufferBytes(pageData[:]), batchSize, 1, batchSize,
559561
parquet.Encodings.DeltaBinaryPacked, 2, 1, int32(len(pageData)), false))
@@ -733,7 +735,6 @@ func TestFullSeekRow(t *testing.T) {
733735

734736
for _, dataPageVersion := range []parquet.DataPageVersion{parquet.DataPageV2, parquet.DataPageV1} {
735737
t.Run(fmt.Sprintf("DataPageVersion=%v", dataPageVersion+1), func(t *testing.T) {
736-
737738
props := parquet.NewWriterProperties(parquet.WithAllocator(mem),
738739
parquet.WithDataPageVersion(dataPageVersion), parquet.WithDataPageSize(1),
739740
parquet.WithPageIndexEnabled(true))

parquet/file/file_reader_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func (p *PageSerdeSuite) TestDataPageV2() {
207207

208208
p.dataPageHdrV2.Statistics = getDummyStats(statsSize, true)
209209
p.dataPageHdrV2.NumValues = nrows
210-
p.WriteDataPageHeaderV2(1024, 20, 10)
210+
p.WriteDataPageHeaderV2(1024, 0, 0)
211211
p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
212212
p.True(p.pageReader.Next())
213213
p.CheckDataPageHeaderV2(p.dataPageHdrV2, p.pageReader.Page())
@@ -310,7 +310,8 @@ func (p *PageSerdeSuite) TestCompression() {
310310

311311
func TestWithEOFReader(t *testing.T) {
312312
root, _ := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, schema.FieldList{
313-
schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1)}, -1)
313+
schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1),
314+
}, -1)
314315
props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_LATEST))
315316

316317
var buf bytes.Buffer

parquet/file/page_reader.go

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,50 @@ func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf [
515515
return p.codec.Decode(buf, data), nil
516516
}
517517

518+
func (p *serializedPageReader) readV2Encrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error {
519+
// if encrypted, we need to decrypt before decompressing
520+
p.decompressBuffer.ResizeNoShrink(lenCompressed)
521+
b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
522+
if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil {
523+
return err
524+
}
525+
data := p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes())
526+
// encrypted + uncompressed -> just copy the decrypted data to output buffer
527+
if !compressed {
528+
copy(buf, data)
529+
return nil
530+
}
531+
532+
// definition + repetition levels are always uncompressed
533+
if levelsBytelen > 0 {
534+
copy(buf, data[:levelsBytelen])
535+
data = data[levelsBytelen:]
536+
}
537+
p.codec.Decode(buf[levelsBytelen:], data)
538+
return nil
539+
}
540+
541+
func (p *serializedPageReader) readV2Unencrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error {
542+
if !compressed {
543+
// uncompressed, just read into the buffer
544+
if _, err := io.ReadFull(rd, buf); err != nil {
545+
return err
546+
}
547+
return nil
548+
}
549+
550+
// definition + repetition levels are always uncompressed
551+
if levelsBytelen > 0 {
552+
if _, err := io.ReadFull(rd, buf[:levelsBytelen]); err != nil {
553+
return err
554+
}
555+
}
556+
if _, err := p.decompress(p.r, lenCompressed-levelsBytelen, buf[levelsBytelen:]); err != nil {
557+
return err
558+
}
559+
return nil
560+
}
561+
518562
type dataheader interface {
519563
IsSetStatistics() bool
520564
GetStatistics() *format.Statistics
@@ -628,7 +672,6 @@ func (p *serializedPageReader) readPageHeader(rd parquet.BufferedReader, hdr *fo
628672
}
629673
continue
630674
}
631-
632675
rd.Discard(len(view) - int(remaining) + extra)
633676
break
634677
}
@@ -812,15 +855,16 @@ func (p *serializedPageReader) Next() bool {
812855
return false
813856
}
814857

815-
if compressed {
816-
if levelsBytelen > 0 {
817-
io.ReadFull(p.r, buf.Bytes()[:levelsBytelen])
818-
}
819-
if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
858+
if p.cryptoCtx.DataDecryptor != nil {
859+
if err := p.readV2Encrypted(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil {
860+
p.err = err
820861
return false
821862
}
822863
} else {
823-
io.ReadFull(p.r, buf.Bytes())
864+
if err := p.readV2Unencrypted(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil {
865+
p.err = err
866+
return false
867+
}
824868
}
825869

826870
if buf.Len() != lenUncompressed {

0 commit comments

Comments
 (0)