Skip to content

Commit f883b1e

Browse files
authored
drainer: fix too much version when initializing table infos (#1237) (#1238)
ref #1137
1 parent 9ab2794 commit f883b1e

File tree

3 files changed

+17
-18
lines changed

3 files changed

+17
-18
lines changed

drainer/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ type SyncerConfig struct {
122122
EnableCausalityFlag *bool `toml:"-" json:"enable-detect-flag"`
123123
DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"`
124124
EnableCausalityFile *bool `toml:"enable-detect" json:"enable-detect"`
125-
LoadTableInfos bool `toml:"load-table-infos" json:"load-table-infos"`
125+
LoadSchemaSnapshot bool `toml:"load-schema-snapshot" json:"load-schema-snapshot"`
126126

127127
// v2 filter rules
128128
CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"`
@@ -253,7 +253,7 @@ func NewConfig() *Config {
253253
fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
254254
fs.BoolVar(cfg.SyncerCfg.DisableCausalityFlag, "disable-detect", false, "DEPRECATED, use enable-detect")
255255
fs.BoolVar(cfg.SyncerCfg.EnableCausalityFlag, "enable-detect", true, "enable detect causality")
256-
fs.BoolVar(&cfg.SyncerCfg.LoadTableInfos, "load-table-infos", false, "load table infos")
256+
fs.BoolVar(&cfg.SyncerCfg.LoadSchemaSnapshot, "load-schema-snapshot", false, "init drainer schema info through pd meta interface, need to make sure checkpoint ts is not garbage collected in upstream")
257257
fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size")
258258
fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced")
259259
fs.StringVar(new(string), "log-rotate", "", "DEPRECATED")

drainer/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func createSyncer(etcdURLs string, cp checkpoint.CheckPoint, cfg *SyncerConfig)
202202
defer tiStore.Close()
203203

204204
var jobs []*model.Job
205-
if cfg.LoadTableInfos {
205+
if cfg.LoadSchemaSnapshot {
206206
jobs, err = loadTableInfos(tiStore, cp.TS())
207207
} else {
208208
jobs, err = loadHistoryDDLJobs(tiStore)
@@ -281,7 +281,7 @@ func (s *Server) Start() error {
281281
}
282282
})
283283

284-
if s.cfg.SyncerCfg != nil && s.cfg.SyncerCfg.LoadTableInfos {
284+
if s.cfg.SyncerCfg != nil && s.cfg.SyncerCfg.LoadSchemaSnapshot {
285285
s.tg.GoNoPanic("gc_safepoint", func() {
286286
defer func() { go s.Close() }()
287287
pdCli, err := getPdClient(s.cfg.EtcdURLs, s.cfg.Security)

drainer/util.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,22 @@ func loadTableInfos(tiStore kv.Storage, startTs int64) ([]*model.Job, error) {
214214
if err != nil {
215215
return nil, errors.Trace(err)
216216
}
217+
if len(tableInfos) == 0 {
218+
continue
219+
}
217220
for _, tableInfo := range tableInfos {
218221
log.L().Debug("load table info", zap.Stringer("db", dbinfo.Name), zap.Stringer("table", tableInfo.Name), zap.Int64("version", version))
219-
jobs = append(jobs, mockCreateTableJob(tableInfo, dbinfo.ID, version))
220-
version++
221222
}
223+
jobs = append(jobs, &model.Job{
224+
Type: model.ActionCreateTables,
225+
State: model.JobStateDone,
226+
SchemaID: dbinfo.ID,
227+
BinlogInfo: &model.HistoryInfo{
228+
SchemaVersion: version,
229+
MultipleTableInfos: tableInfos,
230+
},
231+
})
232+
version++
222233
}
223234
return jobs, nil
224235
}
@@ -379,15 +390,3 @@ func mockCreateSchemaJob(dbInfo *model.DBInfo, schemaVersion int64) *model.Job {
379390
},
380391
}
381392
}
382-
383-
func mockCreateTableJob(tableInfo *model.TableInfo, schemaID, schemaVersion int64) *model.Job {
384-
return &model.Job{
385-
Type: model.ActionCreateTable,
386-
State: model.JobStateDone,
387-
SchemaID: schemaID,
388-
BinlogInfo: &model.HistoryInfo{
389-
SchemaVersion: schemaVersion,
390-
TableInfo: tableInfo,
391-
},
392-
}
393-
}

0 commit comments

Comments
 (0)