Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions arrow/extensions/variant.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@ func (v *VariantType) Value() arrow.Field {
return v.StorageType().(*arrow.StructType).Field(v.valueFieldIdx)
}

func (v *VariantType) TypedValue() arrow.Field {
if v.typedValueFieldIdx == -1 {
return arrow.Field{}
}

return v.StorageType().(*arrow.StructType).Field(v.typedValueFieldIdx)
}

func (*VariantType) ExtensionName() string { return "parquet.variant" }

func (v *VariantType) String() string {
Expand Down
20 changes: 15 additions & 5 deletions parquet/pqarrow/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,19 @@ func variantToNode(t *extensions.VariantType, field arrow.Field, props *parquet.
return nil, err
}

//TODO: implement shredding
fields := schema.FieldList{metadataNode, valueNode}

typedField := t.TypedValue()
if typedField.Type != nil {
typedNode, err := fieldToNode("typed_value", typedField, props, arrProps)
if err != nil {
return nil, err
}
fields = append(fields, typedNode)
}

return schema.NewGroupNodeLogical(field.Name, repFromNullable(field.Nullable),
schema.FieldList{metadataNode, valueNode}, schema.VariantLogicalType{},
fieldIDFromMeta(field.Metadata))
fields, schema.VariantLogicalType{}, fieldIDFromMeta(field.Metadata))
}

func structToNode(field arrow.Field, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (schema.Node, error) {
Expand Down Expand Up @@ -859,8 +867,10 @@ func mapToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *sc
func variantToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *schemaTree, _, out *SchemaField) error {
// this is for unshredded variants. shredded variants may have more fields
// TODO: implement support for shredded variants
Comment thread
zeroshade marked this conversation as resolved.
Outdated
if n.NumFields() != 2 {
return errors.New("VARIANT group must have exactly 2 children")
switch n.NumFields() {
case 2, 3:
default:
return errors.New("VARIANT group must have exactly 2 or 3 children")
}

var err error
Expand Down
104 changes: 104 additions & 0 deletions parquet/pqarrow/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,3 +534,107 @@ func TestConvertSchemaParquetVariant(t *testing.T) {
require.NoError(t, err)
assert.True(t, pqschema.Equals(sc), pqschema.String(), sc.String())
}

func TestShreddedVariantSchema(t *testing.T) {
metaNoFieldID := arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"-1"})

s := arrow.StructOf(
arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Metadata: metaNoFieldID},
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID},
arrow.Field{Name: "typed_value", Type: arrow.StructOf(
arrow.Field{Name: "tsmicro", Type: arrow.StructOf(
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID},
arrow.Field{Name: "typed_value", Type: arrow.FixedWidthTypes.Timestamp_us, Nullable: true, Metadata: metaNoFieldID},
), Metadata: metaNoFieldID},
arrow.Field{Name: "strval", Type: arrow.StructOf(
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID},
arrow.Field{Name: "typed_value", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: metaNoFieldID},
), Metadata: metaNoFieldID},
arrow.Field{Name: "bool", Type: arrow.StructOf(
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID},
arrow.Field{Name: "typed_value", Type: arrow.FixedWidthTypes.Boolean, Nullable: true, Metadata: metaNoFieldID},
), Metadata: metaNoFieldID},
arrow.Field{Name: "uuid", Type: arrow.StructOf(
arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID},
arrow.Field{Name: "typed_value", Type: extensions.NewUUIDType(), Nullable: true, Metadata: metaNoFieldID},
), Metadata: metaNoFieldID},
), Nullable: true, Metadata: metaNoFieldID})

vt, err := extensions.NewVariantType(s)
require.NoError(t, err)

arrSchema := arrow.NewSchema([]arrow.Field{
{Name: "variant_col", Type: vt, Nullable: true, Metadata: metaNoFieldID},
{Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false, Metadata: metaNoFieldID},
}, nil)

sc, err := pqarrow.ToParquet(arrSchema, nil, pqarrow.DefaultWriterProps())
require.NoError(t, err)

// the equivalent shredded variant parquet schema looks like this:
// repeated group field_id=-1 schema {
// optional group field_id=-1 variant_col (Variant) {
// required byte_array field_id=-1 metadata;
// optional byte_array field_id=-1 value;
// optional group field_id=-1 typed_value {
// required group field_id=-1 tsmicro {
// optional byte_array field_id=-1 value;
// optional int64 field_id=-1 typed_value (Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=true));
// }
// required group field_id=-1 strval {
// optional byte_array field_id=-1 value;
// optional byte_array field_id=-1 typed_value (String);
// }
// required group field_id=-1 bool {
// optional byte_array field_id=-1 value;
// optional boolean field_id=-1 typed_value;
// }
// required group field_id=-1 uuid {
// optional byte_array field_id=-1 value;
// optional fixed_len_byte_array field_id=-1 typed_value (UUID);
// }
// }
// }
// required int64 field_id=-1 id (Int(bitWidth=64, isSigned=true));
// }

expected := schema.NewSchema(schema.MustGroup(schema.NewGroupNode("schema",
parquet.Repetitions.Repeated, schema.FieldList{
schema.Must(schema.NewGroupNodeLogical("variant_col", parquet.Repetitions.Optional, schema.FieldList{
schema.MustPrimitive(schema.NewPrimitiveNode("metadata", parquet.Repetitions.Required, parquet.Types.ByteArray, -1, -1)),
schema.MustPrimitive(schema.NewPrimitiveNode("value", parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)),
schema.MustGroup(schema.NewGroupNode("typed_value", parquet.Repetitions.Optional, schema.FieldList{
schema.MustGroup(schema.NewGroupNode("tsmicro", parquet.Repetitions.Required, schema.FieldList{
schema.MustPrimitive(schema.NewPrimitiveNode("value", parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)),
schema.MustPrimitive(schema.NewPrimitiveNodeLogical("typed_value", parquet.Repetitions.Optional, schema.NewTimestampLogicalTypeWithOpts(
schema.WithTSTimeUnitType(schema.TimeUnitMicros), schema.WithTSIsAdjustedToUTC(), schema.WithTSForceConverted(),
), parquet.Types.Int64, -1, -1)),
}, -1)),
schema.MustGroup(schema.NewGroupNode("strval", parquet.Repetitions.Required, schema.FieldList{
schema.MustPrimitive(schema.NewPrimitiveNode("value", parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)),
schema.MustPrimitive(schema.NewPrimitiveNodeLogical("typed_value", parquet.Repetitions.Optional,
schema.StringLogicalType{}, parquet.Types.ByteArray, -1, -1)),
}, -1)),
schema.MustGroup(schema.NewGroupNode("bool", parquet.Repetitions.Required, schema.FieldList{
schema.MustPrimitive(schema.NewPrimitiveNode("value", parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)),
schema.MustPrimitive(schema.NewPrimitiveNode("typed_value", parquet.Repetitions.Optional,
parquet.Types.Boolean, -1, -1)),
}, -1)),
schema.MustGroup(schema.NewGroupNode("uuid", parquet.Repetitions.Required, schema.FieldList{
schema.MustPrimitive(schema.NewPrimitiveNode("value", parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)),
schema.MustPrimitive(schema.NewPrimitiveNodeLogical("typed_value", parquet.Repetitions.Optional,
schema.UUIDLogicalType{}, parquet.Types.FixedLenByteArray, 16, -1)),
}, -1)),
}, -1)),
}, schema.VariantLogicalType{}, -1)),
schema.MustPrimitive(schema.NewPrimitiveNodeLogical("id", parquet.Repetitions.Required,
schema.NewIntLogicalType(64, true), parquet.Types.Int64, -1, -1)),
}, -1)))

assert.True(t, sc.Equals(expected), "expected: %s\ngot: %s", expected, sc)

arrsc, err := pqarrow.FromParquet(sc, nil, metadata.KeyValueMetadata{})
require.NoError(t, err)

assert.True(t, arrSchema.Equal(arrsc), "expected: %s\ngot: %s", arrSchema, arrsc)
}
Loading