Skip to content

Commit efc538a

Browse files
authored
fix(parquet/pqarrow): fix Decimal256 sign extension (#711)
bigEndianToDecimal256 in column_readers.go has a bug in the partial-word sign extension path: it shifts by wordLen (byte count) instead of wordLen*8 (bit count). ### Rationale for this change Found while reading a parquet file w/ a certain Decimal256 using go-arrow ### What changes are included in this PR? fix for bigEndianToDecimal256 ### Are these changes tested? Yes, round-trip test included ### Are there any user-facing changes? None
1 parent 9a3edcc commit efc538a

2 files changed

Lines changed: 93 additions & 2 deletions

File tree

parquet/pqarrow/column_readers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -939,8 +939,8 @@ func bigEndianToDecimal256(buf []byte) (decimal256.Num, error) {
939939
result := initWord
940940
if len(buf) > 0 {
941941
// incorporate the actual values if present
942-
// shift left enough bits to make room for the incoming int64
943-
result = result << uint64(wordLen)
942+
// shift left enough bits to make room for the incoming bytes
943+
result = result << uint64(wordLen*8)
944944
// preserve the upper bits by inplace OR-ing the int64
945945
result |= uint64FromBigEndianShifted(word)
946946
}

parquet/pqarrow/encode_arrow_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ package pqarrow_test
1919
import (
2020
"bytes"
2121
"context"
22+
"encoding/binary"
2223
"fmt"
2324
"math"
25+
"math/big"
2426
"strconv"
2527
"strings"
2628
"testing"
@@ -1229,6 +1231,95 @@ func (ps *ParquetIOTestSuite) TestReadDecimal256() {
12291231
ps.Truef(array.Equal(expected, chunked.Chunk(0)), "expected: %s\ngot: %s", expected, chunked.Chunk(0))
12301232
}
12311233

1234+
// TestReadDecimal256PartialWord verifies that bigEndianToDecimal256 correctly
1235+
// handles byte arrays whose length is not a multiple of 8, exercising the
1236+
// partial-word sign-extension path. This is a regression test for a bug where
1237+
// the shift was by wordLen (bytes) instead of wordLen*8 (bits).
1238+
func (ps *ParquetIOTestSuite) TestReadDecimal256PartialWord() {
1239+
// decimal256ToBigEndian converts a big.Int to big-endian two's complement
1240+
// bytes truncated to byteWidth, simulating what the Parquet encoder writes.
1241+
decimal256ToBigEndian := func(bi *big.Int, byteWidth int) []byte {
1242+
num := decimal256.FromBigInt(bi)
1243+
vals := num.Array()
1244+
var full [32]byte
1245+
binary.BigEndian.PutUint64(full[0:], vals[3])
1246+
binary.BigEndian.PutUint64(full[8:], vals[2])
1247+
binary.BigEndian.PutUint64(full[16:], vals[1])
1248+
binary.BigEndian.PutUint64(full[24:], vals[0])
1249+
return append([]byte{}, full[32-byteWidth:]...)
1250+
}
1251+
1252+
// maxForPrecision returns 10^precision - 1 as a *big.Int.
1253+
maxForPrecision := func(precision int) *big.Int {
1254+
v := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(precision)), nil)
1255+
return v.Sub(v, big.NewInt(1))
1256+
}
1257+
1258+
// One precision per partial-word remainder (1-7), plus remainder 0 as sanity.
1259+
// precision : DecimalSize : byteWidth % 8
1260+
// 39 : 17 : 1 41 : 18 : 2 45 : 19 : 3 47 : 20 : 4
1261+
// 49 : 21 : 5 51 : 22 : 6 53 : 23 : 7 76 : 32 : 0
1262+
precisions := []int32{39, 41, 45, 47, 49, 51, 53, 76}
1263+
1264+
for _, precision := range precisions {
1265+
byteWidth := int(pqarrow.DecimalSize(precision))
1266+
maxVal := maxForPrecision(int(precision))
1267+
minVal := new(big.Int).Neg(maxVal)
1268+
1269+
tests := []struct {
1270+
name string
1271+
val *big.Int
1272+
}{
1273+
{"max_positive", maxVal},
1274+
{"max_negative", minVal},
1275+
{"zero", big.NewInt(0)},
1276+
{"minus_one", big.NewInt(-1)},
1277+
}
1278+
1279+
for _, tt := range tests {
1280+
ps.Run(fmt.Sprintf("p%d_bw%d_r%d/%s", precision, byteWidth, byteWidth%8, tt.name), func() {
1281+
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
1282+
defer mem.AssertSize(ps.T(), 0)
1283+
1284+
bigEndian := []parquet.ByteArray{decimal256ToBigEndian(tt.val, byteWidth)}
1285+
1286+
bldr := array.NewDecimal256Builder(mem, &arrow.Decimal256Type{Precision: precision, Scale: 0})
1287+
defer bldr.Release()
1288+
bldr.Append(decimal256.FromBigInt(tt.val))
1289+
expected := bldr.NewDecimal256Array()
1290+
defer expected.Release()
1291+
1292+
sc := schema.MustGroup(schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{
1293+
schema.Must(schema.NewPrimitiveNodeLogical("decimals", parquet.Repetitions.Required,
1294+
schema.NewDecimalLogicalType(precision, 0), parquet.Types.ByteArray, -1, -1)),
1295+
}, -1))
1296+
1297+
sink := encoding.NewBufferWriter(0, mem)
1298+
defer sink.Release()
1299+
writer := file.NewParquetWriter(sink, sc)
1300+
rgw := writer.AppendRowGroup()
1301+
cw, _ := rgw.NextColumn()
1302+
cw.(*file.ByteArrayColumnChunkWriter).WriteBatch(bigEndian, nil, nil)
1303+
cw.Close()
1304+
rgw.Close()
1305+
writer.Close()
1306+
1307+
rdr := ps.createReader(mem, sink.Bytes())
1308+
cr, err := rdr.GetColumn(context.TODO(), 0)
1309+
ps.Require().NoError(err)
1310+
1311+
chunked, err := cr.NextBatch(smallSize)
1312+
ps.Require().NoError(err)
1313+
defer chunked.Release()
1314+
1315+
ps.Require().Len(chunked.Chunks(), 1)
1316+
ps.Truef(array.Equal(expected, chunked.Chunk(0)),
1317+
"expected: %s\ngot: %s", expected, chunked.Chunk(0))
1318+
})
1319+
}
1320+
}
1321+
}
1322+
12321323
func (ps *ParquetIOTestSuite) TestReadNestedStruct() {
12331324
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
12341325
defer mem.AssertSize(ps.T(), 0)

0 commit comments

Comments
 (0)