Skip to content

Commit 49e9daf

Browse files
authored
lightning: always get latest PD leader when access PD after initialized (#46726) (#46758)
close #43436, close #46688
1 parent acdac74 commit 49e9daf

20 files changed

+185
-47
lines changed

br/pkg/lightning/importer/check_info_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,7 @@ func TestCheckCSVHeader(t *testing.T) {
412412
dbMetas,
413413
preInfoGetter,
414414
nil,
415+
nil,
415416
)
416417
preInfoGetter.dbInfosCache = rc.dbInfos
417418
err = rc.checkCSVHeader(ctx)
@@ -465,6 +466,7 @@ func TestCheckTableEmpty(t *testing.T) {
465466
dbMetas,
466467
preInfoGetter,
467468
nil,
469+
nil,
468470
)
469471

470472
rc := &Controller{
@@ -622,6 +624,7 @@ func TestLocalResource(t *testing.T) {
622624
nil,
623625
preInfoGetter,
624626
nil,
627+
nil,
625628
)
626629
rc := &Controller{
627630
cfg: cfg,

br/pkg/lightning/importer/import.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ func NewImportControllerWithPauser(
430430
}
431431

432432
preCheckBuilder := NewPrecheckItemBuilder(
433-
cfg, p.DBMetas, preInfoGetter, cpdb,
433+
cfg, p.DBMetas, preInfoGetter, cpdb, pdCli,
434434
)
435435

436436
rc := &Controller{
@@ -488,6 +488,8 @@ func (rc *Controller) Close() {
488488

489489
// Run starts the restore task.
490490
func (rc *Controller) Run(ctx context.Context) error {
491+
failpoint.Inject("beforeRun", func() {})
492+
491493
opts := []func(context.Context) error{
492494
rc.setGlobalVariables,
493495
rc.restoreSchema,
@@ -1395,7 +1397,7 @@ const (
13951397

13961398
func (rc *Controller) keepPauseGCForDupeRes(ctx context.Context) (<-chan struct{}, error) {
13971399
tlsOpt := rc.tls.ToPDSecurityOption()
1398-
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.cfg.TiDB.PdAddr}, tlsOpt)
1400+
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.pdCli.GetLeaderAddr()}, tlsOpt)
13991401
if err != nil {
14001402
return nil, errors.Trace(err)
14011403
}
@@ -1556,8 +1558,13 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
15561558
}
15571559

15581560
// Disable GC because TiDB enables GC already.
1561+
1562+
currentLeaderAddr := rc.pdCli.GetLeaderAddr()
1563+
// remove URL scheme
1564+
currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "http://")
1565+
currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "https://")
15591566
kvStore, err = driver.TiKVDriver{}.OpenWithOptions(
1560-
fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", rc.cfg.TiDB.PdAddr, rc.keyspaceName),
1567+
fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", currentLeaderAddr, rc.keyspaceName),
15611568
driver.WithSecurity(rc.tls.ToTiKVSecurityConfig()),
15621569
)
15631570
if err != nil {
@@ -1769,7 +1776,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
17691776
}
17701777

17711778
func (rc *Controller) registerTaskToPD(ctx context.Context) (undo func(), _ error) {
1772-
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg)
1779+
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg, rc.pdCli.GetLeaderAddr())
17731780
if err != nil {
17741781
return nil, errors.Trace(err)
17751782
}
@@ -2181,7 +2188,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
21812188
rc.status.TotalFileSize.Store(estimatedSizeResult.SizeWithoutIndex)
21822189
}
21832190
if isLocalBackend(rc.cfg) {
2184-
pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr,
2191+
pdController, err := pdutil.NewPdController(ctx, rc.pdCli.GetLeaderAddr(),
21852192
rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption())
21862193
if err != nil {
21872194
return common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)

br/pkg/lightning/importer/import_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func TestPreCheckFailed(t *testing.T) {
224224
dbMetas: make([]*mydump.MDDatabaseMeta, 0),
225225
}
226226
cpdb := panicCheckpointDB{}
227-
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb)
227+
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil)
228228
ctl := &Controller{
229229
cfg: cfg,
230230
saveCpCh: make(chan saveCp),

br/pkg/lightning/importer/precheck.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,21 @@ func WithPrecheckKey(ctx context.Context, key precheckContextKey, val any) conte
2323

2424
// PrecheckItemBuilder is used to build precheck items
2525
type PrecheckItemBuilder struct {
26-
cfg *config.Config
27-
dbMetas []*mydump.MDDatabaseMeta
28-
preInfoGetter PreImportInfoGetter
29-
checkpointsDB checkpoints.DB
26+
cfg *config.Config
27+
dbMetas []*mydump.MDDatabaseMeta
28+
preInfoGetter PreImportInfoGetter
29+
checkpointsDB checkpoints.DB
30+
pdLeaderAddrGetter func() string
3031
}
3132

3233
// NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config
3334
// pdCli **must not** be nil for local backend
34-
func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, pdCli pd.Client, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
35+
func NewPrecheckItemBuilderFromConfig(
36+
ctx context.Context,
37+
cfg *config.Config,
38+
pdCli pd.Client,
39+
opts ...ropts.PrecheckItemBuilderOption,
40+
) (*PrecheckItemBuilder, error) {
3541
var gerr error
3642
builderCfg := new(ropts.PrecheckItemBuilderConfig)
3743
for _, o := range opts {
@@ -71,7 +77,7 @@ func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, p
7177
if err != nil {
7278
return nil, errors.Trace(err)
7379
}
74-
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb), gerr
80+
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdCli), gerr
7581
}
7682

7783
// NewPrecheckItemBuilder creates a new PrecheckItemBuilder
@@ -80,12 +86,21 @@ func NewPrecheckItemBuilder(
8086
dbMetas []*mydump.MDDatabaseMeta,
8187
preInfoGetter PreImportInfoGetter,
8288
checkpointsDB checkpoints.DB,
89+
pdCli pd.Client,
8390
) *PrecheckItemBuilder {
91+
leaderAddrGetter := func() string {
92+
return cfg.TiDB.PdAddr
93+
}
94+
// in tests we may not have a pdCli
95+
if pdCli != nil {
96+
leaderAddrGetter = pdCli.GetLeaderAddr
97+
}
8498
return &PrecheckItemBuilder{
85-
cfg: cfg,
86-
dbMetas: dbMetas,
87-
preInfoGetter: preInfoGetter,
88-
checkpointsDB: checkpointsDB,
99+
cfg: cfg,
100+
dbMetas: dbMetas,
101+
preInfoGetter: preInfoGetter,
102+
checkpointsDB: checkpointsDB,
103+
pdLeaderAddrGetter: leaderAddrGetter,
89104
}
90105
}
91106

@@ -117,7 +132,7 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID precheck.CheckItemID) (p
117132
case precheck.CheckLocalTempKVDir:
118133
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter, b.dbMetas), nil
119134
case precheck.CheckTargetUsingCDCPITR:
120-
return NewCDCPITRCheckItem(b.cfg), nil
135+
return NewCDCPITRCheckItem(b.cfg, b.pdLeaderAddrGetter), nil
121136
default:
122137
return nil, errors.Errorf("unsupported check item: %v", checkID)
123138
}

br/pkg/lightning/importer/precheck_impl.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -746,17 +746,19 @@ func (ci *checkpointCheckItem) checkpointIsValid(ctx context.Context, tableInfo
746746
// CDCPITRCheckItem check downstream has enabled CDC or PiTR. It's exposed to let
747747
// caller override the Instruction message.
748748
type CDCPITRCheckItem struct {
749-
cfg *config.Config
750-
Instruction string
749+
cfg *config.Config
750+
Instruction string
751+
leaderAddrGetter func() string
751752
// used in test
752753
etcdCli *clientv3.Client
753754
}
754755

755756
// NewCDCPITRCheckItem creates a checker to check downstream has enabled CDC or PiTR.
756-
func NewCDCPITRCheckItem(cfg *config.Config) precheck.Checker {
757+
func NewCDCPITRCheckItem(cfg *config.Config, leaderAddrGetter func() string) precheck.Checker {
757758
return &CDCPITRCheckItem{
758-
cfg: cfg,
759-
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
759+
cfg: cfg,
760+
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
761+
leaderAddrGetter: leaderAddrGetter,
760762
}
761763
}
762764

@@ -765,7 +767,11 @@ func (*CDCPITRCheckItem) GetCheckItemID() precheck.CheckItemID {
765767
return precheck.CheckTargetUsingCDCPITR
766768
}
767769

768-
func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, error) {
770+
func dialEtcdWithCfg(
771+
ctx context.Context,
772+
cfg *config.Config,
773+
leaderAddr string,
774+
) (*clientv3.Client, error) {
769775
cfg2, err := cfg.ToTLS()
770776
if err != nil {
771777
return nil, err
@@ -774,7 +780,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client,
774780

775781
return clientv3.New(clientv3.Config{
776782
TLS: tlsConfig,
777-
Endpoints: []string{cfg.TiDB.PdAddr},
783+
Endpoints: []string{leaderAddr},
778784
AutoSyncInterval: 30 * time.Second,
779785
DialTimeout: 5 * time.Second,
780786
DialOptions: []grpc.DialOption{
@@ -801,7 +807,7 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*precheck.CheckResult, e
801807

802808
if ci.etcdCli == nil {
803809
var err error
804-
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg)
810+
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg, ci.leaderAddrGetter())
805811
if err != nil {
806812
return nil, errors.Trace(err)
807813
}

br/pkg/lightning/importer/precheck_impl_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() {
598598
Backend: config.BackendLocal,
599599
},
600600
}
601-
ci := NewCDCPITRCheckItem(cfg)
601+
ci := NewCDCPITRCheckItem(cfg, nil)
602602
checker := ci.(*CDCPITRCheckItem)
603603
checker.etcdCli = testEtcdCluster.RandClient()
604604
result, err := ci.Check(ctx)

br/pkg/lightning/importer/precheck_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestPrecheckBuilderBasic(t *testing.T) {
3232

3333
preInfoGetter, err := NewPreImportInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil)
3434
require.NoError(t, err)
35-
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil)
35+
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil)
3636
for _, checkItemID := range []precheck.CheckItemID{
3737
precheck.CheckLargeDataFile,
3838
precheck.CheckSourcePermission,

br/pkg/lightning/importer/table_import_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,7 +1193,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
11931193
targetInfoGetter: targetInfoGetter,
11941194
srcStorage: mockStore,
11951195
}
1196-
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil)
1196+
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil)
11971197
rc := &Controller{
11981198
cfg: cfg,
11991199
tls: tls,
@@ -1352,7 +1352,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
13521352
targetInfoGetter: targetInfoGetter,
13531353
dbMetas: dbMetas,
13541354
}
1355-
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB())
1355+
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil)
13561356
rc := &Controller{
13571357
cfg: cfg,
13581358
tls: tls,
@@ -1448,7 +1448,7 @@ func (s *tableRestoreSuite) TestCheckHasLargeCSV() {
14481448
for _, ca := range cases {
14491449
template := NewSimpleTemplate()
14501450
cfg := &config.Config{Mydumper: config.MydumperRuntime{StrictFormat: ca.strictFormat}}
1451-
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil)
1451+
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil)
14521452
rc := &Controller{
14531453
cfg: cfg,
14541454
checkTemplate: template,

0 commit comments

Comments
 (0)