Skip to content

Commit 77d805b

Browse files
mysql.go: data may not consistent when no pk but has uk (#421) (#446)
* mysql.go: data may not consistent when no pk but has uk cause by drainer using uk as where to update (where a1 = * and a3 is NULL), may update the wrong row when any column is null, it will not be index, so we can't use it unless all column of index is not null
1 parent 5eab5bb commit 77d805b

File tree

4 files changed

+110
-21
lines changed

4 files changed

+110
-21
lines changed

drainer/translator/mysql.go

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,30 @@ func (m *mysqlTranslator) GenDDLSQL(sql string, schema string, commitTS int64) (
308308

309309
func (m *mysqlTranslator) genWhere(table *model.TableInfo, columns []*model.ColumnInfo, data []interface{}) (string, []interface{}, error) {
310310
var kvs bytes.Buffer
311+
312+
check := func(ucs []*model.ColumnInfo) bool {
313+
ucsMap := make(map[int64]*model.ColumnInfo)
314+
for _, col := range ucs {
315+
ucsMap[col.ID] = col
316+
}
317+
318+
for i, col := range columns {
319+
_, ok := ucsMap[col.ID]
320+
if !ok {
321+
continue
322+
}
323+
324+
// set to false, so we use all column as where condition
325+
if data[i] == nil {
326+
return false
327+
}
328+
}
329+
return true
330+
}
331+
311332
// if has unique key, use it to construct where condition
312-
ucs, err := m.uniqueIndexColumns(table, false)
333+
ucs, err := m.uniqueIndexColumns(table, check)
334+
313335
if err != nil {
314336
return "", nil, errors.Trace(err)
315337
}
@@ -384,7 +406,8 @@ func (m *mysqlTranslator) pkHandleColumn(table *model.TableInfo) *model.ColumnIn
384406
return nil
385407
}
386408

387-
func (m *mysqlTranslator) uniqueIndexColumns(table *model.TableInfo, findAll bool) ([]*model.ColumnInfo, error) {
409+
// return primary key columns or any unique index columns which check return true
410+
func (m *mysqlTranslator) uniqueIndexColumns(table *model.TableInfo, check func([]*model.ColumnInfo) bool) ([]*model.ColumnInfo, error) {
388411
// pkHandleCol may in table.Indices, use map to keep olny one same key.
389412
uniqueColsMap := make(map[string]interface{})
390413
uniqueCols := make([]*model.ColumnInfo, 0, 2)
@@ -394,18 +417,29 @@ func (m *mysqlTranslator) uniqueIndexColumns(table *model.TableInfo, findAll boo
394417
uniqueColsMap[pkHandleCol.Name.O] = pkHandleCol
395418
uniqueCols = append(uniqueCols, pkHandleCol)
396419

397-
if !findAll {
398-
return uniqueCols, nil
399-
}
420+
return uniqueCols, nil
400421
}
401422

402423
columns := make(map[string]*model.ColumnInfo)
403424
for _, col := range table.Columns {
404425
columns[col.Name.O] = col
405426
}
406427

407-
for _, idx := range table.Indices {
428+
// put primary key at [0], so we get primary key first if table has primary key
429+
indices := make([]*model.IndexInfo, len(table.Indices))
430+
copy(indices, table.Indices)
431+
for i := 0; i < len(indices); i++ {
432+
if indices[i].Primary {
433+
indices[i], indices[0] = indices[0], indices[i]
434+
break
435+
}
436+
}
437+
438+
for _, idx := range indices {
408439
if idx.Primary || idx.Unique {
440+
uniqueCols = uniqueCols[:0]
441+
// why need this? unique index should has no duplicate column
442+
uniqueColsMap = make(map[string]interface{})
409443
for _, col := range idx.Columns {
410444
if column, ok := columns[col.Name.O]; ok {
411445
if _, ok := uniqueColsMap[col.Name.O]; !ok {
@@ -419,13 +453,13 @@ func (m *mysqlTranslator) uniqueIndexColumns(table *model.TableInfo, findAll boo
419453
return nil, errors.New("primay/unique index is empty, but should not be empty")
420454
}
421455

422-
if !findAll {
456+
if check == nil || check(uniqueCols) {
423457
return uniqueCols, nil
424458
}
425459
}
426460
}
427461

428-
return uniqueCols, nil
462+
return uniqueCols[:0], nil
429463
}
430464

431465
func (m *mysqlTranslator) getIndexColumns(table *model.TableInfo) (indexColumns [][]*model.ColumnInfo, err error) {

drainer/translator/mysql_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,17 @@ func (t *testTranslatorSuite) TestPkHandleColumn(c *C) {
4040
func (t *testTranslatorSuite) TestPkIndexColumns(c *C) {
4141
m := testGenMysqlTranslator(c)
4242
table := testGenTable("hasPK")
43-
cols, err := m.uniqueIndexColumns(table, true)
43+
cols, err := m.uniqueIndexColumns(table, nil)
4444
c.Assert(err, IsNil)
4545
c.Assert(len(cols), Equals, 2)
4646

4747
table = testGenTable("hasID")
48-
cols, err = m.uniqueIndexColumns(table, true)
48+
cols, err = m.uniqueIndexColumns(table, nil)
4949
c.Assert(err, IsNil)
5050
c.Assert(len(cols), Equals, 1)
5151

5252
table = testGenTable("normal")
53-
cols, err = m.uniqueIndexColumns(table, true)
53+
cols, err = m.uniqueIndexColumns(table, nil)
5454
c.Assert(err, IsNil)
5555
c.Assert(len(cols), Equals, 0)
5656
}

tests/dailytest/case.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,28 @@ var case1Clean = []string{`
6565
drop table binlog_case1`,
6666
}
6767

68+
// https://internal.pingcap.net/jira/browse/TOOL-714
69+
var case2 = []string{`
70+
create table binlog_case2 (id int, a1 int, a3 int, unique key dex1(a1, a3));
71+
`,
72+
`
73+
insert into binlog_case2(id, a1, a3) values(1, 1, NULL);
74+
`,
75+
`
76+
insert into binlog_case2(id, a1, a3) values(2, 1, NULL);
77+
`,
78+
`
79+
update binlog_case2 set id = 10 where id = 1;
80+
`,
81+
`
82+
update binlog_case2 set id = 100 where id = 10;
83+
`,
84+
}
85+
86+
var case2Clean = []string{`
87+
drop table binlog_case2`,
88+
}
89+
6890
// RunCase run some simple test case
6991
func RunCase(cfg *diff.Config, src *sql.DB, dst *sql.DB) {
7092
RunTest(cfg, src, dst, func(src *sql.DB) {
@@ -82,6 +104,22 @@ func RunCase(cfg *diff.Config, src *sql.DB, dst *sql.DB) {
82104
}
83105
})
84106

107+
// run case2
108+
RunTest(cfg, src, dst, func(src *sql.DB) {
109+
err := execSQLs(src, case2)
110+
if err != nil {
111+
log.Fatal(err)
112+
}
113+
})
114+
115+
// clean table
116+
RunTest(cfg, src, dst, func(src *sql.DB) {
117+
err := execSQLs(src, case2Clean)
118+
if err != nil {
119+
log.Fatal(err)
120+
}
121+
})
122+
85123
// test big binlog msg
86124
RunTest(cfg, src, dst, func(src *sql.DB) {
87125
_, err := src.Query("create table binlog_big(id int primary key, data longtext);")

tests/kafka/kafka.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -147,18 +147,21 @@ func tableToSQL(table *pb.Table) (sqls []string, sqlArgs [][]interface{}) {
147147
sqlArgs = append(sqlArgs, args)
148148
}
149149

150-
constructWhere := func() (sql string, usePK bool) {
150+
constructWhere := func(args []interface{}) (sql string, usePK bool) {
151151
var whereColumns []string
152-
for _, col := range table.GetColumnInfo() {
152+
var whereArgs []interface{}
153+
for i, col := range table.GetColumnInfo() {
153154
if col.GetIsPrimaryKey() {
154155
whereColumns = append(whereColumns, col.GetName())
156+
whereArgs = append(whereArgs, args[i])
155157
usePK = true
156158
}
157159
}
158160
// no primary key
159161
if len(whereColumns) == 0 {
160-
for _, col := range table.GetColumnInfo() {
162+
for i, col := range table.GetColumnInfo() {
161163
whereColumns = append(whereColumns, col.GetName())
164+
whereArgs = append(whereArgs, args[i])
162165
}
163166
}
164167

@@ -168,16 +171,18 @@ func tableToSQL(table *pb.Table) (sqls []string, sqlArgs [][]interface{}) {
168171
sql += " and "
169172
}
170173

171-
sql += fmt.Sprintf("%s = ? ", col)
174+
if whereArgs[i] == nil {
175+
sql += fmt.Sprintf("%s IS NULL ", col)
176+
} else {
177+
sql += fmt.Sprintf("%s = ? ", col)
178+
}
172179
}
173180

174181
sql += " limit 1"
175182

176183
return
177184
}
178185

179-
where, usePK := constructWhere()
180-
181186
for _, mutation := range table.Mutations {
182187
switch mutation.GetType() {
183188
case pb.MutationType_Insert:
@@ -193,8 +198,6 @@ func tableToSQL(table *pb.Table) (sqls []string, sqlArgs [][]interface{}) {
193198
sql += fmt.Sprintf("%s = ? ", col.Name)
194199
}
195200

196-
sql += where
197-
198201
row := mutation.Row
199202
changedRow := mutation.ChangeRow
200203

@@ -204,8 +207,14 @@ func tableToSQL(table *pb.Table) (sqls []string, sqlArgs [][]interface{}) {
204207
args = append(args, columnToArg(col))
205208
}
206209

210+
where, usePK := constructWhere(args)
211+
sql += where
212+
207213
// for where
208214
for i, col := range changedRow.GetColumns() {
215+
if columnToArg(col) == nil {
216+
continue
217+
}
209218
if !usePK || columnInfo[i].GetIsPrimaryKey() {
210219
args = append(args, columnToArg(col))
211220
}
@@ -216,13 +225,21 @@ func tableToSQL(table *pb.Table) (sqls []string, sqlArgs [][]interface{}) {
216225

217226
case pb.MutationType_Delete:
218227
columnInfo := table.GetColumnInfo()
219-
where, usePK := constructWhere()
228+
row := mutation.Row
229+
230+
var values []interface{}
231+
for _, col := range row.GetColumns() {
232+
values = append(values, columnToArg(col))
233+
}
234+
where, usePK := constructWhere(values)
220235

221236
sql := fmt.Sprintf("delete from `%s`.`%s` %s", table.GetSchemaName(), table.GetTableName(), where)
222237

223-
row := mutation.Row
224238
var args []interface{}
225239
for i, col := range row.GetColumns() {
240+
if columnToArg(col) == nil {
241+
continue
242+
}
226243
if !usePK || columnInfo[i].GetIsPrimaryKey() {
227244
args = append(args, columnToArg(col))
228245
}

0 commit comments

Comments
 (0)