@@ -33,7 +33,9 @@ import (
3333 "github.com/apache/arrow-go/v18/arrow/memory"
3434 "github.com/apache/arrow-go/v18/internal/utils"
3535 "github.com/apache/arrow-go/v18/parquet"
36+ "github.com/apache/arrow-go/v18/parquet/compress"
3637 "github.com/apache/arrow-go/v18/parquet/file"
38+ "github.com/apache/arrow-go/v18/parquet/internal/encryption"
3739 "github.com/apache/arrow-go/v18/parquet/internal/testutils"
3840 "github.com/apache/arrow-go/v18/parquet/pqarrow"
3941 "github.com/apache/arrow-go/v18/parquet/schema"
@@ -42,6 +44,17 @@ import (
4244 "github.com/stretchr/testify/suite"
4345)
4446
// Key material and key IDs shared by the column-decryption tests below.
// Each key is exactly 16 bytes (AES-128); the short string IDs are the
// metadata labels a StringKeyIDRetriever uses to look the keys back up
// when reading an encrypted file.
const (
	FooterEncryptionKey  = "0123456789012345" // encrypts the file footer
	ColumnEncryptionKey1 = "1234567890123450"
	ColumnEncryptionKey2 = "1234567890123451"
	ColumnEncryptionKey3 = "1234567890123452"
	FooterEncryptionKeyID  = "kf"
	ColumnEncryptionKey1ID = "kc1"
	ColumnEncryptionKey2ID = "kc2"
	ColumnEncryptionKey3ID = "kc3"
)
57+
4558func initValues (values reflect.Value ) {
4659 if values .Kind () != reflect .Slice {
4760 panic ("must init values with slice" )
@@ -814,6 +827,145 @@ func TestFullSeekRow(t *testing.T) {
814827 }
815828}
816829
830+ func checkDecryptedValues (t * testing.T , writerProps * parquet.WriterProperties , readProps * parquet.ReaderProperties ) {
831+ sc := arrow .NewSchema ([]arrow.Field {
832+ {Name : "c0" , Type : arrow .PrimitiveTypes .Int64 , Nullable : true },
833+ {Name : "c1" , Type : arrow .BinaryTypes .String , Nullable : true },
834+ {Name : "c2" , Type : arrow .ListOf (arrow .PrimitiveTypes .Int64 ), Nullable : true },
835+ }, nil )
836+
837+ tbl , err := array .TableFromJSON (mem , sc , []string {`[
838+ {"c0": 1, "c1": "a", "c2": [1]},
839+ {"c0": 2, "c1": "b", "c2": [1, 2]},
840+ {"c0": 3, "c1": "c", "c2": [null]},
841+ {"c0": null, "c1": "d", "c2": []},
842+ {"c0": 5, "c1": null, "c2": [3, 3, 3]},
843+ {"c0": 6, "c1": "f", "c2": null}
844+ ]` })
845+ require .NoError (t , err )
846+ defer tbl .Release ()
847+
848+ schema := tbl .Schema ()
849+ arrWriterProps := pqarrow .NewArrowWriterProperties ()
850+
851+ var buf bytes.Buffer
852+ wr , err := pqarrow .NewFileWriter (schema , & buf , writerProps , arrWriterProps )
853+ require .NoError (t , err )
854+
855+ require .NoError (t , wr .WriteTable (tbl , tbl .NumRows ()))
856+ require .NoError (t , wr .Close ())
857+
858+ rdr , err := file .NewParquetReader (bytes .NewReader (buf .Bytes ()), file .WithReadProps (readProps ))
859+ require .NoError (t , err )
860+ defer rdr .Close ()
861+
862+ rgr := rdr .RowGroup (0 )
863+ col0 , err := rgr .Column (0 )
864+ require .NoError (t , err )
865+
866+ icr := col0 .(* file.Int64ColumnChunkReader )
867+ // require.NoError(t, icr.SeekToRow(3)) // TODO: this causes a panic currently
868+
869+ vals := make ([]int64 , 6 )
870+ defLvls := make ([]int16 , 6 )
871+ repLvls := make ([]int16 , 6 )
872+
873+ totalLvls , read , err := icr .ReadBatch (6 , vals , defLvls , repLvls )
874+ require .NoError (t , err )
875+ assert .EqualValues (t , 6 , totalLvls )
876+ assert .EqualValues (t , 5 , read )
877+ assert .Equal (t , []int64 {1 , 2 , 3 , 5 , 6 }, vals [:read ])
878+ assert .Equal (t , []int16 {1 , 1 , 1 , 0 , 1 , 1 }, defLvls [:totalLvls ])
879+ assert .Equal (t , []int16 {0 , 0 , 0 , 0 , 0 , 0 }, repLvls [:totalLvls ])
880+
881+ col1 , err := rgr .Column (1 )
882+ require .NoError (t , err )
883+
884+ scr := col1 .(* file.ByteArrayColumnChunkReader )
885+
886+ bavals := make ([]parquet.ByteArray , 6 )
887+ badefLvls := make ([]int16 , 6 )
888+ barepLvls := make ([]int16 , 6 )
889+
890+ totalLvls , read , err = scr .ReadBatch (6 , bavals , badefLvls , barepLvls )
891+ require .NoError (t , err )
892+ assert .EqualValues (t , 6 , totalLvls )
893+ assert .EqualValues (t , 5 , read )
894+ expectedBAs := []parquet.ByteArray {
895+ []byte ("a" ),
896+ []byte ("b" ),
897+ []byte ("c" ),
898+ []byte ("d" ),
899+ []byte ("f" ),
900+ }
901+ assert .Equal (t , expectedBAs , bavals [:read ])
902+ assert .Equal (t , []int16 {1 , 1 , 1 , 1 , 0 , 1 }, badefLvls [:totalLvls ])
903+ assert .Equal (t , []int16 {0 , 0 , 0 , 0 , 0 , 0 }, barepLvls [:totalLvls ])
904+
905+ col2 , err := rgr .Column (2 )
906+ require .NoError (t , err )
907+
908+ lcr := col2 .(* file.Int64ColumnChunkReader )
909+ vals = make ([]int64 , 10 )
910+ defLvls = make ([]int16 , 10 )
911+ repLvls = make ([]int16 , 10 )
912+ totalLvls , read , err = lcr .ReadBatch (6 , vals , defLvls , repLvls )
913+ require .NoError (t , err )
914+
915+ assert .EqualValues (t , 6 , totalLvls )
916+ assert .EqualValues (t , 4 , read )
917+
918+ assert .Equal (t , []int64 {1 , 1 , 2 , 3 }, vals [:read ])
919+ assert .Equal (t , []int16 {3 , 3 , 3 , 2 , 1 , 3 }, defLvls [:totalLvls ])
920+ assert .Equal (t , []int16 {0 , 0 , 1 , 0 , 0 , 0 }, repLvls [:totalLvls ])
921+ }
922+
923+ func TestDecryptColumns (t * testing.T ) {
924+ encryptCols := make (parquet.ColumnPathToEncryptionPropsMap )
925+ encryptCols ["c0" ] = parquet .NewColumnEncryptionProperties ("c0" , parquet .WithKey (ColumnEncryptionKey1 ), parquet .WithKeyID (ColumnEncryptionKey1ID ))
926+ encryptCols ["c1" ] = parquet .NewColumnEncryptionProperties ("c1" , parquet .WithKey (ColumnEncryptionKey2 ), parquet .WithKeyID (ColumnEncryptionKey2ID ))
927+ encryptCols ["c2.list.element" ] = parquet .NewColumnEncryptionProperties ("c2.list.element" , parquet .WithKey (ColumnEncryptionKey3 ), parquet .WithKeyID (ColumnEncryptionKey3ID ))
928+ encryptProps := parquet .NewFileEncryptionProperties (FooterEncryptionKey , parquet .WithFooterKeyMetadata (FooterEncryptionKeyID ),
929+ parquet .WithEncryptedColumns (encryptCols ), parquet .WithAlg (parquet .AesCtr ))
930+
931+ stringKr1 := make (encryption.StringKeyIDRetriever )
932+ stringKr1 .PutKey (FooterEncryptionKeyID , FooterEncryptionKey )
933+ stringKr1 .PutKey (ColumnEncryptionKey1ID , ColumnEncryptionKey1 )
934+ stringKr1 .PutKey (ColumnEncryptionKey2ID , ColumnEncryptionKey2 )
935+ stringKr1 .PutKey (ColumnEncryptionKey3ID , ColumnEncryptionKey3 )
936+ decryptProps := parquet .NewFileDecryptionProperties (parquet .WithKeyRetriever (stringKr1 ))
937+
938+ tests := []struct {
939+ name string
940+ dataPageVersion parquet.DataPageVersion
941+ bufferedStream bool
942+ compression compress.Compression
943+ }{
944+ {"DataPageV2_BufferedRead" , parquet .DataPageV2 , true , compress .Codecs .Uncompressed },
945+ {"DataPageV2_DirectRead" , parquet .DataPageV2 , false , compress .Codecs .Uncompressed },
946+ {"DataPageV2_BufferedRead_Compressed" , parquet .DataPageV2 , true , compress .Codecs .Snappy },
947+ {"DataPageV2_DirectRead_Compressed" , parquet .DataPageV2 , false , compress .Codecs .Snappy },
948+ // {"DataPageV1_BufferedRead", parquet.DataPageV1, true, compress.Codecs.Uncompressed},
949+ // {"DataPageV1_DirectRead", parquet.DataPageV1, false, compress.Codecs.Uncompressed},
950+ // {"DataPageV1_BufferedRead_Compressed", parquet.DataPageV1, true, compress.Codecs.Snappy},
951+ // {"DataPageV1_DirectRead_Compressed", parquet.DataPageV1, false, compress.Codecs.Snappy},
952+ }
953+
954+ for _ , tt := range tests {
955+ t .Run (tt .name , func (t * testing.T ) {
956+ writerProps := parquet .NewWriterProperties (
957+ parquet .WithDataPageVersion (tt .dataPageVersion ),
958+ parquet .WithEncryptionProperties (encryptProps .Clone ("" )),
959+ parquet .WithCompression (tt .compression ),
960+ )
961+ readProps := parquet .NewReaderProperties (nil )
962+ readProps .FileDecryptProps = decryptProps .Clone ("" )
963+ readProps .BufferedStreamEnabled = tt .bufferedStream
964+ checkDecryptedValues (t , writerProps , readProps )
965+ })
966+ }
967+ }
968+
817969func BenchmarkReadInt32Column (b * testing.B ) {
818970 // generate parquet with RLE-dictionary encoded int32 column
819971 tempdir := b .TempDir ()
0 commit comments