Skip to content

Commit a97bd0b

Browse files
authored
fix(parquet/pqarrow): normalize the element name in the stored ARROW:schema (#746)
### Rationale for this change

Closes #744.

### What changes are included in this PR?

Normalizes the element name of list fields in the stored ARROW:schema of a Parquet file to avoid mismatches with the file's actual column paths.

### Are these changes tested?

Yes, a new unit test is added.

### Are there any user-facing changes?

Yes, this is a bug fix that ensures consistency between the stored schema and the Parquet columns.
1 parent 65f1182 commit a97bd0b

4 files changed

Lines changed: 184 additions & 6 deletions

File tree

parquet/pqarrow/encode_arrow_test.go

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package pqarrow_test
1919
import (
2020
"bytes"
2121
"context"
22+
"encoding/base64"
2223
"encoding/binary"
2324
"fmt"
2425
"math"
@@ -1532,9 +1533,9 @@ func makeListArray(values arrow.Array, size, nullcount int) arrow.Array {
15321533
nullBitmap := make([]byte, int(bitutil.BytesForBits(int64(size))))
15331534

15341535
curOffset := 0
1535-
for i := 0; i < size; i++ {
1536+
for i := range size {
15361537
offsetsArr[i] = int32(curOffset)
1537-
if !(((i % 2) == 0) && ((i / 2) < nullcount)) {
1538+
if i%2 != 0 || i/2 >= nullcount {
15381539
// non-null list (list with index 1 is always empty)
15391540
bitutil.SetBit(nullBitmap, i)
15401541
if i != 1 {
@@ -2108,6 +2109,105 @@ func (ps *ParquetIOTestSuite) TestStructWithListOfNestedStructs() {
21082109
ps.roundTripTable(mem, expected, false)
21092110
}
21102111

2112+
// TestListOfStructWithEmptyListStoreSchema tests that ARROW:schema metadata stored
2113+
// in a Parquet file uses "element" (not "item") as the list element field name, to
2114+
// match the actual Parquet column paths. This is required for compatibility with
2115+
// readers like Snowflake that resolve columns by matching ARROW:schema field names
2116+
// to Parquet column path segments. See https://github.com/apache/arrow-go/issues/744.
2117+
func TestListOfStructWithEmptyListStoreSchema(t *testing.T) {
2118+
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
2119+
defer mem.AssertSize(t, 0)
2120+
2121+
opsStruct := arrow.StructOf(
2122+
arrow.Field{Name: "id", Type: arrow.BinaryTypes.String, Nullable: false},
2123+
arrow.Field{Name: "token", Type: arrow.BinaryTypes.String, Nullable: true},
2124+
arrow.Field{Name: "amount", Type: arrow.BinaryTypes.String, Nullable: true},
2125+
)
2126+
// arrow.ListOf uses "item" as the element field name, which would mismatch
2127+
// the Parquet column path that uses "element". The fix ensures the stored
2128+
// ARROW:schema uses "element" to stay consistent with the Parquet columns.
2129+
schema := arrow.NewSchema([]arrow.Field{
2130+
{Name: "block_num", Type: arrow.PrimitiveTypes.Uint64, Nullable: false},
2131+
{Name: "tx_id", Type: arrow.BinaryTypes.String, Nullable: false},
2132+
{Name: "ops", Type: arrow.ListOf(opsStruct), Nullable: true},
2133+
}, nil)
2134+
2135+
b := array.NewRecordBuilder(mem, schema)
2136+
defer b.Release()
2137+
2138+
b.Field(0).(*array.Uint64Builder).AppendValues([]uint64{100, 101, 102}, nil)
2139+
b.Field(1).(*array.StringBuilder).AppendValues([]string{"tx-a", "tx-b", "tx-c"}, nil)
2140+
2141+
lb := b.Field(2).(*array.ListBuilder)
2142+
sb := lb.ValueBuilder().(*array.StructBuilder)
2143+
idb := sb.FieldBuilder(0).(*array.StringBuilder)
2144+
tokb := sb.FieldBuilder(1).(*array.StringBuilder)
2145+
amtb := sb.FieldBuilder(2).(*array.StringBuilder)
2146+
2147+
lb.Append(true)
2148+
sb.Append(true)
2149+
idb.Append("op-1")
2150+
tokb.Append("USDC")
2151+
amtb.Append("10")
2152+
sb.Append(true)
2153+
idb.Append("op-2")
2154+
tokb.Append("ETH")
2155+
amtb.Append("1.5")
2156+
lb.Append(true) // empty list
2157+
lb.Append(true)
2158+
sb.Append(true)
2159+
idb.Append("op-3")
2160+
tokb.AppendNull()
2161+
amtb.Append("42")
2162+
2163+
rec := b.NewRecordBatch()
2164+
defer rec.Release()
2165+
2166+
var buf bytes.Buffer
2167+
props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(true), parquet.WithStats(true))
2168+
arrowProps := pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema())
2169+
2170+
pw, err := pqarrow.NewFileWriter(schema, &buf, props, arrowProps)
2171+
require.NoError(t, err)
2172+
require.NoError(t, pw.Write(rec))
2173+
require.NoError(t, pw.Close())
2174+
2175+
// Verify round-trip data is correct.
2176+
pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
2177+
require.NoError(t, err)
2178+
defer pf.Close()
2179+
2180+
fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, mem)
2181+
require.NoError(t, err)
2182+
2183+
tbl, err := fr.ReadTable(context.Background())
2184+
require.NoError(t, err)
2185+
defer tbl.Release()
2186+
2187+
require.EqualValues(t, 3, tbl.NumRows())
2188+
2189+
// Verify the stored ARROW:schema uses "element" as the list element field name
2190+
// (consistent with the Parquet column path "ops.list.element.*"), not "item"
2191+
// (the default Arrow field name from arrow.ListOf()).
2192+
arrowSchemaEncoded := pf.MetaData().KeyValueMetadata().FindValue("ARROW:schema")
2193+
require.NotNil(t, arrowSchemaEncoded, "ARROW:schema metadata key must be present")
2194+
decoded, err := base64.StdEncoding.DecodeString(*arrowSchemaEncoded)
2195+
require.NoError(t, err)
2196+
// DeserializeSchema wraps bytes in an IPC stream; use ipc.NewReader to decode.
2197+
ipcRdr, err := ipc.NewReader(bytes.NewReader(decoded), ipc.WithAllocator(mem))
2198+
require.NoError(t, err)
2199+
defer ipcRdr.Release()
2200+
storedSchema := ipcRdr.Schema()
2201+
2202+
opsField, ok := storedSchema.FieldsByName("ops")
2203+
require.True(t, ok)
2204+
opsListType, ok := opsField[0].Type.(*arrow.ListType)
2205+
require.True(t, ok)
2206+
// Must be "element" (matching Parquet column path) not "item" (Arrow default).
2207+
assert.Equal(t, "element", opsListType.ElemField().Name,
2208+
"ARROW:schema element name must match the Parquet column path segment")
2209+
}
2210+
21112211
func TestParquetArrowIO(t *testing.T) {
21122212
suite.Run(t, new(ParquetIOTestSuite))
21132213
}

parquet/pqarrow/file_writer.go

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,74 @@ import (
3333
"github.com/apache/arrow-go/v18/parquet/metadata"
3434
)
3535

36+
// normalizeFieldForParquet recursively normalizes an Arrow field so that its
37+
// type matches the Parquet column structure that fieldToNode would produce.
38+
// Specifically, list element field names are set to "element" because
39+
// ListOfWithName (used by fieldToNode) always names the Parquet element group
40+
// "element", regardless of the original Arrow element field name.
41+
func normalizeFieldForParquet(f arrow.Field) (arrow.Field, error) {
42+
switch dt := f.Type.(type) {
43+
case *arrow.ListType:
44+
elem, err := normalizeFieldForParquet(dt.ElemField())
45+
if err != nil {
46+
return arrow.Field{}, err
47+
}
48+
elem.Name = "element"
49+
return arrow.Field{Name: f.Name, Type: arrow.ListOfField(elem), Nullable: f.Nullable, Metadata: f.Metadata}, nil
50+
case *arrow.FixedSizeListType:
51+
elem, err := normalizeFieldForParquet(dt.ElemField())
52+
if err != nil {
53+
return arrow.Field{}, err
54+
}
55+
elem.Name = "element"
56+
return arrow.Field{Name: f.Name, Type: arrow.FixedSizeListOfField(dt.Len(), elem), Nullable: f.Nullable, Metadata: f.Metadata}, nil
57+
case *arrow.StructType:
58+
fields := make([]arrow.Field, dt.NumFields())
59+
for i := 0; i < dt.NumFields(); i++ {
60+
field, err := normalizeFieldForParquet(dt.Field(i))
61+
if err != nil {
62+
return arrow.Field{}, err
63+
}
64+
fields[i] = field
65+
}
66+
return arrow.Field{Name: f.Name, Type: arrow.StructOf(fields...), Nullable: f.Nullable, Metadata: f.Metadata}, nil
67+
case *arrow.MapType:
68+
key, err := normalizeFieldForParquet(dt.KeyField())
69+
if err != nil {
70+
return arrow.Field{}, err
71+
}
72+
item, err := normalizeFieldForParquet(dt.ItemField())
73+
if err != nil {
74+
return arrow.Field{}, err
75+
}
76+
return arrow.Field{Name: f.Name, Type: arrow.MapOfFields(key, item), Nullable: f.Nullable, Metadata: f.Metadata}, nil
77+
case *arrow.RunEndEncodedType:
78+
return arrow.Field{}, fmt.Errorf("RunEndEncoded types are not supported for writing to Parquet files: field %s", f.Name)
79+
case *arrow.ListViewType:
80+
return arrow.Field{}, fmt.Errorf("ListView types are not supported for writing to Parquet files: field %s", f.Name)
81+
case *arrow.LargeListViewType:
82+
return arrow.Field{}, fmt.Errorf("LargeListView types are not supported for writing to Parquet files: field %s", f.Name)
83+
}
84+
return f, nil
85+
}
86+
87+
// normalizeSchemaForParquet returns a copy of the Arrow schema with list element
88+
// field names updated to "element" to match the Parquet column paths produced by
89+
// fieldToNode. This is used when storing the ARROW:schema metadata to ensure
90+
// consistency between the stored schema and the actual Parquet column structure.
91+
func normalizeSchemaForParquet(sc *arrow.Schema) (*arrow.Schema, error) {
92+
fields := make([]arrow.Field, sc.NumFields())
93+
for i, f := range sc.Fields() {
94+
field, err := normalizeFieldForParquet(f)
95+
if err != nil {
96+
return nil, err
97+
}
98+
fields[i] = field
99+
}
100+
meta := sc.Metadata()
101+
return arrow.NewSchema(fields, &meta), nil
102+
}
103+
36104
// WriteTable is a convenience function to create and write a full array.Table to a parquet file. The schema
37105
// and columns will be determined by the schema of the table, writing the file out to the provided writer.
38106
// The chunksize will be utilized in order to determine the size of the row groups.
@@ -80,7 +148,17 @@ func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterPr
80148
}
81149

82150
if arrprops.storeSchema {
83-
serializedSchema := flight.SerializeSchema(arrschema, props.Allocator())
151+
// Normalize the Arrow schema so that list element field names match the
152+
// Parquet column group names. fieldToNode always uses "element" as the
153+
// Parquet group name for list element fields (via ListOfWithName), but
154+
// arrow.ListOf() uses "item" as the Arrow element field name. This
155+
// inconsistency causes readers (e.g. Snowflake) that map ARROW:schema field
156+
// names to Parquet column paths to fail to locate the correct columns.
157+
schemaToStore, err := normalizeSchemaForParquet(arrschema)
158+
if err != nil {
159+
return nil, err
160+
}
161+
serializedSchema := flight.SerializeSchema(schemaToStore, props.Allocator())
84162
meta.Append("ARROW:schema", base64.StdEncoding.EncodeToString(serializedSchema))
85163
}
86164

parquet/pqarrow/schema.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties
299299
}
300300
case arrow.STRUCT:
301301
return structToNode(field, props, arrprops)
302-
case arrow.FIXED_SIZE_LIST, arrow.LIST:
302+
case arrow.FIXED_SIZE_LIST, arrow.LIST, arrow.LARGE_LIST:
303303
elemField := field.Type.(arrow.ListLikeType).ElemField()
304304

305305
child, err := fieldToNode(name, elemField, props, arrprops)
@@ -722,7 +722,7 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
722722
// If the name is array or ends in _tuple, this should be a list of struct
723723
// even for single child elements.
724724
listGroup := listNode.(*schema.GroupNode)
725-
if listGroup.NumFields() == 1 && !(listGroup.Name() == "array" || listGroup.Name() == (n.Name()+"_tuple")) {
725+
if listGroup.NumFields() == 1 && (listGroup.Name() != "array" && listGroup.Name() != n.Name()+"_tuple") {
726726
// list of primitive type
727727
if err := nodeToSchemaField(listGroup.Field(0), currentLevels, ctx, out, &out.Children[0]); err != nil {
728728
return err

parquet/schema/node.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func NewPrimitiveNodeLogical(name string, repetition parquet.Repetition, logical
172172
n.convertedType, n.decimalMetaData = n.logicalType.ToConvertedType()
173173
}
174174

175-
if !(n.logicalType != nil && !n.logicalType.IsNested() && n.logicalType.IsCompatible(n.convertedType, n.decimalMetaData)) {
175+
if n.logicalType == nil || n.logicalType.IsNested() || !n.logicalType.IsCompatible(n.convertedType, n.decimalMetaData) {
176176
return nil, fmt.Errorf("invalid logical type %s", n.logicalType)
177177
}
178178

0 commit comments

Comments
 (0)