Skip to content

Commit ba2be49

Browse files
authored
master(dm): avoid slow OpenAPI delete on unavailable downstream (pingcap#12563)
close pingcap#12562
1 parent a00349b commit ba2be49

File tree

5 files changed

+185
-33
lines changed

5 files changed

+185
-33
lines changed

dm/master/openapi_controller.go

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"encoding/json"
2424
"fmt"
2525
"strings"
26+
"time"
2627

2728
"github.com/pingcap/log"
2829
"github.com/pingcap/tiflow/dm/checker"
@@ -39,6 +40,8 @@ import (
3940
"go.uber.org/zap"
4041
)
4142

43+
var openAPIDeleteTaskDownstreamTimeout = 10 * time.Second
44+
4245
// nolint:unparam
4346
func (s *Server) getClusterInfo(ctx context.Context) (*openapi.GetClusterInfoResponse, error) {
4447
info := &openapi.GetClusterInfoResponse{}
@@ -459,33 +462,29 @@ func (s *Server) deleteTask(ctx context.Context, taskName string, force bool) er
459462
if err != nil {
460463
return terror.ErrSchedulerLatchInUse.Generate("RemoveMeta", taskName)
461464
}
462-
defer release()
463-
464-
ignoreCannotConnectError := func(err error) bool {
465-
if err == nil {
466-
return true
467-
}
468-
if force && strings.Contains(err.Error(), "connect: connection refused") {
469-
log.L().Warn("connect downstream error when fore delete task", zap.Error(err))
470-
return true
465+
released := false
466+
defer func() {
467+
if !released {
468+
release()
471469
}
472-
return false
470+
}()
471+
472+
metaSchema := *task.MetaSchema
473+
if err = s.removeInternalMetaData(taskName); err != nil {
474+
return terror.Annotate(err, "while removing metadata")
473475
}
474476

475477
toDBCfg := config.GetTargetDBCfgFromOpenAPITask(task)
476-
if adjustErr := AdjustTargetDBSessionCfg(ctx, toDBCfg); adjustErr != nil {
477-
if !ignoreCannotConnectError(adjustErr) {
478-
return adjustErr
479-
}
480-
}
481-
metaSchema := *task.MetaSchema
482-
err = s.removeMetaData(ctx, taskName, metaSchema, toDBCfg)
483-
if err != nil {
484-
if !ignoreCannotConnectError(err) {
485-
return terror.Annotate(err, "while removing metadata")
486-
}
478+
// Bound the entire downstream cleanup flow, not just the initial connection setup.
479+
cleanupCtx, cleanupCancel := context.WithTimeout(ctx, openAPIDeleteTaskDownstreamTimeout)
480+
defer cleanupCancel()
481+
if adjustErr := AdjustTargetDBSessionCfgWithTimeout(cleanupCtx, toDBCfg, openAPIDeleteTaskDownstreamTimeout); adjustErr != nil {
482+
log.L().Warn("skip downstream metadata cleanup when deleting task", zap.String("task", taskName), zap.Error(adjustErr))
483+
} else if err = s.removeDownstreamMetaData(cleanupCtx, taskName, metaSchema, toDBCfg, openAPIDeleteTaskDownstreamTimeout); err != nil {
484+
log.L().Warn("failed to remove downstream metadata when deleting task", zap.String("task", taskName), zap.Error(err))
487485
}
488486
release()
487+
released = true
489488
sourceNameList := s.getTaskSourceNameList(taskName)
490489
// delete subtask on worker
491490
return s.scheduler.RemoveSubTasks(taskName, sourceNameList...)

dm/master/openapi_controller_test.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"context"
2323
"fmt"
2424
"testing"
25+
"time"
2526

2627
"github.com/golang/mock/gomock"
2728
"github.com/pingcap/failpoint"
@@ -401,7 +402,14 @@ func (s *OpenAPIControllerSuite) TestTaskController() {
401402

402403
// delete
403404
{
404-
s.Nil(server.deleteTask(ctx, s.testTask.Name, true)) // delete with fore
405+
s.Nil(failpoint.Disable("github.com/pingcap/tiflow/dm/master/MockSkipRemoveMetaData"))
406+
s.Nil(failpoint.Enable("github.com/pingcap/tiflow/dm/master/MockRemoveDownstreamMetaDataError", `return(true)`))
407+
408+
s.NoError(server.deleteTask(ctx, s.testTask.Name, true))
409+
410+
s.Nil(failpoint.Disable("github.com/pingcap/tiflow/dm/master/MockRemoveDownstreamMetaDataError"))
411+
s.Nil(failpoint.Enable("github.com/pingcap/tiflow/dm/master/MockSkipRemoveMetaData", `return(true)`))
412+
405413
taskList, err := server.listTask(ctx, openapi.DMAPIGetTaskListParams{})
406414
s.Nil(err)
407415
s.Len(taskList, 0)
@@ -567,6 +575,48 @@ func (s *OpenAPIControllerSuite) TestTaskStatusSourceErrorFallback() {
567575
}
568576
}
569577

578+
func (s *OpenAPIControllerSuite) TestDeleteTaskUsesTimeoutForWholeDownstreamCleanup() {
579+
ctx, cancel := context.WithCancel(context.Background())
580+
server := setupTestServer(ctx, s.T())
581+
defer func() {
582+
cancel()
583+
server.Close()
584+
}()
585+
586+
worker1Name := "worker1"
587+
worker1Addr := "172.16.10.72:8262"
588+
s.NoError(server.scheduler.AddWorker(worker1Name, worker1Addr))
589+
worker1 := server.scheduler.GetWorkerByName(worker1Name)
590+
worker1.ToFree()
591+
592+
_, err := server.createSource(ctx, openapi.CreateSourceRequest{Source: *s.testSource, WorkerName: &worker1Name})
593+
s.NoError(err)
594+
595+
task := *s.testTask
596+
task.Name = "test-delete-task-timeout"
597+
_, err = server.createTask(ctx, openapi.CreateTaskRequest{Task: task})
598+
s.NoError(err)
599+
600+
oldTimeout := openAPIDeleteTaskDownstreamTimeout
601+
openAPIDeleteTaskDownstreamTimeout = 50 * time.Millisecond
602+
defer func() {
603+
openAPIDeleteTaskDownstreamTimeout = oldTimeout
604+
}()
605+
606+
s.NoError(failpoint.Enable("github.com/pingcap/tiflow/dm/master/MockBlockOnDownstreamMetaDataCleanup", `return(200)`))
607+
defer func() {
608+
s.NoError(failpoint.Disable("github.com/pingcap/tiflow/dm/master/MockBlockOnDownstreamMetaDataCleanup"))
609+
}()
610+
611+
start := time.Now()
612+
s.NoError(server.deleteTask(context.Background(), task.Name, true))
613+
s.Less(time.Since(start), 150*time.Millisecond)
614+
615+
taskList, err := server.listTask(ctx, openapi.DMAPIGetTaskListParams{})
616+
s.NoError(err)
617+
s.Len(taskList, 0)
618+
}
619+
570620
func (s *OpenAPIControllerSuite) TestTaskControllerWithInvalidTask() {
571621
ctx, cancel := context.WithCancel(context.Background())
572622
server := setupTestServer(ctx, s.T())

dm/master/server.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1434,6 +1434,10 @@ func GetLatestMeta(ctx context.Context, flavor string, dbConfig *dbconfig.DBConf
14341434
}
14351435

14361436
func AdjustTargetDBSessionCfg(ctx context.Context, dbConfig *dbconfig.DBConfig) error {
1437+
return AdjustTargetDBSessionCfgWithTimeout(ctx, dbConfig, conn.DefaultDBTimeout)
1438+
}
1439+
1440+
func AdjustTargetDBSessionCfgWithTimeout(ctx context.Context, dbConfig *dbconfig.DBConfig, timeout time.Duration) error {
14371441
cfg := *dbConfig
14381442
if len(cfg.Password) > 0 {
14391443
cfg.Password = utils.DecryptOrPlaintext(cfg.Password)
@@ -1443,7 +1447,7 @@ func AdjustTargetDBSessionCfg(ctx context.Context, dbConfig *dbconfig.DBConfig)
14431447
failpoint.Return(nil)
14441448
})
14451449

1446-
toDB, err := conn.GetDownstreamDB(&cfg)
1450+
toDB, err := conn.GetDownstreamDBWithTimeout(&cfg, timeout)
14471451
if err != nil {
14481452
return err
14491453
}
@@ -1774,11 +1778,30 @@ func withHost(addr string) string {
17741778
}
17751779

17761780
func (s *Server) removeMetaData(ctx context.Context, taskName, metaSchema string, toDBCfg *dbconfig.DBConfig) error {
1781+
return s.removeMetaDataWithTimeout(ctx, taskName, metaSchema, toDBCfg, conn.DefaultDBTimeout)
1782+
}
1783+
1784+
func (s *Server) removeMetaDataWithTimeout(
1785+
ctx context.Context,
1786+
taskName,
1787+
metaSchema string,
1788+
toDBCfg *dbconfig.DBConfig,
1789+
dbTimeout time.Duration,
1790+
) error {
17771791
failpoint.Inject("MockSkipRemoveMetaData", func() {
17781792
failpoint.Return(nil)
17791793
})
17801794
toDBCfg.Adjust()
17811795

1796+
err := s.removeInternalMetaData(taskName)
1797+
if err != nil {
1798+
return err
1799+
}
1800+
1801+
return s.removeDownstreamMetaData(ctx, taskName, metaSchema, toDBCfg, dbTimeout)
1802+
}
1803+
1804+
func (s *Server) removeInternalMetaData(taskName string) error {
17821805
// clear shard meta data for pessimistic/optimist
17831806
err := s.pessimist.RemoveMetaData(taskName)
17841807
if err != nil {
@@ -1792,9 +1815,36 @@ func (s *Server) removeMetaData(ctx context.Context, taskName, metaSchema string
17921815
if err != nil {
17931816
return err
17941817
}
1818+
return nil
1819+
}
1820+
1821+
func (s *Server) removeDownstreamMetaData(
1822+
ctx context.Context,
1823+
taskName,
1824+
metaSchema string,
1825+
toDBCfg *dbconfig.DBConfig,
1826+
dbTimeout time.Duration,
1827+
) error {
1828+
failpoint.Inject("MockRemoveDownstreamMetaDataError", func() {
1829+
failpoint.Return(terror.WithScope(
1830+
terror.ErrDBDriverError.Generate("mock remove downstream metadata failed"),
1831+
terror.ScopeDownstream,
1832+
))
1833+
})
1834+
failpoint.Inject("MockBlockOnDownstreamMetaDataCleanup", func(val failpoint.Value) {
1835+
wait := time.Duration(val.(int)) * time.Millisecond
1836+
timer := time.NewTimer(wait)
1837+
defer timer.Stop()
1838+
select {
1839+
case <-ctx.Done():
1840+
failpoint.Return(ctx.Err())
1841+
case <-timer.C:
1842+
failpoint.Return(nil)
1843+
}
1844+
})
17951845

17961846
// set up db and clear meta data in downstream db
1797-
baseDB, err := conn.GetDownstreamDB(toDBCfg)
1847+
baseDB, err := conn.GetDownstreamDBWithTimeout(toDBCfg, dbTimeout)
17981848
if err != nil {
17991849
return terror.WithScope(err, terror.ScopeDownstream)
18001850
}

dm/pkg/conn/basedb.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"strings"
2525
"sync"
2626
"sync/atomic"
27+
"time"
2728

2829
"github.com/coreos/go-semver/semver"
2930
"github.com/go-sql-driver/mysql"
@@ -78,15 +79,35 @@ func DownstreamDBConfig(cfg *dbconfig.DBConfig) ScopedDBConfig {
7879
}
7980

8081
func GetUpstreamDB(cfg *dbconfig.DBConfig) (*BaseDB, error) {
81-
return DefaultDBProvider.Apply(UpstreamDBConfig(cfg))
82+
return getDBWithTimeout(UpstreamDBConfig(cfg), netTimeout)
8283
}
8384

8485
func GetDownstreamDB(cfg *dbconfig.DBConfig) (*BaseDB, error) {
85-
return DefaultDBProvider.Apply(DownstreamDBConfig(cfg))
86+
return getDBWithTimeout(DownstreamDBConfig(cfg), netTimeout)
87+
}
88+
89+
func GetDownstreamDBWithTimeout(cfg *dbconfig.DBConfig, timeout time.Duration) (*BaseDB, error) {
90+
return getDBWithTimeout(DownstreamDBConfig(cfg), timeout)
91+
}
92+
93+
func getDBWithTimeout(config ScopedDBConfig, timeout time.Duration) (*BaseDB, error) {
94+
type timeoutDBProvider interface {
95+
ApplyWithPingTimeout(config ScopedDBConfig, timeout time.Duration) (*BaseDB, error)
96+
}
97+
98+
if provider, ok := DefaultDBProvider.(timeoutDBProvider); ok {
99+
return provider.ApplyWithPingTimeout(config, timeout)
100+
}
101+
return DefaultDBProvider.Apply(config)
86102
}
87103

88104
// Apply will build BaseDB with DBConfig.
89105
func (d *DefaultDBProviderImpl) Apply(config ScopedDBConfig) (*BaseDB, error) {
106+
return d.ApplyWithPingTimeout(config, netTimeout)
107+
}
108+
109+
// ApplyWithPingTimeout will build BaseDB with DBConfig and a custom ping timeout.
110+
func (d *DefaultDBProviderImpl) ApplyWithPingTimeout(config ScopedDBConfig, pingTimeout time.Duration) (*BaseDB, error) {
90111
// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection.
91112
// https://github.com/go-sql-driver/mysql#maxallowedpacket
92113
hostPort := net.JoinHostPort(config.Host, strconv.Itoa(config.Port))
@@ -158,7 +179,7 @@ func (d *DefaultDBProviderImpl) Apply(config ScopedDBConfig) (*BaseDB, error) {
158179
return nil, terror.DBErrorAdapt(err, config.Scope, terror.ErrDBDriverError)
159180
}
160181

161-
ctx, cancel := context.WithTimeout(context.Background(), netTimeout)
182+
ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
162183
defer cancel()
163184
err = db.PingContext(ctx)
164185
failpoint.Inject("failDBPing", func(_ failpoint.Value) {

dm/tests/openapi/run.sh

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -815,11 +815,8 @@ function test_delete_task_with_stopped_downstream() {
815815
# stop downstream
816816
cleanup_tidb_server
817817

818-
# delete task failed because downstream is stopped.
819-
openapi_task_check "delete_task_failed" "$task_name"
820-
821-
# delete task success with force
822-
openapi_task_check "delete_task_with_force_success" "$task_name"
818+
# delete task success even if downstream is stopped.
819+
openapi_task_check "delete_task_success" "$task_name"
823820
openapi_task_check "get_task_list" 0
824821

825822
# restart downstream
@@ -829,6 +826,41 @@ function test_delete_task_with_stopped_downstream() {
829826
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: DELETE TASK WITH STOPPED DOWNSTREAM SUCCESS"
830827
}
831828

829+
function test_delete_task_with_downstream_meta_cleanup_error() {
830+
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: DELETE TASK WITH DOWNSTREAM META CLEANUP ERROR"
831+
cleanup_data openapi
832+
cleanup_process
833+
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/master/MockRemoveDownstreamMetaDataError=return(true)"
834+
run_dm_master $WORK_DIR/master1 $MASTER_PORT1 $cur/conf/dm-master1.toml
835+
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1
836+
run_dm_master $WORK_DIR/master2 $MASTER_PORT2 $cur/conf/dm-master2.toml
837+
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT2
838+
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
839+
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
840+
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
841+
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
842+
prepare_database
843+
844+
task_name="test-no-shard-openapi-delete-failpoint"
845+
target_table_name=""
846+
847+
# create source successfully
848+
openapi_source_check "create_source1_success"
849+
openapi_source_check "create_source2_success"
850+
openapi_source_check "list_source_success" 2
851+
852+
openapi_task_check "create_noshard_task_success" $task_name $target_table_name
853+
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
854+
"query-status $task_name" \
855+
"\"stage\": \"Stopped\"" 2
856+
857+
openapi_task_check "delete_task_success" "$task_name"
858+
859+
export GO_FAILPOINTS=""
860+
cleanup_process
861+
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: DELETE TASK WITH DOWNSTREAM META CLEANUP ERROR SUCCESS"
862+
}
863+
832864
function test_start_task_with_condition() {
833865
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: START TASK WITH CONDITION"
834866
prepare_database
@@ -1300,8 +1332,8 @@ function run() {
13001332
test_full_mode_task
13011333
test_tls
13021334

1303-
# NOTE: this test case MUST running at last, because it will offline some members of cluster
13041335
test_cluster
1336+
test_delete_task_with_downstream_meta_cleanup_error
13051337
}
13061338

13071339
cleanup_data openapi

0 commit comments

Comments
 (0)