Skip to content

Commit 3ea31db

Browse files
committed
sync table metadata
1 parent 456c584 commit 3ea31db

File tree

8 files changed

+242
-43
lines changed

8 files changed

+242
-43
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ require (
1212
github.com/goccy/go-json v0.10.0
1313
github.com/goccy/go-yaml v1.9.5
1414
github.com/goccy/go-zetasql v0.5.1
15-
github.com/goccy/go-zetasqlite v0.14.1
15+
github.com/goccy/go-zetasqlite v0.16.0
1616
github.com/google/go-cmp v0.5.9
1717
github.com/googleapis/gax-go/v2 v2.7.1
1818
github.com/gorilla/mux v1.8.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ github.com/goccy/go-yaml v1.9.5 h1:Eh/+3uk9kLxG4koCX6lRMAPS1OaMSAi+FJcya0INdB0=
7979
github.com/goccy/go-yaml v1.9.5/go.mod h1:U/jl18uSupI5rdI2jmuCswEA2htH9eXfferR3KfscvA=
8080
github.com/goccy/go-zetasql v0.5.1 h1:vP9wgUH5gylw8yL+gwO0wF7uW/fW5Dr1AAAzi8Kgevg=
8181
github.com/goccy/go-zetasql v0.5.1/go.mod h1:6W14CJVKh7crrSPyj6NPk4c49L2NWnxvyDLsRkOm4BI=
82-
github.com/goccy/go-zetasqlite v0.14.1 h1:Jt1mtRY/xpln6dbVo4AuMdAEWC8xKYF0hAJg2hYb3So=
83-
github.com/goccy/go-zetasqlite v0.14.1/go.mod h1:ikLN7nRFum4sXL6kDxgIWrhH/9iZSdwXWXZzMUnuTjM=
82+
github.com/goccy/go-zetasqlite v0.16.0 h1:kjd6g8tY1OIEefCHjeZYvfu0tQwVswEaYHR844XFTLY=
83+
github.com/goccy/go-zetasqlite v0.16.0/go.mod h1:ikLN7nRFum4sXL6kDxgIWrhH/9iZSdwXWXZzMUnuTjM=
8484
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
8585
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
8686
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=

internal/contentdata/repository.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,10 @@ func (r *Repository) Query(ctx context.Context, tx *connection.Tx, projectID, da
172172
return nil, err
173173
}
174174
defer rows.Close()
175+
changedCatalog, err := zetasqlite.ChangedCatalogFromRows(rows)
176+
if err != nil {
177+
return nil, fmt.Errorf("failed to get changed catalog: %w", err)
178+
}
175179
colNames, err := rows.Columns()
176180
if err != nil {
177181
return nil, fmt.Errorf("failed to get columns: %w", err)
@@ -238,10 +242,11 @@ func (r *Repository) Query(ctx context.Context, tx *connection.Tx, projectID, da
238242
Schema: &bigqueryv2.TableSchema{
239243
Fields: fields,
240244
},
241-
TotalRows: uint64(len(tableRows)),
242-
JobComplete: true,
243-
Rows: tableRows,
244-
TotalBytes: totalBytes,
245+
TotalRows: uint64(len(tableRows)),
246+
JobComplete: true,
247+
Rows: tableRows,
248+
TotalBytes: totalBytes,
249+
ChangedCatalog: changedCatalog,
245250
}, nil
246251
}
247252

internal/types/types.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ type (
2121
}
2222

2323
QueryResponse struct {
24-
JobReference *bigqueryv2.JobReference `json:"jobReference"`
25-
Schema *bigqueryv2.TableSchema `json:"schema"`
26-
Rows []*TableRow `json:"rows"`
27-
TotalRows uint64 `json:"totalRows,string"`
28-
JobComplete bool `json:"jobComplete"`
29-
TotalBytes int64 `json:"-"`
24+
JobReference *bigqueryv2.JobReference `json:"jobReference"`
25+
Schema *bigqueryv2.TableSchema `json:"schema"`
26+
Rows []*TableRow `json:"rows"`
27+
TotalRows uint64 `json:"totalRows,string"`
28+
JobComplete bool `json:"jobComplete"`
29+
TotalBytes int64 `json:"-"`
30+
ChangedCatalog *zetasqlite.ChangedCatalog `json:"-"`
3031
}
3132

3233
TableDataList struct {

server/handler.go

Lines changed: 137 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"cloud.google.com/go/storage"
2323
"github.com/goccy/go-json"
24+
"github.com/goccy/go-zetasqlite"
2425
"go.uber.org/zap"
2526
bigqueryv2 "google.golang.org/api/bigquery/v2"
2627
"google.golang.org/api/iterator"
@@ -1478,10 +1479,112 @@ func (h *jobsInsertHandler) Handle(ctx context.Context, r *jobsInsertRequest) (*
14781479
if err := tx.Commit(); err != nil {
14791480
return nil, fmt.Errorf("failed to commit job: %w", err)
14801481
}
1482+
if response != nil && response.ChangedCatalog.Changed() {
1483+
if err := syncCatalog(ctx, r.server, response.ChangedCatalog); err != nil {
1484+
return nil, err
1485+
}
1486+
}
14811487
}
1488+
14821489
return job, nil
14831490
}
14841491

1492+
func syncCatalog(ctx context.Context, server *Server, cat *zetasqlite.ChangedCatalog) error {
1493+
for _, table := range cat.Table.Added {
1494+
if err := addTableMetadata(ctx, server, table); err != nil {
1495+
return err
1496+
}
1497+
}
1498+
for _, table := range cat.Table.Deleted {
1499+
if err := deleteTableMetadata(ctx, server, table); err != nil {
1500+
return err
1501+
}
1502+
}
1503+
return nil
1504+
}
1505+
1506+
func addTableMetadata(ctx context.Context, server *Server, spec *zetasqlite.TableSpec) error {
1507+
if len(spec.NamePath) != 3 {
1508+
return fmt.Errorf("unexpected table name path: %v", spec.NamePath)
1509+
}
1510+
projectID := spec.NamePath[0]
1511+
datasetID := spec.NamePath[1]
1512+
tableID := spec.NamePath[2]
1513+
project, err := server.metaRepo.FindProject(ctx, projectID)
1514+
if err != nil {
1515+
return err
1516+
}
1517+
dataset := project.Dataset(datasetID)
1518+
if dataset == nil {
1519+
return fmt.Errorf("dataset %s is not found", datasetID)
1520+
}
1521+
fields := make([]*bigqueryv2.TableFieldSchema, 0, len(spec.Columns))
1522+
for _, column := range spec.Columns {
1523+
zetasqlType, err := column.Type.ToZetaSQLType()
1524+
if err != nil {
1525+
return err
1526+
}
1527+
fields = append(fields, types.TableFieldSchemaFromZetaSQLType(column.Name, zetasqlType))
1528+
}
1529+
conn, err := server.connMgr.Connection(ctx, projectID, datasetID)
1530+
if err != nil {
1531+
return err
1532+
}
1533+
tx, err := conn.Begin(ctx)
1534+
if err != nil {
1535+
return err
1536+
}
1537+
defer tx.RollbackIfNotCommitted()
1538+
if _, err := createTableMetadata(ctx, tx, server, project, dataset, &bigqueryv2.Table{
1539+
TableReference: &bigqueryv2.TableReference{
1540+
ProjectId: projectID,
1541+
DatasetId: datasetID,
1542+
TableId: tableID,
1543+
},
1544+
Schema: &bigqueryv2.TableSchema{Fields: fields},
1545+
}); err != nil {
1546+
return err
1547+
}
1548+
if err := tx.Commit(); err != nil {
1549+
return err
1550+
}
1551+
return nil
1552+
}
1553+
1554+
func deleteTableMetadata(ctx context.Context, server *Server, spec *zetasqlite.TableSpec) error {
1555+
if len(spec.NamePath) != 3 {
1556+
return fmt.Errorf("unexpected table name path: %v", spec.NamePath)
1557+
}
1558+
projectID := spec.NamePath[0]
1559+
datasetID := spec.NamePath[1]
1560+
tableID := spec.NamePath[2]
1561+
project, err := server.metaRepo.FindProject(ctx, projectID)
1562+
if err != nil {
1563+
return err
1564+
}
1565+
dataset := project.Dataset(datasetID)
1566+
if dataset == nil {
1567+
return fmt.Errorf("dataset %s is not found", datasetID)
1568+
}
1569+
table := dataset.Table(tableID)
1570+
conn, err := server.connMgr.Connection(ctx, projectID, datasetID)
1571+
if err != nil {
1572+
return err
1573+
}
1574+
tx, err := conn.Begin(ctx)
1575+
if err != nil {
1576+
return err
1577+
}
1578+
defer tx.RollbackIfNotCommitted()
1579+
if err := table.Delete(ctx, tx.Tx()); err != nil {
1580+
return err
1581+
}
1582+
if err := tx.Commit(); err != nil {
1583+
return err
1584+
}
1585+
return nil
1586+
}
1587+
14851588
func (h *jobsInsertHandler) addQueryResultToDynamicDestinationTable(ctx context.Context, tx *connection.Tx, r *jobsInsertRequest, response *internaltypes.QueryResponse) error {
14861589
projectID := r.project.ID
14871590
jobID := r.job.JobReference.JobId
@@ -1618,6 +1721,11 @@ func (h *jobsQueryHandler) Handle(ctx context.Context, r *jobsQueryRequest) (*in
16181721
if err := tx.Commit(); err != nil {
16191722
return nil, err
16201723
}
1724+
if response.ChangedCatalog.Changed() {
1725+
if err := syncCatalog(ctx, r.server, response.ChangedCatalog); err != nil {
1726+
return nil, err
1727+
}
1728+
}
16211729
}
16221730
jobID := r.queryRequest.RequestId
16231731
if jobID == "" {
@@ -2434,10 +2542,9 @@ const (
24342542
SnapshotTableType TableType = "SNAPSHOT"
24352543
)
24362544

2437-
func (h *tablesInsertHandler) Handle(ctx context.Context, r *tablesInsertRequest) (*bigqueryv2.Table, *ServerError) {
2438-
table := r.table
2545+
func createTableMetadata(ctx context.Context, tx *connection.Tx, server *Server, project *metadata.Project, dataset *metadata.Dataset, table *bigqueryv2.Table) (*bigqueryv2.Table, *ServerError) {
24392546
now := time.Now().Unix()
2440-
table.Id = fmt.Sprintf("%s:%s.%s", r.project.ID, r.dataset.ID, r.table.TableReference.TableId)
2547+
table.Id = fmt.Sprintf("%s:%s.%s", project.ID, dataset.ID, table.TableReference.TableId)
24412548
table.CreationTime = now
24422549
table.LastModifiedTime = uint64(now)
24432550
table.Type = string(DefaultTableType) // TODO: need to handle other table types
@@ -2447,10 +2554,10 @@ func (h *tablesInsertHandler) Handle(ctx context.Context, r *tablesInsertRequest
24472554
table.Kind = "bigquery#table"
24482555
table.SelfLink = fmt.Sprintf(
24492556
"http://%s/bigquery/v2/projects/%s/datasets/%s/tables/%s",
2450-
r.server.httpServer.Addr,
2451-
r.project.ID,
2452-
r.dataset.ID,
2453-
r.table.TableReference.TableId,
2557+
server.httpServer.Addr,
2558+
project.ID,
2559+
dataset.ID,
2560+
table.TableReference.TableId,
24542561
)
24552562
encodedTableData, err := json.Marshal(table)
24562563
if err != nil {
@@ -2460,24 +2567,14 @@ func (h *tablesInsertHandler) Handle(ctx context.Context, r *tablesInsertRequest
24602567
if err := json.Unmarshal(encodedTableData, &tableMetadata); err != nil {
24612568
return nil, errInternalError(err.Error())
24622569
}
2463-
2464-
conn, err := r.server.connMgr.Connection(ctx, r.project.ID, r.dataset.ID)
2465-
if err != nil {
2466-
return nil, errInternalError(err.Error())
2467-
}
2468-
tx, err := conn.Begin(ctx)
2469-
if err != nil {
2470-
return nil, errInternalError(err.Error())
2471-
}
2472-
defer tx.RollbackIfNotCommitted()
2473-
if err := r.dataset.AddTable(
2570+
if err := dataset.AddTable(
24742571
ctx,
24752572
tx.Tx(),
24762573
metadata.NewTable(
2477-
r.server.metaRepo,
2478-
r.project.ID,
2479-
r.dataset.ID,
2480-
r.table.TableReference.TableId,
2574+
server.metaRepo,
2575+
project.ID,
2576+
dataset.ID,
2577+
table.TableReference.TableId,
24812578
tableMetadata,
24822579
),
24832580
); err != nil {
@@ -2486,6 +2583,24 @@ func (h *tablesInsertHandler) Handle(ctx context.Context, r *tablesInsertRequest
24862583
}
24872584
return nil, errInternalError(err.Error())
24882585
}
2586+
return table, nil
2587+
}
2588+
2589+
func (h *tablesInsertHandler) Handle(ctx context.Context, r *tablesInsertRequest) (*bigqueryv2.Table, *ServerError) {
2590+
conn, err := r.server.connMgr.Connection(ctx, r.project.ID, r.dataset.ID)
2591+
if err != nil {
2592+
return nil, errInternalError(err.Error())
2593+
}
2594+
tx, err := conn.Begin(ctx)
2595+
if err != nil {
2596+
return nil, errInternalError(err.Error())
2597+
}
2598+
defer tx.RollbackIfNotCommitted()
2599+
2600+
table, serverErr := createTableMetadata(ctx, tx, r.server, r.project, r.dataset, r.table)
2601+
if serverErr != nil {
2602+
return nil, serverErr
2603+
}
24892604
if r.table.Schema != nil {
24902605
if err := r.server.contentRepo.CreateTable(ctx, tx, r.table); err != nil {
24912606
return nil, errInternalError(err.Error())

server/middleware.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"compress/gzip"
55
"fmt"
66
"net/http"
7+
"runtime"
78
"sync"
89

910
"github.com/gorilla/mux"
@@ -30,6 +31,15 @@ func recoveryMiddleware(s *Server) func(http.Handler) http.Handler {
3031
if err := recover(); err != nil {
3132
ctx := logger.WithLogger(r.Context(), s.logger)
3233
errorResponse(ctx, w, errInternalError(fmt.Sprintf("%+v", err)))
34+
var frame int = 1
35+
for {
36+
_, file, line, ok := runtime.Caller(frame)
37+
if !ok {
38+
break
39+
}
40+
s.logger.Error(fmt.Sprintf("%d: %v:%d", frame, file, line))
41+
frame++
42+
}
3343
return
3444
}
3545
}()

0 commit comments

Comments
 (0)