Skip to content

Commit 1770906

Browse files
authored
Merge pull request #44 from Recidiviz/dan/struct-field-test
Bump to `v0.18.0-recidiviz.16.2`
2 parents da2c9b9 + 9785531 commit 1770906

File tree

5 files changed

+187
-26
lines changed

5 files changed

+187
-26
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,4 @@ require (
118118
modernc.org/sqlite v1.37.0
119119
)
120120

121-
replace github.com/goccy/go-zetasqlite => github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.16.1
121+
replace github.com/goccy/go-zetasqlite => github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.16.2

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0
4242
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.51.0/go.mod h1:SZiPHWGOOk3bl8tkevxkoiwPgsIl6CwrWcbwjfHZpdM=
4343
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 h1:6/0iUd0xrnX7qt+mLNRwg5c0PGv8wpE8K90ryANQwMI=
4444
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0=
45-
github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.16.1 h1:OhF/RKqkgZ2hVdhdzlSs+a2EIZPRmeutfUfAeC5a8uM=
46-
github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.16.1/go.mod h1:xtUAGxrJMK0vqv5Yj/AYvrcP3g338Tbh9oTyYk1VML8=
45+
github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.16.2 h1:GoVmgEOtY7H0UpqIySDkOUm/bXWuapnA8qZ1CanWLZE=
46+
github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.16.2/go.mod h1:xtUAGxrJMK0vqv5Yj/AYvrcP3g338Tbh9oTyYk1VML8=
4747
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
4848
github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
4949
github.com/apache/arrow-go/v18 v18.4.1 h1:q/jVkBWCJOB9reDgaIZIdruLQUb1kbkvOnOFezVH1C4=

internal/contentdata/repository.go

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func (r *Repository) Query(ctx context.Context, tx *connection.Tx, projectID, da
251251
// GoogleSQL for BigQuery translates a NULL array into an empty array in the query result
252252
v = []interface{}{}
253253
}
254-
cell, err := r.convertValueToCell(v)
254+
cell, err := r.convertValueToCell(v, fields[idx])
255255
if err != nil {
256256
return nil, fmt.Errorf("failed to convert value to cell: %w", err)
257257
}
@@ -307,48 +307,82 @@ func (r *Repository) queryParameterValueToGoValue(value *bigqueryv2.QueryParamet
307307
return value.Value, nil
308308
}
309309

310-
// zetasqlite returns []map[string]interface{} value as struct value, also returns []interface{} value as array value.
310+
// zetasqlite returns a map[string]interface{} value for a struct and a []interface{} value for an array.
311311
// We need to convert them specifically to the TableRow and TableCell types.
312-
func (r *Repository) convertValueToCell(value interface{}) (*internaltypes.TableCell, error) {
312+
// schema provides the field ordering for struct types to ensure deterministic field order.
313+
func (r *Repository) convertValueToCell(value interface{}, schema *bigqueryv2.TableFieldSchema) (*internaltypes.TableCell, error) {
313314
if value == nil {
314315
return &internaltypes.TableCell{V: nil}, nil
315316
}
316317
rv := reflect.ValueOf(value)
317318
kind := rv.Type().Kind()
318-
if kind != reflect.Slice && kind != reflect.Array {
319-
v := fmt.Sprint(value)
320-
return &internaltypes.TableCell{V: v, Bytes: int64(len(v))}, nil
321-
}
322-
elemType := rv.Type().Elem()
323-
if elemType.Kind() == reflect.Map {
319+
if kind == reflect.Map {
324320
// value is struct type
325321
var (
326322
cells []*internaltypes.TableCell
327323
totalBytes int64
328324
)
329-
for i := 0; i < rv.Len(); i++ {
330-
fieldV := rv.Index(i)
331-
keys := fieldV.MapKeys()
332-
if len(keys) != 1 {
333-
return nil, fmt.Errorf("unexpected key number of field map value. expected 1 but got %d", len(keys))
325+
326+
// Build a map of field values for quick lookup
327+
fieldValues := make(map[string]reflect.Value)
328+
keys := rv.MapKeys()
329+
for _, key := range keys {
330+
fieldValues[key.Interface().(string)] = rv.MapIndex(key)
331+
}
332+
333+
// Process fields in schema order to ensure deterministic ordering
334+
// (Go map iteration order is randomized)
335+
if schema != nil && schema.Fields != nil {
336+
for _, fieldSchema := range schema.Fields {
337+
fieldValue, exists := fieldValues[fieldSchema.Name]
338+
if !exists {
339+
// Field not present in data, skip it
340+
continue
341+
}
342+
cell, err := r.convertValueToCell(fieldValue.Interface(), fieldSchema)
343+
if err != nil {
344+
return nil, err
345+
}
346+
cell.Name = fieldSchema.Name
347+
totalBytes += cell.Bytes
348+
cells = append(cells, cell)
334349
}
335-
cell, err := r.convertValueToCell(fieldV.MapIndex(keys[0]).Interface())
336-
if err != nil {
337-
return nil, err
350+
} else {
351+
// Fallback: no schema available, process in arbitrary order
352+
for _, key := range keys {
353+
cell, err := r.convertValueToCell(rv.MapIndex(key).Interface(), nil)
354+
if err != nil {
355+
return nil, err
356+
}
357+
cell.Name = key.Interface().(string)
358+
totalBytes += cell.Bytes
359+
cells = append(cells, cell)
338360
}
339-
cell.Name = keys[0].Interface().(string)
340-
totalBytes += cell.Bytes
341-
cells = append(cells, cell)
342361
}
343362
return &internaltypes.TableCell{V: internaltypes.TableRow{F: cells}, Bytes: totalBytes}, nil
344363
}
364+
if kind != reflect.Slice && kind != reflect.Array {
365+
v := fmt.Sprint(value)
366+
return &internaltypes.TableCell{V: v, Bytes: int64(len(v))}, nil
367+
}
345368
// array type
346369
var (
347370
cells = []*internaltypes.TableCell{}
348371
totalBytes int64 = 0
349372
)
373+
// For array elements, schema.Type will be the element type (e.g., STRUCT for array of structs)
374+
// and schema.Fields will contain the struct fields
375+
var elementSchema *bigqueryv2.TableFieldSchema
376+
if schema != nil {
377+
elementSchema = &bigqueryv2.TableFieldSchema{
378+
Name: schema.Name,
379+
Type: schema.Type,
380+
Mode: "NULLABLE", // Array elements can be nullable
381+
Fields: schema.Fields,
382+
}
383+
}
350384
for i := 0; i < rv.Len(); i++ {
351-
cell, err := r.convertValueToCell(rv.Index(i).Interface())
385+
cell, err := r.convertValueToCell(rv.Index(i).Interface(), elementSchema)
352386
if err != nil {
353387
return nil, err
354388
}

internal/types/types.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,20 @@ func (r *TableRow) Data() (map[string]interface{}, error) {
7171
}
7272

7373
func (r *TableRow) AVROValue(fields []*types.AVROFieldSchema) (map[string]interface{}, error) {
74+
// Create a map of field names to schemas for efficient lookup
75+
// This is necessary because r.F may have fields in a different order than the schema
76+
fieldSchemaMap := make(map[string]*types.AVROFieldSchema, len(fields))
77+
for _, field := range fields {
78+
fieldSchemaMap[field.Name] = field
79+
}
80+
7481
rowMap := map[string]interface{}{}
75-
for idx, cell := range r.F {
76-
v, err := cell.AVROValue(fields[idx])
82+
for _, cell := range r.F {
83+
fieldSchema, ok := fieldSchemaMap[cell.Name]
84+
if !ok {
85+
return nil, fmt.Errorf("field %q not found in schema", cell.Name)
86+
}
87+
v, err := cell.AVROValue(fieldSchema)
7788
if err != nil {
7889
return nil, err
7990
}

server/struct_field_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package server_test
22

33
import (
44
"context"
5+
"strings"
56
"testing"
67

78
"cloud.google.com/go/bigquery"
@@ -387,3 +388,118 @@ func TestStructFieldNamesWithBackticks(t *testing.T) {
387388
}
388389
})
389390
}
391+
392+
// TestEmptyStructInsertion tests that loading JSON data with empty STRUCT values
393+
// works correctly when IgnoreUnknownValues is set to true. This corresponds to
394+
// the Python BigQuery client test that loads {"conductor": {}} into a table
395+
// with a STRUCT field containing nested fields.
396+
func TestEmptyStructInsertion(t *testing.T) {
397+
ctx := context.Background()
398+
399+
bqServer, err := server.New(server.TempStorage)
400+
if err != nil {
401+
t.Fatal(err)
402+
}
403+
404+
const (
405+
projectID = "test"
406+
datasetID = "ds1"
407+
tableID = "t1"
408+
)
409+
410+
// Create a table with a STRUCT field "conductor" containing a FLOAT64 field "length"
411+
project := types.NewProject(
412+
projectID,
413+
types.NewDataset(
414+
datasetID,
415+
types.NewTable(
416+
tableID,
417+
[]*types.Column{
418+
types.NewColumn(
419+
"conductor",
420+
types.STRUCT,
421+
types.ColumnFields(
422+
types.NewColumn("length", types.FLOAT64),
423+
),
424+
),
425+
},
426+
nil,
427+
),
428+
),
429+
)
430+
431+
if err := bqServer.Load(server.StructSource(project)); err != nil {
432+
t.Fatal(err)
433+
}
434+
435+
testServer := bqServer.TestServer()
436+
defer func() {
437+
testServer.Close()
438+
bqServer.Close()
439+
}()
440+
441+
client, err := bigquery.NewClient(
442+
ctx,
443+
projectID,
444+
option.WithEndpoint(testServer.URL),
445+
option.WithoutAuthentication(),
446+
)
447+
if err != nil {
448+
t.Fatal(err)
449+
}
450+
defer client.Close()
451+
452+
// Test loading JSON with an empty struct {"conductor": {}}
453+
t.Run("load empty struct", func(t *testing.T) {
454+
jsonData := `{"conductor": {}}`
455+
456+
source := bigquery.NewReaderSource(strings.NewReader(jsonData))
457+
source.SourceFormat = bigquery.JSON
458+
459+
loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(source)
460+
461+
job, err := loader.Run(ctx)
462+
if err != nil {
463+
t.Fatalf("failed to run load job: %v", err)
464+
}
465+
466+
status, err := job.Wait(ctx)
467+
if err != nil {
468+
t.Fatalf("failed to wait for load job: %v", err)
469+
}
470+
471+
if err := status.Err(); err != nil {
472+
t.Fatalf("load job failed: %v", err)
473+
}
474+
475+
t.Log("Successfully loaded empty struct")
476+
})
477+
478+
// Test querying the loaded data
479+
t.Run("query conductor.length from loaded data", func(t *testing.T) {
480+
query := client.Query("SELECT conductor.length FROM `" + datasetID + "." + tableID + "`")
481+
482+
it, err := query.Read(ctx)
483+
if err != nil {
484+
t.Fatalf("failed to execute query: %v", err)
485+
}
486+
487+
var row struct {
488+
Length bigquery.NullFloat64 `bigquery:"length"`
489+
}
490+
491+
if err := it.Next(&row); err != nil {
492+
if err == iterator.Done {
493+
t.Fatal("expected at least one row")
494+
}
495+
t.Fatalf("failed to read row: %v", err)
496+
}
497+
498+
// The empty struct should result in NULL for the length field
499+
if row.Length.Valid {
500+
t.Errorf("expected NULL for conductor.length, got %v", row.Length.Float64)
501+
}
502+
503+
t.Logf("Query succeeded: conductor.length is NULL as expected for empty struct")
504+
})
505+
}

0 commit comments

Comments
 (0)