Skip to content

Commit 7ce3c03

Browse files
authored
feat(parquet/pqarrow): read/write variant (#434)
### Rationale for this change: resolves #310. ### What changes are included in this PR? Updating the `pqarrow` package to support full round-trip read/write of Variant values via `arrow/extensions/variant`. ### Are these changes tested? Yes, unit tests are added for both shredded and unshredded variants. ### Are there any user-facing changes? Just the new features.
1 parent b196d3b commit 7ce3c03

6 files changed

Lines changed: 339 additions & 13 deletions

File tree

arrow/extensions/variant.go

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,83 @@ func NewDefaultVariantType() *VariantType {
6262
return vt
6363
}
6464

65+
func createShreddedField(dt arrow.DataType) arrow.DataType {
66+
switch t := dt.(type) {
67+
case arrow.ListLikeType:
68+
return arrow.ListOfNonNullable(arrow.StructOf(
69+
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
70+
arrow.Field{Name: "typed_value", Type: createShreddedField(t.Elem()), Nullable: true},
71+
))
72+
case *arrow.StructType:
73+
fields := make([]arrow.Field, 0, t.NumFields())
74+
for i := range t.NumFields() {
75+
f := t.Field(i)
76+
fields = append(fields, arrow.Field{
77+
Name: f.Name,
78+
Type: arrow.StructOf(arrow.Field{
79+
Name: "value",
80+
Type: arrow.BinaryTypes.Binary,
81+
Nullable: true,
82+
}, arrow.Field{
83+
Name: "typed_value",
84+
Type: createShreddedField(f.Type),
85+
Nullable: true,
86+
}),
87+
Nullable: false,
88+
Metadata: f.Metadata,
89+
})
90+
}
91+
return arrow.StructOf(fields...)
92+
default:
93+
return dt
94+
}
95+
}
96+
97+
// NewShreddedVariantType creates a new VariantType extension type using the provided
98+
// type to define a shredded schema by setting the `typed_value` field accordingly and
99+
// properly constructing the shredded fields for structs, lists and so on.
100+
//
101+
// For example:
102+
//
103+
// NewShreddedVariantType(arrow.StructOf(
104+
// arrow.Field{Name: "latitude", Type: arrow.PrimitiveTypes.Float64},
105+
// arrow.Field{Name: "longitude", Type: arrow.PrimitiveTypes.Float32}))
106+
//
107+
// Will create a variant type with the following structure:
108+
//
109+
// arrow.StructOf(
110+
// arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false},
111+
// arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
112+
// arrow.Field{Name: "typed_value", Type: arrow.StructOf(
113+
// arrow.Field{Name: "latitude", Type: arrow.StructOf(
114+
// arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
115+
// arrow.Field{Name: "typed_value", Type: arrow.PrimitiveTypes.Float64, Nullable: true}),
116+
// Nullable: false},
117+
// arrow.Field{Name: "longitude", Type: arrow.StructOf(
118+
// arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
119+
// arrow.Field{Name: "typed_value", Type: arrow.PrimitiveTypes.Float32, Nullable: true}),
120+
// Nullable: false},
121+
// ), Nullable: true})
122+
//
123+
// This is intended to be a convenient way to create a shredded variant type from a definition
124+
// of the fields to shred. If the provided data type is nil, it will create a default
125+
// variant type.
126+
func NewShreddedVariantType(dt arrow.DataType) *VariantType {
127+
if dt == nil {
128+
return NewDefaultVariantType()
129+
}
130+
131+
vt, _ := NewVariantType(arrow.StructOf(
132+
arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false},
133+
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
134+
arrow.Field{
135+
Name: "typed_value",
136+
Type: createShreddedField(dt),
137+
Nullable: true,
138+
}))
139+
return vt
140+
}
141+
65142
// NewVariantType creates a new variant type based on the provided storage type.
66143
//
67144
// The rules for a variant storage type are:
@@ -1480,16 +1557,17 @@ type shreddedObjBuilder struct {
14801557
}
14811558

14821559
func (b *shreddedObjBuilder) AppendMissing() {
1483-
b.structBldr.Append(true)
1560+
b.structBldr.AppendValues([]bool{false})
14841561
for _, fieldBldr := range b.fieldBuilders {
1562+
fieldBldr.structBldr.Append(true)
14851563
fieldBldr.valueBldr.AppendNull()
14861564
fieldBldr.typedBldr.AppendMissing()
14871565
}
14881566
}
14891567

14901568
func (b *shreddedObjBuilder) tryTyped(v variant.Value) (residual []byte) {
14911569
if v.Type() != variant.Object {
1492-
b.structBldr.AppendNull()
1570+
b.AppendMissing()
14931571
return v.Bytes()
14941572
}
14951573

arrow/extensions/variant_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1574,3 +1574,84 @@ func TestVariantBuilderUnmarshalJSON(t *testing.T) {
15741574
assert.Equal(t, int8(5), innerVal2.Value())
15751575
})
15761576
}
1577+
1578+
func TestNewSimpleShreddedVariantType(t *testing.T) {
1579+
assert.True(t, arrow.TypeEqual(extensions.NewDefaultVariantType(),
1580+
extensions.NewShreddedVariantType(nil)))
1581+
1582+
vt := extensions.NewShreddedVariantType(arrow.PrimitiveTypes.Float32)
1583+
s := arrow.StructOf(
1584+
arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
1585+
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
1586+
arrow.Field{Name: "typed_value", Type: arrow.PrimitiveTypes.Float32, Nullable: true})
1587+
1588+
assert.Truef(t, arrow.TypeEqual(vt.Storage, s), "expected %s, got %s", s, vt.Storage)
1589+
}
1590+
1591+
func TestNewShreddedVariantType(t *testing.T) {
1592+
vt := extensions.NewShreddedVariantType(arrow.StructOf(arrow.Field{
1593+
Name: "event_type",
1594+
Type: arrow.BinaryTypes.String,
1595+
}, arrow.Field{
1596+
Name: "event_ts",
1597+
Type: arrow.FixedWidthTypes.Timestamp_us,
1598+
}))
1599+
1600+
assert.NotNil(t, vt)
1601+
s := arrow.StructOf(
1602+
arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
1603+
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
1604+
arrow.Field{Name: "typed_value", Type: arrow.StructOf(
1605+
arrow.Field{Name: "event_type", Type: arrow.StructOf(
1606+
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
1607+
arrow.Field{Name: "typed_value", Type: arrow.BinaryTypes.String, Nullable: true},
1608+
)},
1609+
arrow.Field{Name: "event_ts", Type: arrow.StructOf(
1610+
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
1611+
arrow.Field{Name: "typed_value", Type: arrow.FixedWidthTypes.Timestamp_us, Nullable: true},
1612+
)},
1613+
), Nullable: true})
1614+
1615+
assert.Truef(t, arrow.TypeEqual(vt.Storage, s), "expected %s, got %s", s, vt.Storage)
1616+
}
1617+
1618+
func TestShreddedVariantNested(t *testing.T) {
1619+
vt := extensions.NewShreddedVariantType(arrow.StructOf(
1620+
arrow.Field{Name: "strval", Type: arrow.BinaryTypes.String},
1621+
arrow.Field{Name: "bool", Type: arrow.FixedWidthTypes.Boolean},
1622+
arrow.Field{Name: "location", Type: arrow.ListOf(arrow.StructOf(
1623+
arrow.Field{Name: "latitude", Type: arrow.PrimitiveTypes.Float64},
1624+
arrow.Field{Name: "longitude", Type: arrow.PrimitiveTypes.Float32},
1625+
))}))
1626+
1627+
s := arrow.StructOf(
1628+
arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
1629+
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
1630+
arrow.Field{Name: "typed_value", Type: arrow.StructOf(
1631+
arrow.Field{Name: "strval", Type: arrow.StructOf(
1632+
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
1633+
arrow.Field{Name: "typed_value", Type: arrow.BinaryTypes.String, Nullable: true},
1634+
)},
1635+
arrow.Field{Name: "bool", Type: arrow.StructOf(
1636+
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
1637+
arrow.Field{Name: "typed_value", Type: arrow.FixedWidthTypes.Boolean, Nullable: true},
1638+
)},
1639+
arrow.Field{Name: "location", Type: arrow.StructOf(
1640+
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
1641+
arrow.Field{Name: "typed_value", Type: arrow.ListOfNonNullable(arrow.StructOf(
1642+
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
1643+
arrow.Field{Name: "typed_value", Type: arrow.StructOf(
1644+
arrow.Field{Name: "latitude", Type: arrow.StructOf(
1645+
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
1646+
arrow.Field{Name: "typed_value", Type: arrow.PrimitiveTypes.Float64, Nullable: true},
1647+
)},
1648+
arrow.Field{Name: "longitude", Type: arrow.StructOf(
1649+
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
1650+
arrow.Field{Name: "typed_value", Type: arrow.PrimitiveTypes.Float32, Nullable: true},
1651+
)},
1652+
), Nullable: true},
1653+
)), Nullable: true})},
1654+
), Nullable: true})
1655+
1656+
assert.Truef(t, arrow.TypeEqual(vt.Storage, s), "expected %s, got %s", s, vt.Storage)
1657+
}

parquet/file/record_reader.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,7 @@ func (rr *recordReader) ReadRecordData(numRecords int64) (int64, error) {
555555
// no repetition levels, skip delimiting logic. each level
556556
// represents null or not null entry
557557
recordsRead = utils.Min(rr.levelsWritten-rr.levelsPos, numRecords)
558+
valuesToRead = recordsRead
558559
// this is advanced by delimitRecords which we skipped
559560
rr.levelsPos += recordsRead
560561
} else {

parquet/pqarrow/encode_arrow_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2314,3 +2314,107 @@ func TestEmptyListDeltaBinaryPacked(t *testing.T) {
23142314
assert.True(t, schema.Equal(tbl.Schema()))
23152315
assert.EqualValues(t, 1, tbl.NumRows())
23162316
}
2317+
2318+
func TestReadWriteNonShreddedVariant(t *testing.T) {
2319+
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
2320+
defer mem.AssertSize(t, 0)
2321+
2322+
bldr := extensions.NewVariantBuilder(mem, extensions.NewDefaultVariantType())
2323+
defer bldr.Release()
2324+
2325+
jsonData := `[
2326+
42,
2327+
"text",
2328+
[1, 2, 3],
2329+
{"name": "Alice"},
2330+
[{"id": 1, "name": "Item 1"}, {"id": 2, "name": "Item 2"}],
2331+
{"items": [1, "two", true], "metadata": {"created": "2025-01-01"}},
2332+
null
2333+
]`
2334+
2335+
err := bldr.UnmarshalJSON([]byte(jsonData))
2336+
require.NoError(t, err)
2337+
2338+
arr := bldr.NewArray()
2339+
defer arr.Release()
2340+
2341+
rec := array.NewRecord(arrow.NewSchema([]arrow.Field{
2342+
{Name: "variant", Type: arr.DataType(), Nullable: true},
2343+
}, nil), []arrow.Array{arr}, -1)
2344+
2345+
var buf bytes.Buffer
2346+
wr, err := pqarrow.NewFileWriter(rec.Schema(), &buf, nil,
2347+
pqarrow.DefaultWriterProps())
2348+
require.NoError(t, err)
2349+
2350+
require.NoError(t, wr.Write(rec))
2351+
rec.Release()
2352+
wr.Close()
2353+
2354+
rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
2355+
require.NoError(t, err)
2356+
reader, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
2357+
require.NoError(t, err)
2358+
defer rdr.Close()
2359+
2360+
tbl, err := reader.ReadTable(context.Background())
2361+
require.NoError(t, err)
2362+
defer tbl.Release()
2363+
2364+
assert.True(t, array.Equal(arr, tbl.Column(0).Data().Chunk(0)))
2365+
}
2366+
2367+
func TestReadWriteShreddedVariant(t *testing.T) {
2368+
vt := extensions.NewShreddedVariantType(arrow.StructOf(
2369+
arrow.Field{Name: "event_type", Type: arrow.BinaryTypes.String},
2370+
arrow.Field{Name: "event_ts", Type: arrow.FixedWidthTypes.Timestamp_us}))
2371+
2372+
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
2373+
defer mem.AssertSize(t, 0)
2374+
2375+
bldr := vt.NewBuilder(mem)
2376+
defer bldr.Release()
2377+
2378+
jsonData := `[
2379+
{"event_type": "noop", "event_ts": "1970-01-21 00:29:54.114937Z"},
2380+
42,
2381+
{"event_type": "text", "event_ts": "1970-01-21 00:29:54.954163Z"},
2382+
{"event_type": "list", "event_ts": "1970-01-21 00:29:54.240241Z"},
2383+
"text",
2384+
{"event_type": "object", "event_ts": "1970-01-21 00:29:54.146402Z"},
2385+
null
2386+
]`
2387+
2388+
err := bldr.UnmarshalJSON([]byte(jsonData))
2389+
require.NoError(t, err)
2390+
2391+
arr := bldr.NewArray()
2392+
defer arr.Release()
2393+
2394+
rec := array.NewRecord(arrow.NewSchema([]arrow.Field{
2395+
{Name: "variant", Type: arr.DataType(), Nullable: true},
2396+
}, nil), []arrow.Array{arr}, -1)
2397+
2398+
var buf bytes.Buffer
2399+
wr, err := pqarrow.NewFileWriter(rec.Schema(), &buf,
2400+
parquet.NewWriterProperties(parquet.WithDictionaryDefault(false)),
2401+
pqarrow.DefaultWriterProps())
2402+
require.NoError(t, err)
2403+
2404+
require.NoError(t, wr.Write(rec))
2405+
rec.Release()
2406+
wr.Close()
2407+
2408+
rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
2409+
require.NoError(t, err)
2410+
reader, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
2411+
require.NoError(t, err)
2412+
defer rdr.Close()
2413+
2414+
tbl, err := reader.ReadTable(context.Background())
2415+
require.NoError(t, err)
2416+
defer tbl.Release()
2417+
2418+
assert.Truef(t, array.Equal(arr, tbl.Column(0).Data().Chunk(0)),
2419+
"expected: %s\ngot: %s", arr, tbl.Column(0).Data().Chunk(0))
2420+
}

parquet/pqarrow/file_reader.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,37 @@ func (fr *FileReader) Schema() (*arrow.Schema, error) {
111111
return FromParquet(fr.rdr.MetaData().Schema, &fr.Props, fr.rdr.MetaData().KeyValueMetadata())
112112
}
113113

114+
type extensionReader struct {
115+
colReaderImpl
116+
117+
fieldWithExt arrow.Field
118+
}
119+
120+
func (er *extensionReader) Field() *arrow.Field {
121+
return &er.fieldWithExt
122+
}
123+
124+
func (er *extensionReader) BuildArray(boundedLen int64) (*arrow.Chunked, error) {
125+
if er.colReaderImpl == nil {
126+
return nil, errors.New("extension reader has no underlying column reader implementation")
127+
}
128+
129+
chkd, err := er.colReaderImpl.BuildArray(boundedLen)
130+
if err != nil {
131+
return nil, err
132+
}
133+
defer chkd.Release()
134+
135+
extType := er.fieldWithExt.Type.(arrow.ExtensionType)
136+
137+
newChunks := make([]arrow.Array, len(chkd.Chunks()))
138+
for i, c := range chkd.Chunks() {
139+
newChunks[i] = array.NewExtensionArrayWithStorage(extType, c)
140+
}
141+
142+
return arrow.NewChunked(extType, newChunks), nil
143+
}
144+
114145
type colReaderImpl interface {
115146
LoadBatch(nrecs int64) error
116147
BuildArray(boundedLen int64) (*arrow.Chunked, error)
@@ -517,7 +548,14 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi
517548

518549
switch arrowField.Type.ID() {
519550
case arrow.EXTENSION:
520-
return nil, xerrors.New("extension type not implemented")
551+
storageField := arrowField
552+
storageField.Type = arrowField.Type.(arrow.ExtensionType).StorageType()
553+
storageReader, err := fr.getReader(ctx, field, storageField)
554+
if err != nil {
555+
return nil, err
556+
}
557+
558+
return &ColumnReader{&extensionReader{colReaderImpl: storageReader, fieldWithExt: arrowField}}, nil
521559
case arrow.STRUCT:
522560

523561
childReaders := make([]*ColumnReader, len(field.Children))

0 commit comments

Comments (0)