Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 50 additions & 34 deletions dml_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ type RowData []interface{}
// https://github.com/Shopify/ghostferry/issues/165.
//
// In summary:
// - This code receives values from both go-sql-driver/mysql and
// go-mysql-org/go-mysql.
// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte
// slice for unsigned integer.
// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for
// unsigned integer.
// - We currently make this function deal with both cases. In the future we can
// investigate alternative solutions.
// - This code receives values from both go-sql-driver/mysql and
// go-mysql-org/go-mysql.
// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte
// slice for unsigned integer.
// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for
// unsigned integer.
// - We currently make this function deal with both cases. In the future we can
// investigate alternative solutions.
func (r RowData) GetUint64(colIdx int) (uint64, error) {
u64, ok := Uint64Value(r[colIdx])
if ok {
Expand Down Expand Up @@ -168,14 +168,15 @@ func (e *BinlogInsertEvent) NewValues() RowData {
}

func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, error) {
if err := verifyValuesHasTheSameLengthAsColumns(e.table, e.newValues); err != nil {
filteredNewValues, err := e.table.FilterGeneratedColumnsOnRowData(e.newValues)
if err != nil {
return "", err
}

query := "INSERT IGNORE INTO " +
QuotedTableNameFromString(schemaName, tableName) +
" (" + strings.Join(quotedColumnNames(e.table), ",") + ")" +
" VALUES (" + buildStringListForValues(e.table.Columns, e.newValues) + ")"
" VALUES (" + buildStringListForValues(e.table, filteredNewValues) + ")"

return query, nil
}
Expand Down Expand Up @@ -227,8 +228,8 @@ func (e *BinlogUpdateEvent) AsSQLString(schemaName, tableName string) (string, e
}

query := "UPDATE " + QuotedTableNameFromString(schemaName, tableName) +
" SET " + buildStringMapForSet(e.table.Columns, e.newValues) +
" WHERE " + buildStringMapForWhere(e.table.Columns, e.oldValues)
" SET " + buildStringMapForSet(e.table, e.newValues) +
" WHERE " + buildStringMapForWhere(e.table, e.oldValues)

return query, nil
}
Expand Down Expand Up @@ -269,7 +270,7 @@ func (e *BinlogDeleteEvent) AsSQLString(schemaName, tableName string) (string, e
}

query := "DELETE FROM " + QuotedTableNameFromString(schemaName, tableName) +
" WHERE " + buildStringMapForWhere(e.table.Columns, e.oldValues)
" WHERE " + buildStringMapForWhere(e.table, e.oldValues)

return query, nil
}
Expand All @@ -281,16 +282,22 @@ func (e *BinlogDeleteEvent) PaginationKey() (string, error) {
func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, resumablePos mysql.Position, query []byte) ([]DMLEvent, error) {
rowsEvent := ev.Event.(*replication.RowsEvent)

for _, row := range rowsEvent.Rows {
if len(row) != len(table.Columns) {
for _, rawRow := range rowsEvent.Rows {
if len(rawRow) != len(table.Columns) {
return nil, fmt.Errorf(
"table %s.%s has %d columns but event has %d columns instead",
table.Schema,
table.Name,
len(table.Columns),
len(row),
len(rawRow),
)
}

row, err := table.FilterGeneratedColumnsOnRowData(rawRow)
if err != nil {
return nil, err
}

for i, col := range table.Columns {
if col.IsUnsigned {
switch v := row[i].(type) {
Expand Down Expand Up @@ -324,9 +331,9 @@ func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, re
}

func quotedColumnNames(table *TableSchema) []string {
cols := make([]string, len(table.Columns))
for i, column := range table.Columns {
cols[i] = QuoteField(column.Name)
cols := make([]string, 0, len(table.Columns))
for _, name := range table.NonGeneratedColumnNames() {
cols = append(cols, QuoteField(name))
}

return cols
Expand All @@ -347,53 +354,62 @@ func verifyValuesHasTheSameLengthAsColumns(table *TableSchema, values ...RowData
return nil
}

func buildStringListForValues(columns []schema.TableColumn, values []interface{}) string {
func buildStringListForValues(table *TableSchema, values []interface{}) string {
var buffer []byte

for i, value := range values {
if i > 0 {
if table.IsColumnIndexGenerated(i) {
continue
}
if len(buffer) > 0 {
buffer = append(buffer, ',')
}

buffer = appendEscapedValue(buffer, value, columns[i])
buffer = appendEscapedValue(buffer, value, table.Columns[i])
}

return string(buffer)
}

func buildStringMapForWhere(columns []schema.TableColumn, values []interface{}) string {
func buildStringMapForWhere(table *TableSchema, values []interface{}) string {
var buffer []byte

for i, value := range values {
if i > 0 {
if table.IsColumnIndexGenerated(i) {
continue
}
if len(buffer) > 0 {
buffer = append(buffer, " AND "...)
}

buffer = append(buffer, QuoteField(columns[i].Name)...)
buffer = append(buffer, QuoteField(table.Columns[i].Name)...)

if isNilValue(value) {
// "WHERE value = NULL" will never match rows.
buffer = append(buffer, " IS NULL"...)
} else {
buffer = append(buffer, '=')
buffer = appendEscapedValue(buffer, value, columns[i])
buffer = appendEscapedValue(buffer, value, table.Columns[i])
}
}

return string(buffer)
}

func buildStringMapForSet(columns []schema.TableColumn, values []interface{}) string {
func buildStringMapForSet(table *TableSchema, values []interface{}) string {
var buffer []byte

for i, value := range values {
if i > 0 {
if table.IsColumnIndexGenerated(i) {
continue
}
if len(buffer) > 0 {
buffer = append(buffer, ',')
}

buffer = append(buffer, QuoteField(columns[i].Name)...)
buffer = append(buffer, QuoteField(table.Columns[i].Name)...)
buffer = append(buffer, '=')
buffer = appendEscapedValue(buffer, value, columns[i])
buffer = appendEscapedValue(buffer, value, table.Columns[i])
}

return string(buffer)
Expand Down Expand Up @@ -504,10 +520,10 @@ func Int64Value(value interface{}) (int64, bool) {
//
// This is specifically mentioned in the link below:
//
// When BINARY values are stored, they are right-padded with the pad value
// to the specified length. The pad value is 0x00 (the zero byte). Values
// are right-padded with 0x00 for inserts, and no trailing bytes are removed
// for retrievals.
// When BINARY values are stored, they are right-padded with the pad value
// to the specified length. The pad value is 0x00 (the zero byte). Values
// are right-padded with 0x00 for inserts, and no trailing bytes are removed
// for retrievals.
//
// ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html
func appendEscapedString(buffer []byte, value string, rightPadToLengthWithZeroBytes int) []byte {
Expand Down
16 changes: 10 additions & 6 deletions row_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,27 @@ func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface
return "", nil, err
}

valuesStr := "(" + strings.Repeat("?,", len(e.columns)-1) + "?)"
filteredColumns := e.table.NonGeneratedColumnNames()

valuesStr := "(" + strings.Repeat("?,", len(filteredColumns)-1) + "?)"
valuesStr = strings.Repeat(valuesStr+",", len(e.values)-1) + valuesStr

query := "INSERT IGNORE INTO " +
QuotedTableNameFromString(schemaName, tableName) +
" (" + strings.Join(QuoteFields(e.columns), ",") + ") VALUES " + valuesStr
" (" + strings.Join(filteredColumns, ",") + ") VALUES " + valuesStr

return query, e.flattenRowData(), nil
}

func (e *RowBatch) flattenRowData() []interface{} {
rowSize := len(e.values[0])
flattened := make([]interface{}, rowSize*len(e.values))
flattened := make([]interface{}, 0, len(e.values))

for rowIdx, row := range e.values {
for _, row := range e.values {
for colIdx, col := range row {
flattened[rowIdx*rowSize+colIdx] = col
if e.table.IsColumnIndexGenerated(colIdx) {
continue
}
flattened = append(flattened, col)
}
}

Expand Down
115 changes: 103 additions & 12 deletions table_schema_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,89 @@ type TableSchema struct {
rowMd5Query string
}

// IsColumnGenerated reports whether the given go-mysql schema.TableColumn is a
// generated column, i.e. it is flagged as either virtual or stored.
func IsColumnGenerated(tc *schema.TableColumn) bool {
	if tc.IsVirtual {
		return true
	}
	return tc.IsStored
}

// IsColumnIndexGenerated reports whether the column at position idx in
// t.Columns is generated. idx is used to index t.Columns directly, so it must
// be within bounds.
func (t *TableSchema) IsColumnIndexGenerated(idx int) bool {
	column := &t.Columns[idx]
	return IsColumnGenerated(column)
}

// IsColumnNameGenerated reports whether t has a generated column called name.
// It returns false when no column carries that name or when the matching
// column is not generated.
func (t *TableSchema) IsColumnNameGenerated(name string) bool {
	for i := range t.Columns {
		column := &t.Columns[i]
		if column.Name != name {
			continue
		}
		if IsColumnGenerated(column) {
			return true
		}
	}

	return false
}

// ColumnsCount returns, in this order, the total number of columns of t, the
// number of generated columns, and the number of non-generated columns.
func (t *TableSchema) ColumnsCount() (int, int, int) {
	total := len(t.Columns)
	generatedCount := 0

	for i := range t.Columns {
		if IsColumnGenerated(&t.Columns[i]) {
			generatedCount++
		}
	}

	return total, generatedCount, total - generatedCount
}

// NonGeneratedColumnNames returns the names of all columns of t that are not
// generated, preserving the order of t.Columns. The names are returned exactly
// as stored in the schema (no quoting is applied here).
func (t *TableSchema) NonGeneratedColumnNames() []string {
	names := make([]string, 0, len(t.Columns))

	for i := range t.Columns {
		column := &t.Columns[i]
		if !IsColumnGenerated(column) {
			names = append(names, column.Name)
		}
	}

	return names
}

// FilterGeneratedColumnsOnRowData returns a copy of row without the values
// that belong to generated columns. row must hold exactly one value per column
// of t, positioned as in t.Columns; otherwise an error is returned. The input
// slice itself is left untouched.
func (t *TableSchema) FilterGeneratedColumnsOnRowData(row []interface{}) ([]interface{}, error) {
	totalCount, _, updatableCount := t.ColumnsCount()

	// The positional filtering below only makes sense if the row lines up
	// one-to-one with the table columns.
	if totalCount != len(row) {
		return nil, fmt.Errorf(
			"table %s.%s has %d columns but row has %d columns instead",
			t.Schema,
			t.Name,
			totalCount,
			len(row),
		)
	}

	filtered := make([]interface{}, 0, len(row))
	for idx := range row {
		if !t.IsColumnIndexGenerated(idx) {
			filtered = append(filtered, row[idx])
		}
	}

	// Sanity check: the number of values kept must agree with the number of
	// non-generated columns reported by ColumnsCount.
	if len(filtered) == updatableCount {
		return filtered, nil
	}

	return nil, fmt.Errorf(
		"table %s.%s has %d updatable columns but processed row has %d updatable columns instead",
		t.Schema,
		t.Name,
		updatableCount,
		len(filtered),
	)
}

// This query returns the MD5 hash for a row on this table. This query is valid
// for both the source and the target shard.
//
Expand Down Expand Up @@ -90,11 +173,12 @@ func (t *TableSchema) RowMd5Query() string {
}

columns := make([]schema.TableColumn, 0, len(t.Columns))
for _, column := range t.Columns {
for i, column := range t.Columns {
_, isCompressed := t.CompressedColumnsForVerification[column.Name]
_, isIgnored := t.IgnoredColumnsForVerification[column.Name]
isGenerated := t.IsColumnIndexGenerated(i)

if isCompressed || isIgnored {
if isCompressed || isIgnored || isGenerated {
continue
}

Expand Down Expand Up @@ -151,6 +235,19 @@ func MaxPaginationKeys(db *sql.DB, tables []*TableSchema, logger *logrus.Entry)
return tablesWithData, emptyTables, nil
}

// removeInvisibleIndeces drops every index whose Visible flag is false from
// ts.Indexes. The filtering is done in place: visible entries are compacted to
// the front of the existing backing array and ts.Indexes is re-sliced to the
// number of entries kept, so their relative order is preserved.
func removeInvisibleIndeces(ts *schema.Table) {
	// Re-use the backing array of ts.Indexes; appending to the zero-length
	// prefix never overtakes the read position of the range loop.
	visible := ts.Indexes[:0]
	for _, index := range ts.Indexes {
		if index.Visible {
			visible = append(visible, index)
		}
	}
	ts.Indexes = visible
}

func LoadTables(db *sql.DB, tableFilter TableFilter, columnCompressionConfig ColumnCompressionConfig, columnIgnoreConfig ColumnIgnoreConfig, forceIndexConfig ForceIndexConfig, cascadingPaginationColumnConfig *CascadingPaginationColumnConfig) (TableSchemaCache, error) {
logger := logrus.WithField("tag", "table_schema_cache")

Expand Down Expand Up @@ -189,14 +286,8 @@ func LoadTables(db *sql.DB, tableFilter TableFilter, columnCompressionConfig Col
return tableSchemaCache, err
}

// Filter out invisible indexes
visibleIndexes := make([]*schema.Index, 0, len(tableSchema.Indexes))
for _, index := range tableSchema.Indexes {
if index.Visible {
visibleIndexes = append(visibleIndexes, index)
}
}
tableSchema.Indexes = visibleIndexes
// Filter out invisible indexes.
removeInvisibleIndeces(tableSchema)

tableSchemas = append(tableSchemas, &TableSchema{
Table: tableSchema,
Expand Down Expand Up @@ -448,7 +539,7 @@ func maxPaginationKey(db *sql.DB, table *TableSchema) (PaginationKey, bool, erro
if err != nil {
break
}

var binValue []byte
switch v := val.(type) {
case []byte:
Expand All @@ -458,7 +549,7 @@ func maxPaginationKey(db *sql.DB, table *TableSchema) (PaginationKey, bool, erro
default:
err = fmt.Errorf("expected binary/string for max key, got %T", val)
}

if err == nil {
result = NewBinaryKeyWithColumn(primaryKeyColumn.Name, binValue)
}
Expand Down
Loading
Loading