Skip to content

Commit b196d3b

Browse files
Willem-J-an and Willem Jan Noort
authored
fix(arrow/avro-reader): bunch of types that didn't work (#416)
### Rationale for this change: #415. ### What changes are included in this PR? Fix decoding of Avro types that no longer worked: fixed bytes, timestamp, time, date & decimals. ### Are these changes tested? Added tests to avro/reader_test.go. ### Are there any user-facing changes? No API changes, just broader compatibility. --------- Co-authored-by: Willem Jan Noort <WillemJan.Noort@essent.nl>
1 parent 2cf2b29 commit b196d3b

6 files changed

Lines changed: 612 additions & 400 deletions

File tree

arrow/avro/reader_test.go

Lines changed: 53 additions & 200 deletions
Original file line numberDiff line numberDiff line change
@@ -17,212 +17,24 @@
1717
package avro
1818

1919
import (
20+
"bytes"
21+
"encoding/json"
2022
"fmt"
23+
"os"
24+
"path/filepath"
2125
"testing"
2226

2327
"github.com/apache/arrow-go/v18/arrow"
28+
"github.com/apache/arrow-go/v18/arrow/avro/testdata"
2429
hamba "github.com/hamba/avro/v2"
30+
"github.com/stretchr/testify/assert"
2531
)
2632

27-
func TestEditSchemaStringEqual(t *testing.T) {
33+
func TestReader(t *testing.T) {
2834
tests := []struct {
29-
avroSchema string
3035
arrowSchema []arrow.Field
3136
}{
3237
{
33-
avroSchema: `{
34-
"fields": [
35-
{
36-
"name": "inheritNull",
37-
"type": {
38-
"name": "Simple",
39-
"symbols": [
40-
"a",
41-
"b"
42-
],
43-
"type": "enum"
44-
}
45-
},
46-
{
47-
"name": "explicitNamespace",
48-
"type": {
49-
"name": "test",
50-
"namespace": "org.hamba.avro",
51-
"size": 12,
52-
"type": "fixed"
53-
}
54-
},
55-
{
56-
"name": "fullName",
57-
"type": {
58-
"type": "record",
59-
"name": "fullName_data",
60-
"namespace": "ignored",
61-
"doc": "A name attribute with a fullname, so the namespace attribute is ignored. The fullname is 'a.full.Name', and the namespace is 'a.full'.",
62-
"fields": [{
63-
"name": "inheritNamespace",
64-
"type": {
65-
"type": "enum",
66-
"name": "Understanding",
67-
"doc": "A simple name (attribute) and no namespace attribute: inherit the namespace of the enclosing type 'a.full.Name'. The fullname is 'a.full.Understanding'.",
68-
"symbols": ["d", "e"]
69-
}
70-
}, {
71-
"name": "md5",
72-
"type": {
73-
"name": "md5_data",
74-
"type": "fixed",
75-
"size": 16,
76-
"namespace": "ignored"
77-
}
78-
}
79-
]
80-
}
81-
},
82-
{
83-
"name": "id",
84-
"type": "int"
85-
},
86-
{
87-
"name": "bigId",
88-
"type": "long"
89-
},
90-
{
91-
"name": "temperature",
92-
"type": [
93-
"null",
94-
"float"
95-
]
96-
},
97-
{
98-
"name": "fraction",
99-
"type": [
100-
"null",
101-
"double"
102-
]
103-
},
104-
{
105-
"name": "is_emergency",
106-
"type": "boolean"
107-
},
108-
{
109-
"name": "remote_ip",
110-
"type": [
111-
"null",
112-
"bytes"
113-
]
114-
},
115-
{
116-
"name": "person",
117-
"type": {
118-
"fields": [
119-
{
120-
"name": "lastname",
121-
"type": "string"
122-
},
123-
{
124-
"name": "address",
125-
"type": {
126-
"fields": [
127-
{
128-
"name": "streetaddress",
129-
"type": "string"
130-
},
131-
{
132-
"name": "city",
133-
"type": "string"
134-
}
135-
],
136-
"name": "AddressUSRecord",
137-
"type": "record"
138-
}
139-
},
140-
{
141-
"name": "mapfield",
142-
"type": {
143-
"default": {
144-
},
145-
"type": "map",
146-
"values": "long"
147-
}
148-
},
149-
{
150-
"name": "arrayField",
151-
"type": {
152-
"default": [
153-
],
154-
"items": "string",
155-
"type": "array"
156-
}
157-
}
158-
],
159-
"name": "person_data",
160-
"type": "record"
161-
}
162-
},
163-
{
164-
"name": "decimalField",
165-
"type": {
166-
"logicalType": "decimal",
167-
"precision": 4,
168-
"scale": 2,
169-
"type": "bytes"
170-
}
171-
},
172-
{
173-
"logicalType": "uuid",
174-
"name": "uuidField",
175-
"type": "string"
176-
},
177-
{
178-
"name": "timemillis",
179-
"type": {
180-
"type": "int",
181-
"logicalType": "time-millis"
182-
}
183-
},
184-
{
185-
"name": "timemicros",
186-
"type": {
187-
"type": "long",
188-
"logicalType": "time-micros"
189-
}
190-
},
191-
{
192-
"name": "timestampmillis",
193-
"type": {
194-
"type": "long",
195-
"logicalType": "timestamp-millis"
196-
}
197-
},
198-
{
199-
"name": "timestampmicros",
200-
"type": {
201-
"type": "long",
202-
"logicalType": "timestamp-micros"
203-
}
204-
},
205-
{
206-
"name": "duration",
207-
"type": {
208-
"name": "duration",
209-
"namespace": "whyowhy",
210-
"logicalType": "duration",
211-
"size": 12,
212-
"type": "fixed"
213-
}
214-
},
215-
{
216-
"name": "date",
217-
"type": {
218-
"logicalType": "date",
219-
"type": "int"
220-
}
221-
}
222-
],
223-
"name": "Example",
224-
"type": "record"
225-
}`,
22638
arrowSchema: []arrow.Field{
22739
{
22840
Name: "explicitNamespace",
@@ -303,6 +115,10 @@ func TestEditSchemaStringEqual(t *testing.T) {
303115
Name: "decimalField",
304116
Type: &arrow.Decimal128Type{Precision: 4, Scale: 2},
305117
},
118+
{
119+
Name: "decimal256Field",
120+
Type: &arrow.Decimal256Type{Precision: 60, Scale: 2},
121+
},
306122
{
307123
Name: "uuidField",
308124
Type: arrow.BinaryTypes.String,
@@ -336,12 +152,15 @@ func TestEditSchemaStringEqual(t *testing.T) {
336152
}
337153

338154
for _, test := range tests {
339-
t.Run("", func(t *testing.T) {
155+
tp := testdata.Generate()
156+
defer os.RemoveAll(filepath.Dir(tp.Avro))
157+
158+
t.Run("ShouldParseSchemaWithEdits", func(t *testing.T) {
340159
want := arrow.NewSchema(test.arrowSchema, nil)
341160

342-
schema, err := hamba.ParseBytes([]byte(test.avroSchema))
161+
schema, err := testdata.AllTypesAvroSchema()
343162
if err != nil {
344-
t.Fatalf("%v", err)
163+
t.Fatal(err)
345164
}
346165
r := new(OCFReader)
347166
r.avroSchema = schema.String()
@@ -354,11 +173,45 @@ func TestEditSchemaStringEqual(t *testing.T) {
354173
if err != nil {
355174
t.Fatalf("%v", err)
356175
}
176+
assert.Equal(t, want.String(), got.String())
357177
if fmt.Sprintf("%+v", want.String()) != fmt.Sprintf("%+v", got.String()) {
358178
t.Fatalf("got=%v,\n want=%v", got.String(), want.String())
359-
} else {
360-
t.Logf("schema.String() comparison passed")
361179
}
362180
})
181+
182+
t.Run("ShouldLoadExpectedRecords", func(t *testing.T) {
183+
b, err := os.ReadFile(tp.Avro)
184+
if err != nil {
185+
t.Error(err)
186+
}
187+
r := bytes.NewReader(b)
188+
189+
opts := []Option{WithChunk(-1)}
190+
ar, err := NewOCFReader(r, opts...)
191+
if err != nil {
192+
t.Error(err)
193+
}
194+
defer ar.Close()
195+
196+
exists := ar.Next()
197+
198+
if ar.Err() != nil {
199+
t.Error("failed to read next record: %w", ar.Err())
200+
}
201+
if !exists {
202+
t.Error("no record exists")
203+
}
204+
a, err := ar.Record().MarshalJSON()
205+
assert.NoError(t, err)
206+
var avroParsed []map[string]any
207+
json.Unmarshal(a, &avroParsed)
208+
209+
j, err := os.ReadFile(tp.Json)
210+
assert.NoError(t, err)
211+
var jsonParsed map[string]any
212+
json.Unmarshal(j, &jsonParsed)
213+
214+
assert.Equal(t, jsonParsed, avroParsed[0])
215+
})
363216
}
364217
}

0 commit comments

Comments
 (0)