Skip to content

Commit 636a05d

Browse files
perf(parquet): optimize page reader for memory reuse and add comprehensive benchmarks
- Refactor page reader to unify V1/V2 page reading logic with better buffer management - Implement readOrStealData to optimize buffered vs direct reads - Reduce buffer allocations by strategically reusing decompress/data buffers Benchmark infrastructure improvements: - Add TestMain with setup/teardown for shared benchmark files - Create 8 benchmark file variants (V1/V2, plain/snappy, encrypted/unencrypted) - Add BenchmarkReadInt32 and BenchmarkReadInt32Buffered with sub-benchmarks - Move file creation to shared setup to eliminate per-benchmark overhead
1 parent d272032 commit 636a05d

2 files changed

Lines changed: 416 additions & 120 deletions

File tree

parquet/file/column_reader_test.go

Lines changed: 270 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -966,44 +966,213 @@ func TestDecryptColumns(t *testing.T) {
966966
}
967967
}
968968

969-
func BenchmarkReadInt32Column(b *testing.B) {
970-
// generate parquet with RLE-dictionary encoded int32 column
971-
tempdir := b.TempDir()
972-
filepath := filepath.Join(tempdir, "rle-dict-int32.parquet")
969+
var (
970+
benchmarkFilesOnce sync.Once
971+
benchmarkTempDir string
972+
benchmarkFiles = make(map[string]string)
973+
974+
benchmarkKeyRetriever = func() encryption.StringKeyIDRetriever {
975+
stringKr := make(encryption.StringKeyIDRetriever)
976+
stringKr.PutKey(FooterEncryptionKeyID, FooterEncryptionKey)
977+
stringKr.PutKey(ColumnEncryptionKey1ID, ColumnEncryptionKey1)
978+
return stringKr
979+
}
980+
)
981+
982+
func TestMain(m *testing.M) {
983+
// Create benchmark files
984+
err := setupBenchmarkFiles()
985+
if err != nil {
986+
fmt.Fprintf(os.Stderr, "Failed to set up benchmark files: %v\n", err)
987+
os.Exit(1)
988+
}
989+
990+
// Run the tests/benchmarks
991+
code := m.Run()
992+
993+
// Cleanup temp directory after everything is done
994+
if benchmarkTempDir != "" {
995+
os.RemoveAll(benchmarkTempDir)
996+
}
973997

998+
os.Exit(code)
999+
}
1000+
1001+
func setupBenchmarkFiles() error {
1002+
var err error
1003+
benchmarkTempDir, err = os.MkdirTemp("", "parquet-bench-*")
1004+
if err != nil {
1005+
return fmt.Errorf("failed to create temp dir: %v", err)
1006+
}
1007+
1008+
// Create V1 file
1009+
v1File := filepath.Join(benchmarkTempDir, "rle-dict-int32.v1.parquet")
9741010
props := parquet.NewWriterProperties(
9751011
parquet.WithDictionaryDefault(true),
9761012
parquet.WithDataPageSize(128*1024*1024), // 128MB
9771013
parquet.WithBatchSize(128*1024*1024),
978-
parquet.WithMaxRowGroupLength(100_000),
1014+
parquet.WithMaxRowGroupLength(1_000_000),
1015+
parquet.WithDataPageVersion(parquet.DataPageV1),
1016+
parquet.WithVersion(parquet.V2_LATEST),
1017+
)
1018+
if err := createBenchmarkFile(v1File, props); err != nil {
1019+
return err
1020+
}
1021+
benchmarkFiles["v1"] = v1File
1022+
1023+
// Create V2 file
1024+
v2File := filepath.Join(benchmarkTempDir, "rle-dict-int32.v2.parquet")
1025+
props = parquet.NewWriterProperties(
1026+
parquet.WithDictionaryDefault(true),
1027+
parquet.WithDataPageSize(128*1024*1024), // 128MB
1028+
parquet.WithBatchSize(128*1024*1024),
1029+
parquet.WithMaxRowGroupLength(1_000_000),
1030+
parquet.WithDataPageVersion(parquet.DataPageV2),
1031+
parquet.WithVersion(parquet.V2_LATEST),
1032+
)
1033+
if err := createBenchmarkFile(v2File, props); err != nil {
1034+
return err
1035+
}
1036+
benchmarkFiles["v2"] = v2File
1037+
1038+
// Create V1 Snappy file
1039+
v1SnappyFile := filepath.Join(benchmarkTempDir, "rle-dict-int32.v1.snappy.parquet")
1040+
props = parquet.NewWriterProperties(
1041+
parquet.WithDictionaryDefault(true),
1042+
parquet.WithDataPageSize(128*1024*1024), // 128MB
1043+
parquet.WithBatchSize(128*1024*1024),
1044+
parquet.WithMaxRowGroupLength(1_000_000),
1045+
parquet.WithCompression(compress.Codecs.Snappy),
1046+
parquet.WithDataPageVersion(parquet.DataPageV1),
1047+
parquet.WithVersion(parquet.V2_LATEST),
1048+
)
1049+
if err = createBenchmarkFile(v1SnappyFile, props); err != nil {
1050+
return err
1051+
}
1052+
benchmarkFiles["v1-snappy"] = v1SnappyFile
1053+
1054+
// Create V2 Snappy file
1055+
v2SnappyFile := filepath.Join(benchmarkTempDir, "rle-dict-int32.v2.snappy.parquet")
1056+
props = parquet.NewWriterProperties(
1057+
parquet.WithDictionaryDefault(true),
1058+
parquet.WithDataPageSize(128*1024*1024), // 128MB
1059+
parquet.WithBatchSize(128*1024*1024),
1060+
parquet.WithMaxRowGroupLength(1_000_000),
1061+
parquet.WithCompression(compress.Codecs.Snappy),
1062+
parquet.WithDataPageVersion(parquet.DataPageV2),
1063+
parquet.WithVersion(parquet.V2_LATEST),
1064+
)
1065+
if err = createBenchmarkFile(v2SnappyFile, props); err != nil {
1066+
return err
1067+
}
1068+
benchmarkFiles["v2-snappy"] = v2SnappyFile
1069+
1070+
// Create encrypted files
1071+
encryptCols := make(parquet.ColumnPathToEncryptionPropsMap)
1072+
encryptCols["col"] = parquet.NewColumnEncryptionProperties("col", parquet.WithKey(ColumnEncryptionKey1), parquet.WithKeyID(ColumnEncryptionKey1ID))
1073+
encryptProps := parquet.NewFileEncryptionProperties(FooterEncryptionKey, parquet.WithFooterKeyMetadata(FooterEncryptionKeyID),
1074+
parquet.WithEncryptedColumns(encryptCols), parquet.WithAlg(parquet.AesCtr))
1075+
1076+
// Create V1 encrypted file
1077+
v1EncFile := filepath.Join(benchmarkTempDir, "rle-dict-int32.v1.encrypted.parquet")
1078+
props = parquet.NewWriterProperties(
1079+
parquet.WithDictionaryDefault(true),
1080+
parquet.WithDataPageSize(128*1024*1024), // 128MB
1081+
parquet.WithBatchSize(128*1024*1024),
1082+
parquet.WithMaxRowGroupLength(1_000_000),
1083+
parquet.WithDataPageVersion(parquet.DataPageV1),
1084+
parquet.WithVersion(parquet.V2_LATEST),
1085+
parquet.WithEncryptionProperties(encryptProps.Clone("")),
1086+
)
1087+
if err := createBenchmarkFile(v1EncFile, props); err != nil {
1088+
return err
1089+
}
1090+
benchmarkFiles["v1-encrypted"] = v1EncFile
1091+
1092+
// Create V2 encrypted file
1093+
v2EncFile := filepath.Join(benchmarkTempDir, "rle-dict-int32.v2.encrypted.parquet")
1094+
props = parquet.NewWriterProperties(
1095+
parquet.WithDictionaryDefault(true),
1096+
parquet.WithDataPageSize(128*1024*1024), // 128MB
1097+
parquet.WithBatchSize(128*1024*1024),
1098+
parquet.WithMaxRowGroupLength(1_000_000),
1099+
parquet.WithDataPageVersion(parquet.DataPageV2),
1100+
parquet.WithVersion(parquet.V2_LATEST),
1101+
parquet.WithEncryptionProperties(encryptProps.Clone("")),
1102+
)
1103+
if err := createBenchmarkFile(v2EncFile, props); err != nil {
1104+
return err
1105+
}
1106+
benchmarkFiles["v2-encrypted"] = v2EncFile
1107+
1108+
// Create V1 Snappy encrypted file
1109+
v1SnappyEncFile := filepath.Join(benchmarkTempDir, "rle-dict-int32.v1.snappy.encrypted.parquet")
1110+
props = parquet.NewWriterProperties(
1111+
parquet.WithDictionaryDefault(true),
1112+
parquet.WithDataPageSize(128*1024*1024), // 128MB
1113+
parquet.WithBatchSize(128*1024*1024),
1114+
parquet.WithMaxRowGroupLength(1_000_000),
1115+
parquet.WithCompression(compress.Codecs.Snappy),
1116+
parquet.WithDataPageVersion(parquet.DataPageV1),
1117+
parquet.WithVersion(parquet.V2_LATEST),
1118+
parquet.WithEncryptionProperties(encryptProps.Clone("")),
1119+
)
1120+
if err := createBenchmarkFile(v1SnappyEncFile, props); err != nil {
1121+
return err
1122+
}
1123+
benchmarkFiles["v1-snappy-encrypted"] = v1SnappyEncFile
1124+
1125+
// Create V2 Snappy encrypted file
1126+
v2SnappyEncFile := filepath.Join(benchmarkTempDir, "rle-dict-int32.v2.snappy.encrypted.parquet")
1127+
props = parquet.NewWriterProperties(
1128+
parquet.WithDictionaryDefault(true),
1129+
parquet.WithDataPageSize(128*1024*1024), // 128MB
1130+
parquet.WithBatchSize(128*1024*1024),
1131+
parquet.WithMaxRowGroupLength(1_000_000),
1132+
parquet.WithCompression(compress.Codecs.Snappy),
9791133
parquet.WithDataPageVersion(parquet.DataPageV2),
9801134
parquet.WithVersion(parquet.V2_LATEST),
1135+
parquet.WithEncryptionProperties(encryptProps.Clone("")),
9811136
)
1137+
if err := createBenchmarkFile(v2SnappyEncFile, props); err != nil {
1138+
return err
1139+
}
1140+
benchmarkFiles["v2-snappy-encrypted"] = v2SnappyEncFile
1141+
1142+
return nil
1143+
}
1144+
1145+
func createBenchmarkFile(filepath string, props *parquet.WriterProperties) error {
9821146
outFile, err := os.Create(filepath)
983-
require.NoError(b, err)
1147+
if err != nil {
1148+
return fmt.Errorf("failed to create benchmark file: %v", err)
1149+
}
1150+
defer outFile.Close()
9841151

9851152
sc, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{
9861153
schema.NewInt32Node("col", parquet.Repetitions.Required, -1),
9871154
}, -1)
988-
require.NoError(b, err)
1155+
if err != nil {
1156+
return fmt.Errorf("failed to create schema: %v", err)
1157+
}
9891158

9901159
writer := file.NewParquetWriter(outFile, sc, file.WithWriterProps(props))
9911160

992-
// 10 row groups of 100 000 rows = 1 000 000 rows in total
1161+
// 10 row groups of 1_000_000 rows = 10_000_000 rows in total
9931162
value := int32(1)
9941163
for range 10 {
9951164
rgWriter := writer.AppendBufferedRowGroup()
9961165
cwr, _ := rgWriter.Column(0)
9971166
cw := cwr.(*file.Int32ColumnChunkWriter)
998-
valuesIn := make([]int32, 0, 100_000)
1167+
valuesIn := make([]int32, 0, 1_000_000)
9991168
repeats := 1
1000-
for len(valuesIn) < 100_000 {
1169+
for len(valuesIn) < 1_000_000 {
10011170
repeatedValue := make([]int32, repeats)
10021171
for i := range repeatedValue {
10031172
repeatedValue[i] = value
10041173
}
1005-
if len(valuesIn)+len(repeatedValue) > 100_000 {
1006-
repeatedValue = repeatedValue[:100_000-len(valuesIn)]
1174+
if len(valuesIn)+len(repeatedValue) > 1_000_000 {
1175+
repeatedValue = repeatedValue[:1_000_000-len(valuesIn)]
10071176
}
10081177
valuesIn = append(valuesIn, repeatedValue[:]...)
10091178
// repeat values from 1 to 50 times
@@ -1013,16 +1182,27 @@ func BenchmarkReadInt32Column(b *testing.B) {
10131182
cw.WriteBatch(valuesIn, nil, nil)
10141183
rgWriter.Close()
10151184
}
1016-
err = writer.Close()
1017-
require.NoError(b, err)
1185+
if err := writer.Close(); err != nil {
1186+
return fmt.Errorf("failed to close parquet writer: %v", err)
1187+
}
1188+
return nil
1189+
}
1190+
1191+
func benchmarkReadInt32ColumnWithDecryption(b *testing.B, filepath string, readProps *parquet.ReaderProperties) {
1192+
var reader *file.Reader
1193+
var err error
10181194

1019-
reader, err := file.OpenParquetFile(filepath, false)
1195+
if readProps != nil {
1196+
reader, err = file.OpenParquetFile(filepath, false, file.WithReadProps(readProps))
1197+
} else {
1198+
reader, err = file.OpenParquetFile(filepath, false)
1199+
}
10201200
require.NoError(b, err)
10211201
defer reader.Close()
10221202

10231203
numValues := reader.NumRows()
10241204
values := make([]int32, numValues)
1025-
b.StopTimer()
1205+
10261206
b.ResetTimer()
10271207
for i := 0; i < b.N; i++ {
10281208
startIndex := 0
@@ -1034,9 +1214,7 @@ func BenchmarkReadInt32Column(b *testing.B) {
10341214
cr, ok := colReader.(*file.Int32ColumnChunkReader)
10351215
require.True(b, ok)
10361216

1037-
b.StartTimer()
1038-
_, valuesRead, err := cr.ReadBatch(rgReader.NumRows(), values, nil, nil)
1039-
b.StopTimer()
1217+
_, valuesRead, err := cr.ReadBatch(rgReader.NumRows(), values[startIndex:], nil, nil)
10401218
require.NoError(b, err)
10411219

10421220
startIndex += valuesRead
@@ -1045,3 +1223,76 @@ func BenchmarkReadInt32Column(b *testing.B) {
10451223
require.Equal(b, numValues, int64(startIndex))
10461224
}
10471225
}
1226+
1227+
func BenchmarkReadInt32(b *testing.B) {
1228+
b.Run("V1Page", func(b *testing.B) {
1229+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1"], nil)
1230+
})
1231+
b.Run("V2Page", func(b *testing.B) {
1232+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2"], nil)
1233+
})
1234+
b.Run("V1PageSnappy", func(b *testing.B) {
1235+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1-snappy"], nil)
1236+
})
1237+
b.Run("V2PageSnappy", func(b *testing.B) {
1238+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2-snappy"], nil)
1239+
})
1240+
1241+
// Set up decryption properties
1242+
decryptProps := parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(benchmarkKeyRetriever()))
1243+
readProps := parquet.NewReaderProperties(mem)
1244+
b.Run("V1PageEncrypted", func(b *testing.B) {
1245+
readProps.FileDecryptProps = decryptProps.Clone("")
1246+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1-encrypted"], readProps)
1247+
})
1248+
b.Run("V2PageEncrypted", func(b *testing.B) {
1249+
readProps.FileDecryptProps = decryptProps.Clone("")
1250+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2-encrypted"], readProps)
1251+
})
1252+
1253+
b.Run("V1PageSnappyEncrypted", func(b *testing.B) {
1254+
readProps.FileDecryptProps = decryptProps.Clone("")
1255+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1-snappy-encrypted"], readProps)
1256+
})
1257+
1258+
b.Run("V2PageSnappyEncrypted", func(b *testing.B) {
1259+
readProps.FileDecryptProps = decryptProps.Clone("")
1260+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2-snappy-encrypted"], readProps)
1261+
})
1262+
}
1263+
1264+
func BenchmarkReadInt32Buffered(b *testing.B) {
1265+
readProps := parquet.NewReaderProperties(mem)
1266+
readProps.BufferedStreamEnabled = true
1267+
b.Run("V1Page", func(b *testing.B) {
1268+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1"], readProps)
1269+
})
1270+
b.Run("V2Page", func(b *testing.B) {
1271+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2"], readProps)
1272+
})
1273+
b.Run("V1PageSnappy", func(b *testing.B) {
1274+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1-snappy"], readProps)
1275+
})
1276+
b.Run("V2PageSnappy", func(b *testing.B) {
1277+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2-snappy"], readProps)
1278+
})
1279+
1280+
// Set up decryption properties
1281+
decryptProps := parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(benchmarkKeyRetriever()))
1282+
b.Run("V1PageEncrypted", func(b *testing.B) {
1283+
readProps.FileDecryptProps = decryptProps.Clone("")
1284+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1-encrypted"], readProps)
1285+
})
1286+
b.Run("V2PageEncrypted", func(b *testing.B) {
1287+
readProps.FileDecryptProps = decryptProps.Clone("")
1288+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2-encrypted"], readProps)
1289+
})
1290+
b.Run("V1PageSnappyEncrypted", func(b *testing.B) {
1291+
readProps.FileDecryptProps = decryptProps.Clone("")
1292+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1-snappy-encrypted"], readProps)
1293+
})
1294+
b.Run("V2PageSnappyEncrypted", func(b *testing.B) {
1295+
readProps.FileDecryptProps = decryptProps.Clone("")
1296+
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2-snappy-encrypted"], readProps)
1297+
})
1298+
}

0 commit comments

Comments
 (0)