Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
node_modules/
.venv/
40 changes: 40 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,43 @@ jobs:
env:
BIGQUERY_EMULATOR_REPOSITORY: bigquery-emulator
BIGQUERY_EMULATOR_VERSION: test

node-integration-test:
needs: build
name: Node.js Integration Test
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v4
- name: setup docker buildx
uses: docker/setup-buildx-action@v3
- name: build docker image
uses: docker/build-push-action@v6
with:
context: .
load: true
tags: bigquery-emulator:test
platforms: linux/amd64
push: false
- name: setup node
uses: actions/setup-node@v4
with:
node-version: '24'
- name: enable corepack
run: corepack enable
- name: cache yarn dependencies
uses: actions/cache@v4
with:
path: test/node/.yarn/cache
key: ${{ runner.os }}-yarn-${{ hashFiles('test/node/yarn.lock') }}
restore-keys: |
${{ runner.os }}-yarn-
- name: install dependencies
run: yarn install --immutable
working-directory: test/node
- name: run integration tests
run: yarn test
working-directory: test/node
env:
BIGQUERY_EMULATOR_REPOSITORY: bigquery-emulator
BIGQUERY_EMULATOR_VERSION: test
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,4 @@ require (
modernc.org/sqlite v1.37.0
)

replace github.com/goccy/go-zetasqlite => github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.17
replace github.com/goccy/go-zetasqlite => github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.19
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.51.0/go.mod h1:SZiPHWGOOk3bl8tkevxkoiwPgsIl6CwrWcbwjfHZpdM=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 h1:6/0iUd0xrnX7qt+mLNRwg5c0PGv8wpE8K90ryANQwMI=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0=
github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.17 h1:9Ui62Bg4QGOOQeNhQkgo91N6tFDwGuPbPh7zvPIxke0=
github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.17/go.mod h1:xtUAGxrJMK0vqv5Yj/AYvrcP3g338Tbh9oTyYk1VML8=
github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.19 h1://hrbIeXf8WRcM1j8DD5K6uR68zBgQFG9cBuV4juZ2o=
github.com/Recidiviz/go-zetasqlite v0.18.0-recidiviz.19/go.mod h1:xtUAGxrJMK0vqv5Yj/AYvrcP3g338Tbh9oTyYk1VML8=
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/apache/arrow-go/v18 v18.4.1 h1:q/jVkBWCJOB9reDgaIZIdruLQUb1kbkvOnOFezVH1C4=
Expand Down
10 changes: 9 additions & 1 deletion internal/connection/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"context"
"database/sql"
"fmt"
"github.com/goccy/go-zetasqlite"
"sync"

"github.com/goccy/go-zetasqlite"
)

const (
Expand Down Expand Up @@ -107,6 +108,8 @@ func (t *Tx) unregister() {
t.finalized = true
}

// Conn returns the managed connection this transaction was opened on,
// allowing callers to reach connection-level functionality (e.g. Raw)
// while the transaction is in flight.
func (t *Tx) Conn() *ManagedConnection { return t.conn }

func (t *Tx) RollbackIfNotCommitted() error {
defer t.unregister()
if t.committed {
Expand Down Expand Up @@ -205,6 +208,11 @@ func (c *ManagedConnection) ConfigureScope(projectID, datasetID string) *Managed
return c
}

// Raw executes the given function with the underlying ZetaSQLite connection.
// The value passed to fn is the raw driver connection; callers typically
// type-assert it to *zetasqlite.ZetaSQLiteConn to access driver-specific
// functionality (such as setting query parameters for the analyzer).
func (c *ManagedConnection) Raw(fn func(interface{}) error) error {
	return c.zetasqliteConnection.Raw(fn)
}

func (c *ManagedConnection) Begin(ctx context.Context) (*Tx, error) {
tx, err := c.zetasqliteConnection.BeginTx(ctx, nil)
if err != nil {
Expand Down
26 changes: 24 additions & 2 deletions internal/contentdata/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"database/sql"
"encoding/base64"
"fmt"
"reflect"
"strings"

"github.com/goccy/go-json"
"github.com/goccy/go-zetasqlite"
"go.uber.org/zap"
bigqueryv2 "google.golang.org/api/bigquery/v2"
"reflect"
"strings"

"github.com/goccy/bigquery-emulator/internal/connection"
"github.com/goccy/bigquery-emulator/internal/logger"
Expand Down Expand Up @@ -195,6 +196,17 @@ func (r *Repository) Query(ctx context.Context, tx *connection.Tx, projectID, da
zap.String("query", query),
zap.Any("values", values),
)
// We must pass the query parameters to zetasqlite so the analyzer uses the proper typings
if err := tx.Conn().Raw(func(c interface{}) error {
zetasqliteConn, ok := c.(*zetasqlite.ZetaSQLiteConn)
if !ok {
return fmt.Errorf("failed to get ZetaSQLiteConn from %T", c)
}
zetasqliteConn.SetQueryParameters(params)
return nil
}); err != nil {
return nil, fmt.Errorf("failed to setup connection: %w", err)
}
rows, err := tx.Tx().QueryContext(ctx, query, values...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -305,6 +317,16 @@ func (r *Repository) queryParameterValueToGoValue(value *bigqueryv2.QueryParamet
}
return st, nil
}

// Check if the Value field is marked as null in NullFields
// This is how Google's API client indicates a null value even though
// Value is typed as string (not *string)
for _, field := range value.NullFields {
if field == "Value" {
return nil, nil
}
}

return value.Value, nil
}

Expand Down
209 changes: 209 additions & 0 deletions server/flexible_rpc_marshalling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package server

import (
"strconv"

"github.com/goccy/go-json"
bigqueryv2 "google.golang.org/api/bigquery/v2"
)

// flexibleQueryRequest wraps bigqueryv2.QueryRequest to handle flexible unmarshalling
// of query parameter values. The Node.js BigQuery client sends numeric values as JSON
// numbers, but the generated QueryParameterValue struct expects all values as strings.
// The conversion happens in this type's UnmarshalJSON method.
type flexibleQueryRequest struct {
	bigqueryv2.QueryRequest
}

// UnmarshalJSON implements custom unmarshalling that converts numeric parameter
// values to strings to match the expected QueryParameterValue.Value type. It also
// records which scalar parameter values were explicitly null so they can be
// marked via NullFields after the strict decode.
func (f *flexibleQueryRequest) UnmarshalJSON(data []byte) error {
	// Decode into a generic map first so parameter values can be inspected
	// and rewritten before the strict struct decode happens.
	payload := map[string]interface{}{}
	if err := json.Unmarshal(data, &payload); err != nil {
		return err
	}

	// Normalize parameter values in place, remembering the paths of null scalars.
	var nullPaths [][]int
	if params, isList := payload["queryParameters"].([]interface{}); isList {
		nullPaths = processQueryParameters(params)
	}

	// Re-encode the (possibly rewritten) payload for the strict decoder.
	reencoded, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	// The alias type drops this UnmarshalJSON method, preventing infinite recursion.
	type queryRequestAlias bigqueryv2.QueryRequest
	if err := json.Unmarshal(reencoded, (*queryRequestAlias)(&f.QueryRequest)); err != nil {
		return err
	}

	// Flag the decoded parameter values that carried an explicit JSON null.
	for _, path := range nullPaths {
		if len(path) == 0 || path[0] >= len(f.QueryRequest.QueryParameters) {
			continue
		}
		applyNullField(f.QueryRequest.QueryParameters[path[0]].ParameterValue, path[1:])
	}

	return nil
}

// flexibleJob wraps bigqueryv2.Job to handle flexible unmarshalling
// of query parameter values in job configurations
// (configuration.query.queryParameters). The conversion happens in this
// type's UnmarshalJSON method.
type flexibleJob struct {
	bigqueryv2.Job
}

// UnmarshalJSON implements custom unmarshalling that converts numeric parameter
// values to strings to match the expected QueryParameterValue.Value type. It also
// records which scalar parameter values were explicitly null so they can be
// marked via NullFields after the strict decode.
func (f *flexibleJob) UnmarshalJSON(data []byte) error {
	// Decode into a generic map first so parameter values can be inspected
	// and rewritten before the strict struct decode happens.
	payload := map[string]interface{}{}
	if err := json.Unmarshal(data, &payload); err != nil {
		return err
	}

	// Walk configuration.query.queryParameters, normalizing values in place
	// and collecting the paths of null scalars.
	var nullPaths [][]int
	if config, isMap := payload["configuration"].(map[string]interface{}); isMap {
		if query, isMap := config["query"].(map[string]interface{}); isMap {
			if params, isList := query["queryParameters"].([]interface{}); isList {
				nullPaths = processQueryParameters(params)
			}
		}
	}

	// Re-encode the (possibly rewritten) payload for the strict decoder.
	reencoded, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	// The alias type drops this UnmarshalJSON method, preventing infinite recursion.
	type jobAlias bigqueryv2.Job
	if err := json.Unmarshal(reencoded, (*jobAlias)(&f.Job)); err != nil {
		return err
	}

	// Flag the decoded parameter values that carried an explicit JSON null.
	if f.Job.Configuration == nil || f.Job.Configuration.Query == nil {
		return nil
	}
	queryParams := f.Job.Configuration.Query.QueryParameters
	for _, path := range nullPaths {
		if len(path) == 0 || path[0] >= len(queryParams) {
			continue
		}
		applyNullField(queryParams[path[0]].ParameterValue, path[1:])
	}
	return nil
}

// processQueryParameters handles both null tracking and normalization of query
// parameters. For each parameter it records the index paths of scalar values
// that are explicitly null and converts numeric/boolean values to strings.
// It returns the collected paths so callers can set NullFields after decoding.
func processQueryParameters(params []interface{}) [][]int {
	var nullPaths [][]int
	for idx, param := range params {
		paramMap, isMap := param.(map[string]interface{})
		if !isMap {
			continue
		}
		value, isMap := paramMap["parameterValue"].(map[string]interface{})
		if !isMap {
			continue
		}
		collectNullPaths(value, []int{idx}, &nullPaths)
		normalizeParameterValue(value)
	}
	return nullPaths
}

// collectNullPaths walks a raw parameter value structure and records the index
// path of every scalar "value" field that is explicitly null. The recorded
// paths are used later to set NullFields on the unmarshaled struct. Paths are
// copied before being stored so later appends cannot mutate them.
//
// Struct values ("structValues") are keyed by field name rather than index and
// are intentionally not traversed here.
func collectNullPaths(paramValue map[string]interface{}, currentPath []int, nullPaths *[][]int) {
	// An explicit JSON null decodes to a present key holding nil.
	if v, present := paramValue["value"]; present && v == nil {
		*nullPaths = append(*nullPaths, append([]int(nil), currentPath...))
	}

	// Descend into array elements, extending the path with each index.
	elements, _ := paramValue["arrayValues"].([]interface{})
	for idx, element := range elements {
		child, isMap := element.(map[string]interface{})
		if !isMap {
			continue
		}
		collectNullPaths(child, append(currentPath, idx), nullPaths)
	}
}

// applyNullField marks the Value field of the QueryParameterValue addressed by
// path as null via NullFields. The path is a sequence of array indices relative
// to pv; an empty path means pv itself held the null scalar. Indices outside
// ArrayValues are silently ignored, and struct values are not traversed
// (mirroring collectNullPaths).
func applyNullField(pv *bigqueryv2.QueryParameterValue, path []int) {
	if pv == nil {
		return
	}

	if len(path) == 0 {
		// Target reached: record that Value should serialize as JSON null.
		pv.NullFields = append(pv.NullFields, "Value")
		return
	}

	// Descend into the array element selected by the leading index.
	if idx := path[0]; idx < len(pv.ArrayValues) {
		applyNullField(pv.ArrayValues[idx], path[1:])
	}
}

// normalizeParameterValue recursively rewrites a raw parameterValue map so it
// matches what the strict bigqueryv2 decoder expects: the generated
// QueryParameterValue.Value field is typed as string, but JSON clients (e.g.
// the Node.js BigQuery client) may send numbers or booleans. Numeric and
// boolean scalars are converted to their string form in place; nested
// arrayValues and structValues are processed recursively. Explicit nulls and
// existing strings are left untouched.
func normalizeParameterValue(paramValue map[string]interface{}) {
	// Handle the scalar value field.
	if value, ok := paramValue["value"]; ok {
		switch v := value.(type) {
		case float64:
			// JSON numbers are unmarshalled as float64. Use integer formatting
			// for whole numbers, but only when v is inside int64's range:
			// converting an out-of-range float64 to int64 is
			// implementation-defined in Go, so larger magnitudes fall through
			// to FormatFloat ('f' format never uses an exponent, so whole
			// numbers still print as plain digits).
			if v >= -(1<<63) && v < 1<<63 && v == float64(int64(v)) {
				paramValue["value"] = strconv.FormatInt(int64(v), 10)
			} else {
				paramValue["value"] = strconv.FormatFloat(v, 'f', -1, 64)
			}
		case bool:
			// Convert booleans to string.
			paramValue["value"] = strconv.FormatBool(v)
		case string:
			// Already a string, no conversion needed.
		case nil:
			// Explicit JSON null; left as is (handled via NullFields elsewhere).
		}
	}

	// Handle array values recursively.
	if arrayValues, ok := paramValue["arrayValues"].([]interface{}); ok {
		for _, arrVal := range arrayValues {
			if arrValMap, ok := arrVal.(map[string]interface{}); ok {
				normalizeParameterValue(arrValMap)
			}
		}
	}

	// Handle struct values recursively (keyed by field name).
	if structValues, ok := paramValue["structValues"].(map[string]interface{}); ok {
		for _, structVal := range structValues {
			if structValMap, ok := structVal.(map[string]interface{}); ok {
				normalizeParameterValue(structValMap)
			}
		}
	}
}
10 changes: 5 additions & 5 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1543,15 +1543,15 @@ func (h *jobsInsertHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
server := serverFromContext(ctx)
project := projectFromContext(ctx)
var job bigqueryv2.Job
var job flexibleJob
if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
errorResponse(ctx, w, errInvalid(err.Error()))
return
}
res, err := h.Handle(ctx, &jobsInsertRequest{
server: server,
project: project,
job: &job,
job: &job.Job,
})
if err != nil {
errorResponse(ctx, w, errInvalidQuery(err.Error()))
Expand Down Expand Up @@ -2397,20 +2397,20 @@ func (h *jobsQueryHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
server := serverFromContext(ctx)
project := projectFromContext(ctx)
var req bigqueryv2.QueryRequest
var req flexibleQueryRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
errorResponse(ctx, w, errInvalid(err.Error()))
return
}
useInt64Timestamp := false
if options := req.FormatOptions; options != nil {
if options := req.QueryRequest.FormatOptions; options != nil {
useInt64Timestamp = options.UseInt64Timestamp
}
useInt64Timestamp = useInt64Timestamp || isFormatOptionsUseInt64Timestamp(r)
res, err := h.Handle(ctx, &jobsQueryRequest{
server: server,
project: project,
queryRequest: &req,
queryRequest: &req.QueryRequest,
useInt64Timestamp: useInt64Timestamp,
})
if err != nil {
Expand Down
Loading
Loading