Skip to content

Commit 41747a7

Browse files
crazycs520zz-jason
authored andcommitted
ddl: fix cancel drop table/database ddl job bug. (#8537) (#9509)
1 parent 0bd6b1b commit 41747a7

File tree

6 files changed

+145
-19
lines changed

6 files changed

+145
-19
lines changed

ddl/db_test.go

Lines changed: 91 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,6 @@ func (s *testDBSuite) TestCancelDropIndex(c *C) {
637637
for i := 0; i < 5; i++ {
638638
s.mustExec(c, "insert into t values (?, ?)", i, i)
639639
}
640-
641640
testCases := []struct {
642641
needAddIndex bool
643642
jobState model.JobState
@@ -650,7 +649,6 @@ func (s *testDBSuite) TestCancelDropIndex(c *C) {
650649
{false, model.JobStateRunning, model.StateDeleteOnly, false},
651650
{true, model.JobStateRunning, model.StateDeleteReorganization, false},
652651
}
653-
654652
var checkErr error
655653
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
656654
ddl.ReorgWaitTimeout = 50 * time.Millisecond
@@ -679,12 +677,10 @@ func (s *testDBSuite) TestCancelDropIndex(c *C) {
679677
checkErr = errors.Trace(err)
680678
return
681679
}
682-
683680
if errs[0] != nil {
684681
checkErr = errors.Trace(errs[0])
685682
return
686683
}
687-
688684
checkErr = txn.Commit(context.Background())
689685
}
690686
}
@@ -698,22 +694,19 @@ func (s *testDBSuite) TestCancelDropIndex(c *C) {
698694
if rs != nil {
699695
rs.Close()
700696
}
701-
702697
t := s.testGetTable(c, "t")
703698
indexInfo := schemautil.FindIndexByName("idx_c2", t.Meta().Indices)
704699
if testCase.cancelSucc {
705700
c.Assert(checkErr, IsNil)
706701
c.Assert(err, NotNil)
707702
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
708-
709703
c.Assert(indexInfo, NotNil)
710704
c.Assert(indexInfo.State, Equals, model.StatePublic)
711705
} else {
712706
err1 := admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID)
713707
c.Assert(err, IsNil)
714708
c.Assert(checkErr, NotNil)
715709
c.Assert(checkErr.Error(), Equals, err1.Error())
716-
717710
c.Assert(indexInfo, IsNil)
718711
}
719712
}
@@ -722,6 +715,97 @@ func (s *testDBSuite) TestCancelDropIndex(c *C) {
722715
s.mustExec(c, "alter table t drop index idx_c2")
723716
}
724717

718+
// TestCancelDropTable tests cancel ddl job which type is drop table.
719+
func (s *testDBSuite) TestCancelDropTableAndSchema(c *C) {
720+
s.tk = testkit.NewTestKit(c, s.store)
721+
testCases := []struct {
722+
needAddTableOrDB bool
723+
action model.ActionType
724+
jobState model.JobState
725+
JobSchemaState model.SchemaState
726+
cancelSucc bool
727+
}{
728+
// Check drop table.
729+
// model.JobStateNone means the jobs is canceled before the first run.
730+
{true, model.ActionDropTable, model.JobStateNone, model.StateNone, true},
731+
{false, model.ActionDropTable, model.JobStateRunning, model.StateWriteOnly, false},
732+
{true, model.ActionDropTable, model.JobStateRunning, model.StateDeleteOnly, false},
733+
734+
// Check drop database.
735+
{true, model.ActionDropSchema, model.JobStateNone, model.StateNone, true},
736+
{false, model.ActionDropSchema, model.JobStateRunning, model.StateWriteOnly, false},
737+
{true, model.ActionDropSchema, model.JobStateRunning, model.StateDeleteOnly, false},
738+
}
739+
var checkErr error
740+
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
741+
ddl.ReorgWaitTimeout = 50 * time.Millisecond
742+
defer func() { ddl.ReorgWaitTimeout = oldReorgWaitTimeout }()
743+
hook := &ddl.TestDDLCallback{}
744+
var jobID int64
745+
testCase := &testCases[0]
746+
hook.OnJobRunBeforeExported = func(job *model.Job) {
747+
if job.Type == testCase.action && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
748+
jobIDs := []int64{job.ID}
749+
jobID = job.ID
750+
hookCtx := mock.NewContext()
751+
hookCtx.Store = s.store
752+
err := hookCtx.NewTxn()
753+
if err != nil {
754+
checkErr = errors.Trace(err)
755+
return
756+
}
757+
txn, err := hookCtx.Txn(true)
758+
if err != nil {
759+
checkErr = errors.Trace(err)
760+
return
761+
}
762+
errs, err := admin.CancelJobs(txn, jobIDs)
763+
if err != nil {
764+
checkErr = errors.Trace(err)
765+
return
766+
}
767+
if errs[0] != nil {
768+
checkErr = errors.Trace(errs[0])
769+
return
770+
}
771+
checkErr = txn.Commit(context.Background())
772+
}
773+
}
774+
originHook := s.dom.DDL().(ddl.DDLForTest).GetHook()
775+
defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
776+
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
777+
var err error
778+
sql := ""
779+
for i := range testCases {
780+
testCase = &testCases[i]
781+
if testCase.needAddTableOrDB {
782+
s.mustExec(c, "create database if not exists test_drop_db")
783+
s.mustExec(c, "use test_drop_db")
784+
s.mustExec(c, "create table if not exists t(c1 int, c2 int)")
785+
}
786+
787+
if testCase.action == model.ActionDropTable {
788+
sql = "drop table t;"
789+
} else if testCase.action == model.ActionDropSchema {
790+
sql = "drop database test_drop_db;"
791+
}
792+
793+
_, err = s.tk.Exec(sql)
794+
if testCase.cancelSucc {
795+
c.Assert(checkErr, IsNil)
796+
c.Assert(err, NotNil)
797+
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
798+
s.mustExec(c, "insert into t values (?, ?)", i, i)
799+
} else {
800+
c.Assert(err, IsNil)
801+
c.Assert(checkErr, NotNil)
802+
c.Assert(checkErr.Error(), Equals, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error())
803+
_, err = s.tk.Exec("insert into t values (?, ?)", i, i)
804+
c.Assert(err, NotNil)
805+
}
806+
}
807+
}
808+
725809
func (s *testDBSuite) TestAddAnonymousIndex(c *C) {
726810
s.tk = testkit.NewTestKit(c, s.store)
727811
s.tk.MustExec("use " + s.schemaName)

ddl/ddl_worker_test.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ func checkCancelState(txn kv.Transaction, job *model.Job, test *testCancelJob) e
303303
// If the action is adding index and the state is writing reorganization, it wants to test the case of cancelling the job when backfilling indexes.
304304
// When the job satisfies this case of addIndexFirstReorg, the worker hasn't started to backfill indexes.
305305
if test.cancelState == job.SchemaState && !addIndexFirstReorg {
306-
if job.SchemaState == model.StateNone && job.State != model.JobStateDone {
306+
if job.SchemaState == model.StateNone && job.State != model.JobStateDone && job.Type != model.ActionCreateTable && job.Type != model.ActionCreateSchema {
307307
// If the schema state is none, we only test the job is finished.
308308
} else {
309309
errs, err := admin.CancelJobs(txn, test.jobIDs)
@@ -340,17 +340,19 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
340340

341341
// Test cancel drop index job , see TestCancelDropIndex.
342342

343-
// TODO: add create table back after we fix the cancel bug.
344-
//{act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},
345-
346343
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 5}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
347344
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 6}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
348345
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 7}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
349346
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 8}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 8)}, cancelState: model.StatePublic, ddlRetErr: err},
350347

351-
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 9}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 9)}, cancelState: model.StateDeleteOnly, ddlRetErr: err},
352-
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 10)}, cancelState: model.StateWriteOnly, ddlRetErr: err},
353-
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 11)}, cancelState: model.StateWriteReorganization, ddlRetErr: err},
348+
// Test create table, watch out, table id will alloc a globalID.
349+
{act: model.ActionCreateTable, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateNone, ddlRetErr: err},
350+
// Test create database, watch out, database id will alloc a globalID.
351+
{act: model.ActionCreateSchema, jobIDs: []int64{firstID + 12}, cancelRetErrs: noErrs, cancelState: model.StateNone, ddlRetErr: err},
352+
353+
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 13}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 13)}, cancelState: model.StateDeleteOnly, ddlRetErr: err},
354+
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 14}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 14)}, cancelState: model.StateWriteOnly, ddlRetErr: err},
355+
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 15}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 15)}, cancelState: model.StateWriteReorganization, ddlRetErr: err},
354356
}
355357

356358
return tests
@@ -516,22 +518,36 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
516518
c.Check(errors.ErrorStack(checkErr), Equals, "")
517519
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, true)
518520

519-
// for drop column.
521+
// for create table
522+
tblInfo1 := testTableInfo(c, d, "t1", 2)
520523
test = &tests[8]
524+
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo1.ID, model.ActionCreateTable, []interface{}{tblInfo1}, &cancelState)
525+
c.Check(checkErr, IsNil)
526+
testCheckTableState(c, d, dbInfo, tblInfo1, model.StateNone)
527+
528+
// for create database
529+
dbInfo1 := testSchemaInfo(c, d, "test_cancel_job1")
530+
test = &tests[9]
531+
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo1.ID, 0, model.ActionCreateSchema, []interface{}{dbInfo1}, &cancelState)
532+
c.Check(checkErr, IsNil)
533+
testCheckSchemaState(c, d, dbInfo1, model.StateNone)
534+
535+
// for drop column.
536+
test = &tests[10]
521537
dropColName := "c3"
522538
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
523539
testDropColumn(c, ctx, d, dbInfo, tblInfo, dropColName, false)
524540
c.Check(errors.ErrorStack(checkErr), Equals, "")
525541
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true)
526542

527-
test = &tests[9]
543+
test = &tests[11]
528544
dropColName = "c4"
529545
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
530546
testDropColumn(c, ctx, d, dbInfo, tblInfo, dropColName, false)
531547
c.Check(errors.ErrorStack(checkErr), Equals, "")
532548
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true)
533549

534-
test = &tests[10]
550+
test = &tests[12]
535551
dropColName = "c5"
536552
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
537553
testDropColumn(c, ctx, d, dbInfo, tblInfo, dropColName, false)

ddl/rollingback.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
266266
ver, err = rollingbackTruncateTable(t, job)
267267
case model.ActionDropIndex:
268268
ver, err = rollingbackDropIndex(t, job)
269+
case model.ActionDropTable, model.ActionDropSchema:
270+
job.State = model.JobStateRollingback
269271
default:
270272
job.State = model.JobStateCancelled
271273
err = errCancelledDDLJob

ddl/schema.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,20 @@ func onDropSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
8080
return ver, infoschema.ErrDatabaseDropExists.GenWithStackByArgs("")
8181
}
8282

83+
if job.IsRollingback() {
84+
// To simplify the rollback logic, cannot be canceled after job start to run.
85+
// Normally won't fetch here, because there is check when cancel ddl jobs. see function: isJobRollbackable.
86+
if dbInfo.State == model.StatePublic {
87+
job.State = model.JobStateCancelled
88+
return ver, errCancelledDDLJob
89+
}
90+
job.State = model.JobStateRunning
91+
}
92+
8393
ver, err = updateSchemaVersion(t, job)
8494
if err != nil {
8595
return ver, errors.Trace(err)
8696
}
87-
8897
switch dbInfo.State {
8998
case model.StatePublic:
9099
// public -> write only

ddl/table.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
8383
func onDropTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
8484
schemaID := job.SchemaID
8585
tableID := job.TableID
86-
8786
// Check this table's database.
8887
tblInfo, err := t.GetTable(schemaID, tableID)
8988
if err != nil {
@@ -105,6 +104,16 @@ func onDropTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
105104
))
106105
}
107106

107+
if job.IsRollingback() {
108+
// To simplify the rollback logic, cannot be canceled after job start to run.
109+
// Normally won't fetch here, because there is check when cancel ddl jobs. see function: isJobRollbackable.
110+
if tblInfo.State == model.StatePublic {
111+
job.State = model.JobStateCancelled
112+
return ver, errCancelledDDLJob
113+
}
114+
job.State = model.JobStateRunning
115+
}
116+
108117
originalState := job.SchemaState
109118
switch tblInfo.State {
110119
case model.StatePublic:

util/admin/admin.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,12 @@ func isJobRollbackable(job *model.Job, id int64) error {
107107
job.SchemaState == model.StateDeleteReorganization {
108108
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
109109
}
110+
case model.ActionDropSchema, model.ActionDropTable:
111+
// To simplify the rollback logic, cannot be canceled in the following states.
112+
if job.SchemaState == model.StateWriteOnly ||
113+
job.SchemaState == model.StateDeleteOnly {
114+
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
115+
}
110116
}
111117
return nil
112118
}

0 commit comments

Comments
 (0)