Commit b0f6e2c
fix(parquet/file): regression with decompressing data (#652)
### Rationale for this change

Fixes #619.

### What changes are included in this PR?

Uses the correct buffer during decompression.

### Are these changes tested?

A test case was added to confirm the issue.

### Are there any user-facing changes?

Only the bug fix.
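Root cause in brief: the old `decompress` path wrapped `p.decompressBuffer`'s backing slice in a `bytes.Buffer`, filled it with `io.CopyN`, and then handed `p.decompressBuffer.Bytes()` to the decompressor. If the `bytes.Buffer` grows and reallocates, the two stop aliasing the same memory and the decompressor sees stale bytes. A standalone sketch of that hazard follows the page_reader.go diff below.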
Parent: c03dad8

2 files changed: 133 additions & 8 deletions


parquet/file/file_writer_test.go

Lines changed: 116 additions & 0 deletions
```diff
@@ -1198,3 +1198,119 @@ func TestWriteBloomFilters(t *testing.T) {
 	assert.False(t, byteArrayFilter.Check(parquet.ByteArray("bar")))
 	assert.False(t, byteArrayFilter.Check(parquet.ByteArray("baz")))
 }
+
+// TestBufferedStreamDictionaryCompressed tests the fix for issue #619
+// where BufferedStreamEnabled=true with dictionary encoding and compression
+// caused "dict spaced eof exception" and "snappy: corrupt input" errors.
+// This was due to a bug in the decompress() method that read from the wrong buffer.
+func TestBufferedStreamDictionaryCompressed(t *testing.T) {
+	// Create schema with a string column that will use dictionary encoding
+	fields := schema.FieldList{
+		schema.NewByteArrayNode("dict_col", parquet.Repetitions.Required, -1),
+		schema.NewInt32Node("value_col", parquet.Repetitions.Required, -1),
+	}
+	sc, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, -1)
+	require.NoError(t, err)
+
+	sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+	defer sink.Release()
+
+	// Write with dictionary encoding and Snappy compression
+	props := parquet.NewWriterProperties(
+		parquet.WithDictionaryDefault(true),
+		parquet.WithCompression(compress.Codecs.Snappy),
+	)
+
+	writer := file.NewParquetWriter(sink, sc, file.WithWriterProps(props))
+	rgWriter := writer.AppendBufferedRowGroup()
+
+	// Write dictionary column with repeated values to trigger dictionary encoding
+	dictCol, err := rgWriter.Column(0)
+	require.NoError(t, err)
+	dictWriter := dictCol.(*file.ByteArrayColumnChunkWriter)
+
+	const numValues = 1000
+	dictValues := make([]parquet.ByteArray, numValues)
+	for i := 0; i < numValues; i++ {
+		// Use only 10 unique values to ensure dictionary encoding is used
+		dictValues[i] = parquet.ByteArray(fmt.Sprintf("value_%d", i%10))
+	}
+	_, err = dictWriter.WriteBatch(dictValues, nil, nil)
+	require.NoError(t, err)
+
+	// Write value column
+	valueCol, err := rgWriter.Column(1)
+	require.NoError(t, err)
+	valueWriter := valueCol.(*file.Int32ColumnChunkWriter)
+
+	values := make([]int32, numValues)
+	for i := 0; i < numValues; i++ {
+		values[i] = int32(i)
+	}
+	_, err = valueWriter.WriteBatch(values, nil, nil)
+	require.NoError(t, err)
+
+	require.NoError(t, rgWriter.Close())
+	require.NoError(t, writer.Close())
+
+	buffer := sink.Finish()
+	defer buffer.Release()
+
+	// Verify dictionary page was written
+	reader, err := file.NewParquetReader(bytes.NewReader(buffer.Bytes()))
+	require.NoError(t, err)
+	defer reader.Close()
+
+	rgReader := reader.RowGroup(0)
+	chunk, err := rgReader.MetaData().ColumnChunk(0)
+	require.NoError(t, err)
+	assert.True(t, chunk.HasDictionaryPage(), "Expected dictionary page to be written")
+
+	// Now read with BufferedStreamEnabled=true (the issue #619 condition)
+	readProps := parquet.NewReaderProperties(memory.DefaultAllocator)
+	readProps.BufferSize = 1024
+	readProps.BufferedStreamEnabled = true
+
+	bufferedReader, err := file.NewParquetReader(bytes.NewReader(buffer.Bytes()), file.WithReadProps(readProps))
+	require.NoError(t, err)
+	defer bufferedReader.Close()
+
+	// Read the data back
+	bufferedRgReader := bufferedReader.RowGroup(0)
+	assert.EqualValues(t, numValues, bufferedRgReader.NumRows())
+
+	// Read dictionary column
+	dictColReader, err := bufferedRgReader.Column(0)
+	require.NoError(t, err)
+	dictChunkReader := dictColReader.(*file.ByteArrayColumnChunkReader)
+
+	readDictValues := make([]parquet.ByteArray, numValues)
+	defLevels := make([]int16, numValues)
+	repLevels := make([]int16, numValues)
+
+	total, valuesRead, err := dictChunkReader.ReadBatch(int64(numValues), readDictValues, defLevels, repLevels)
+	require.NoError(t, err, "Should not get 'dict spaced eof exception' or 'snappy: corrupt input'")
+	assert.EqualValues(t, numValues, total)
+	assert.EqualValues(t, numValues, valuesRead)
+
+	// Verify the data is correct
+	for i := 0; i < numValues; i++ {
+		expected := parquet.ByteArray(fmt.Sprintf("value_%d", i%10))
+		assert.Equal(t, expected, readDictValues[i], "Value mismatch at index %d", i)
+	}
+
+	// Read value column to ensure it also works
+	valueColReader, err := bufferedRgReader.Column(1)
+	require.NoError(t, err)
+	valueChunkReader := valueColReader.(*file.Int32ColumnChunkReader)
+
+	readValues := make([]int32, numValues)
+	total, valuesRead, err = valueChunkReader.ReadBatch(int64(numValues), readValues, defLevels, repLevels)
+	require.NoError(t, err)
+	assert.EqualValues(t, numValues, total)
+	assert.EqualValues(t, numValues, valuesRead)
+
+	for i := 0; i < numValues; i++ {
+		assert.Equal(t, int32(i), readValues[i])
+	}
+}
```
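Assuming a local checkout of the repository, the regression test above can be run in isolation with `go test ./parquet/file -run TestBufferedStreamDictionaryCompressed`.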

parquet/file/page_reader.go

Lines changed: 17 additions & 8 deletions
```diff
@@ -17,7 +17,6 @@
 package file
 
 import (
-	"bytes"
 	"errors"
 	"fmt"
 	"io"
@@ -502,14 +501,19 @@ func (p *serializedPageReader) Page() Page {
 
 func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf []byte) ([]byte, error) {
 	p.decompressBuffer.ResizeNoShrink(lenCompressed)
-	b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
-	if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil {
+
+	// Read directly into the memory.Buffer's backing slice
+	n, err := io.ReadFull(rd, p.decompressBuffer.Bytes()[:lenCompressed])
+	if err != nil {
 		return nil, err
 	}
+	if n != lenCompressed {
+		return nil, fmt.Errorf("parquet: expected to read %d compressed bytes, got %d", lenCompressed, n)
+	}
 
-	data := p.decompressBuffer.Bytes()
+	data := p.decompressBuffer.Bytes()[:lenCompressed]
 	if p.cryptoCtx.DataDecryptor != nil {
-		data = p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes())
+		data = p.cryptoCtx.DataDecryptor.Decrypt(data)
 	}
 
 	return p.codec.Decode(buf, data), nil
@@ -518,11 +522,16 @@ func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf [
 func (p *serializedPageReader) readV2Encrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error {
 	// if encrypted, we need to decrypt before decompressing
 	p.decompressBuffer.ResizeNoShrink(lenCompressed)
-	b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
-	if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil {
+
+	n, err := io.ReadFull(rd, p.decompressBuffer.Bytes()[:lenCompressed])
+	if err != nil {
 		return err
 	}
-	data := p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes())
+	if n != lenCompressed {
+		return fmt.Errorf("parquet: expected to read %d compressed bytes, got %d", lenCompressed, n)
+	}
+
+	data := p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes()[:lenCompressed])
 	// encrypted + uncompressed -> just copy the decrypted data to output buffer
 	if !compressed {
 		copy(buf, data)
```
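Why the replaced `bytes.NewBuffer` + `io.CopyN` combination could read from the wrong buffer: `io.Copy` delegates to the destination's `ReaderFrom` when available, and in the current standard library `bytes.Buffer.ReadFrom` grows the buffer with at least `bytes.MinRead` (512 bytes) of slack before each read. A slice resized to exactly `lenCompressed` therefore forces a reallocation, so the copied bytes land in the `bytes.Buffer`'s new backing array while `p.decompressBuffer.Bytes()` still points at the old one. A minimal standalone sketch of the aliasing hazard (illustrative only, not code from this repository):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	// A slice with capacity for exactly the payload, mirroring
	// decompressBuffer after ResizeNoShrink(lenCompressed).
	backing := make([]byte, 4)

	// The old pattern: wrap the slice and copy the "compressed" bytes in.
	b := bytes.NewBuffer(backing[:0])
	if _, err := io.CopyN(b, strings.NewReader("data"), 4); err != nil {
		panic(err)
	}

	// io.CopyN delegated to bytes.Buffer.ReadFrom, which wanted more than
	// 4 bytes of capacity and reallocated: b no longer aliases backing.
	fmt.Printf("bytes.Buffer: %q, original slice: %q\n", b.Bytes(), backing)
	// With the current bytes.Buffer implementation this prints:
	// bytes.Buffer: "data", original slice: "\x00\x00\x00\x00"
}
```

The fix sidesteps the aliasing entirely: `io.ReadFull` writes into the supplied slice itself and already returns `io.ErrUnexpectedEOF` on a short read, so the explicit `n != lenCompressed` check in the diff is a defensive extra guard rather than the primary error path.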