Skip to content

Commit 88ac061

Browse files
authored
sync-diff-inspector: enable limit iterator with splitter-strategy configuration (pingcap#12518)
close pingcap#12492
1 parent 8c5244d commit 88ac061

File tree

7 files changed

+168
-13
lines changed

7 files changed

+168
-13
lines changed

sync_diff_inspector/config/config.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ import (
4343
"go.uber.org/zap"
4444
)
4545

46+
// Supported values for SplitterStrategy.
47+
const (
48+
// SplitterStrategyLimit selects the LIMIT-query-based fallback splitter
// used when bucket statistics are unavailable.
SplitterStrategyLimit = "limit"
49+
// SplitterStrategyRandom selects the random fallback splitter (the
// default when splitter-strategy is unset).
SplitterStrategyRandom = "random"
50+
)
51+
4652
const (
4753
// LocalFilePerm is the permission for local files
4854
LocalFilePerm os.FileMode = 0o644
@@ -398,6 +404,8 @@ type Config struct {
398404
CheckDataOnly bool `toml:"check-data-only" json:"-"`
399405
// skip validation for tables that don't exist upstream or downstream
400406
SkipNonExistingTable bool `toml:"skip-non-existing-table" json:"-"`
407+
// SplitterStrategy controls the fallback splitter when bucket stats are unavailable.
408+
SplitterStrategy string `toml:"splitter-strategy" json:"-"`
401409
// DMAddr is dm-master's address, the format should like "http://127.0.0.1:8261"
402410
DMAddr string `toml:"dm-addr" json:"dm-addr"`
403411
// DMTask string `toml:"dm-task" json:"dm-task"`
@@ -628,6 +636,10 @@ func (c *Config) CheckConfig() bool {
628636
log.Error("check-thread-count must greater than 0!")
629637
return false
630638
}
639+
if err := c.normalizeSplitterStrategy(); err != nil {
640+
log.Warn("invalid splitter strategy", zap.Error(err))
641+
return false
642+
}
631643
if len(c.DMAddr) != 0 {
632644
u, err := url.Parse(c.DMAddr)
633645
if err != nil || u.Scheme == "" || u.Host == "" {
@@ -643,6 +655,19 @@ func (c *Config) CheckConfig() bool {
643655
return true
644656
}
645657

658+
func (c *Config) normalizeSplitterStrategy() error {
659+
mode := strings.ToLower(strings.TrimSpace(c.SplitterStrategy))
660+
switch mode {
661+
case "", SplitterStrategyRandom:
662+
c.SplitterStrategy = SplitterStrategyRandom
663+
case SplitterStrategyLimit:
664+
c.SplitterStrategy = mode
665+
default:
666+
return errors.Errorf("splitter-strategy must be limit or random")
667+
}
668+
return nil
669+
}
670+
646671
func timestampOutputDir() string {
647672
return filepath.Join(os.TempDir(), time.Now().Format("sync-diff.output.2006-01-02T15.04.05Z0700"))
648673
}

sync_diff_inspector/source/common/table_diff.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ type TableDiff struct {
6767

6868
ChunkSize int64 `json:"chunk-size"`
6969

70+
// SplitterStrategy is the fallback splitter when bucket stats are
71+
// unavailable ("limit" or "random").
72+
SplitterStrategy string `json:"-"`
73+
7074
// TableLack = 1: the table only exists downstream,
7175
// TableLack = -1: the table only exists upstream,
7276
// TableLack = 0: the table exists both upstream and downstream.

sync_diff_inspector/source/source.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ func NewSources(ctx context.Context, cfg *config.Config) (downstream Source, ups
145145
NeedUnifiedTimeZone: needUnifiedTimeZone,
146146
Collation: tableConfig.Collation,
147147
ChunkSize: tableConfig.ChunkSize,
148+
SplitterStrategy: cfg.SplitterStrategy,
148149
})
149150

150151
// When the router set case-sensitive false,

sync_diff_inspector/source/tidb.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,13 @@ func (a *TiDBTableAnalyzer) AnalyzeSplitter(ctx context.Context, table *common.T
5858
if err == nil {
5959
return bucketIter, nil
6060
}
61-
log.Info("failed to build bucket iterator, fall back to use random iterator", zap.Error(err))
62-
// fall back to random splitter
63-
64-
// use random splitter if we cannot use bucket splitter, then we can simply choose target table to generate chunks.
65-
randIter, err := splitter.NewRandomIteratorWithCheckpoint(ctx, progressID, &originTable, a.dbConn, startRange)
66-
if err != nil {
67-
return nil, errors.Trace(err)
61+
log.Info("failed to build bucket iterator, falling back", zap.Error(err))
62+
if originTable.SplitterStrategy == config.SplitterStrategyLimit {
63+
log.Info("choose limit splitter", zap.String("table", progressID))
64+
return splitter.NewLimitIteratorWithCheckpoint(ctx, progressID, &originTable, a.dbConn, startRange)
6865
}
69-
return randIter, nil
66+
log.Info("choose random splitter", zap.String("table", progressID))
67+
return splitter.NewRandomIteratorWithCheckpoint(ctx, progressID, &originTable, a.dbConn, startRange)
7068
}
7169

7270
// TiDBRowsIterator is used to iterate rows in TiDB

sync_diff_inspector/splitter/limit.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"strings"
2121

2222
"github.com/pingcap/errors"
23+
"github.com/pingcap/failpoint"
2324
"github.com/pingcap/log"
2425
"github.com/pingcap/tidb/pkg/meta/model"
2526
"github.com/pingcap/tidb/pkg/util/dbutil"
@@ -121,6 +122,8 @@ func NewLimitIteratorWithCheckpoint(
121122
return nil, errors.NotFoundf("not found index")
122123
}
123124

125+
tagChunk.IndexColumnNames = utils.GetColumnNames(indexColumns)
126+
124127
chunkSize := table.ChunkSize
125128
if chunkSize <= 0 {
126129
cnt, err := getRowCount(ctx, dbConn, table.Schema, table.Table, "", nil)
@@ -184,6 +187,20 @@ func (lmt *LimitIterator) Next() (*chunk.Range, error) {
184187
if !ok && c == nil {
185188
return nil, nil
186189
}
190+
if c != nil {
191+
failpoint.Inject("print-chunk-info", func() {
192+
lowerBounds := make([]string, len(c.Bounds))
193+
upperBounds := make([]string, len(c.Bounds))
194+
for i, bound := range c.Bounds {
195+
lowerBounds[i] = bound.Lower
196+
upperBounds[i] = bound.Upper
197+
}
198+
log.Info("failpoint print-chunk-info injected (limit splitter)",
199+
zap.Strings("lowerBounds", lowerBounds),
200+
zap.Strings("upperBounds", upperBounds),
201+
zap.String("indexCode", c.Index.ToString()))
202+
})
203+
}
187204
return c, nil
188205
}
189206
}
@@ -194,6 +211,10 @@ func (lmt *LimitIterator) GetIndexID() int64 {
194211
}
195212

196213
func (lmt *LimitIterator) produceChunks(ctx context.Context, bucketID int) {
214+
defer func() {
215+
progress.UpdateTotal(lmt.progressID, 0, true)
216+
close(lmt.chunksCh)
217+
}()
197218
for {
198219
where, args := lmt.tagChunk.ToString(lmt.table.Collation)
199220
query := fmt.Sprintf(lmt.queryTmpl, where)
@@ -211,16 +232,15 @@ func (lmt *LimitIterator) produceChunks(ctx context.Context, bucketID int) {
211232
if dataMap == nil {
212233
// there is no row in result set
213234
chunk.InitChunk(chunkRange, chunk.Limit, bucketID, bucketID, lmt.table.Collation, lmt.table.Range)
214-
progress.UpdateTotal(lmt.progressID, 1, true)
215235
select {
216236
case <-ctx.Done():
217237
case lmt.chunksCh <- chunkRange:
218238
}
219-
close(lmt.chunksCh)
220239
return
221240
}
222241

223242
newTagChunk := chunk.NewChunkRangeOffset(lmt.columnOffset, lmt.table.Info)
243+
newTagChunk.IndexColumnNames = chunkRange.IndexColumnNames
224244
for column, data := range dataMap {
225245
newTagChunk.Update(column, string(data.Data), "", !data.IsNull, false)
226246
chunkRange.Update(column, "", string(data.Data), false, !data.IsNull)
@@ -235,6 +255,11 @@ func (lmt *LimitIterator) produceChunks(ctx context.Context, bucketID int) {
235255
case lmt.chunksCh <- chunkRange:
236256
}
237257
lmt.tagChunk = newTagChunk
258+
259+
failpoint.Inject("check-one-chunk", func() {
260+
log.Info("failpoint check-one-chunk injected, stop producing new chunks.")
261+
failpoint.Return()
262+
})
238263
}
239264
}
240265

@@ -263,8 +288,9 @@ func generateLimitQueryTemplate(indexColumns []*model.ColumnInfo, table *common.
263288
fields = append(fields, dbutil.ColumnName(columnInfo.Name.O))
264289
}
265290
columns := strings.Join(fields, ", ")
291+
orderBy := utils.BuildOrderByClause(indexColumns, table.Collation)
292+
tableName := dbutil.TableName(table.Schema, table.Table)
266293

267-
// TODO: the limit splitter has not been used yet.
268-
// once it is used, need to add `collation` after `ORDER BY`.
269-
return fmt.Sprintf("SELECT %s FROM %s WHERE %%s ORDER BY %s LIMIT %d,1", columns, dbutil.TableName(table.Schema, table.Table), columns, chunkSize)
294+
return fmt.Sprintf("SELECT %s FROM %s WHERE %%s ORDER BY %s LIMIT %d,1",
295+
columns, tableName, orderBy, chunkSize)
270296
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Diff Configuration.
2+
3+
######################### Global config #########################
4+
5+
# how many goroutines are created to check data
6+
check-thread-count = 4
7+
8+
# Set to false to compare data by checksum only; selecting row data is skipped even when checksums are not equal.
9+
# Set to true to compare all differing rows; this slows down the total comparison.
10+
export-fix-sql = true
11+
12+
# ignore check table's data
13+
check-struct-only = false
14+
15+
splitter-strategy = "limit"
16+
17+
######################### Databases config #########################
18+
[data-sources.mysql1]
19+
host = "127.0.0.1"#MYSQL_HOST
20+
port = 3306#MYSQL_PORT
21+
user = "root"
22+
password = ""
23+
24+
# remove comment if use tidb's snapshot data
25+
# snapshot = "2016-10-08 16:45:26"
26+
27+
[data-sources.tidb]
28+
host = "127.0.0.1"
29+
port = 4000
30+
user = "root"
31+
password = ""
32+
# remove comment if use tidb's snapshot data
33+
# snapshot = "2016-10-08 16:45:26"
34+
35+
[table-configs]
36+
[table-configs.config1]
37+
target-tables = ["diff_test.test"]
38+
chunk-size = 10
39+
range = "1=1"
40+
######################### Task config #########################
41+
[task]
42+
# 1 fix sql: fix-target-TIDB1.sql
43+
# 2 log: sync-diff.log
44+
# 3 summary: summary.txt
45+
# 4 checkpoint: a dir
46+
output-dir = "/tmp/sync_diff_inspector_test/sync_diff_inspector/output"
47+
48+
source-instances = ["mysql1"]
49+
50+
target-instance = "tidb"
51+
52+
# tables need to check.
53+
target-check-tables = ["diff_test.test"]
54+
55+
# extra table config
56+
target-configs= ["config1"]

sync_diff_inspector/tests/sync_diff_inspector/checkpoint/run.sh

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,51 @@ cat $OUT_DIR/first_chunk_index
9595
check_contains "${last_chunk_bound}" $OUT_DIR/first_chunk_bound
9696
check_contains_regex ".:${bucket_index_left}-${bucket_index_right}:$((${last_chunk_index_array[2]} + 1)):${last_chunk_index_array[3]}" $OUT_DIR/first_chunk_index
9797

98+
sed "s/\"127.0.0.1\"#MYSQL_HOST/\"${MYSQL_HOST}\"/g" ./config_base_limit.toml | sed "s/3306#MYSQL_PORT/${MYSQL_PORT}/g" >./config.toml
99+
100+
echo "================test limit checkpoint================="
101+
echo "------1. checkpoint and resume with limit---------"
102+
rm -rf $OUT_DIR
103+
mkdir -p $OUT_DIR
104+
export GO_FAILPOINTS="github.com/pingcap/tiflow/sync_diff_inspector/splitter/check-one-chunk=return();\
105+
github.com/pingcap/tiflow/sync_diff_inspector/splitter/print-chunk-info=return();\
106+
github.com/pingcap/tiflow/sync_diff_inspector/diff/wait-for-checkpoint=return()"
107+
sync_diff_inspector --config=./config.toml >$OUT_DIR/checkpoint_diff.output
108+
check_contains "check pass!!!" $OUT_DIR/sync_diff.log
109+
check_contains "choose limit splitter" $OUT_DIR/sync_diff.log
110+
# Save the last chunk's info to verify continuation.
111+
# With limit, each chunk is a single row query, so chunkIndex+1 == chunkCnt.
112+
last_chunk_info=$(grep 'print-chunk-info' $OUT_DIR/sync_diff.log | awk -F 'upperBounds=' '{print $2}' | sed 's/[]["]//g' | sort -n | awk 'END {print}')
113+
echo "$last_chunk_info" # e.g. 9 indexCode=0:0-0:0:1
114+
last_chunk_bound=$(echo $last_chunk_info | awk -F ' ' '{print $1}')
115+
echo "$last_chunk_bound"
116+
last_chunk_index=$(echo $last_chunk_info | awk -F '=' '{print $2}')
117+
echo "$last_chunk_index"
118+
OLD_IFS="$IFS"
119+
IFS=":"
120+
last_chunk_index_array=($last_chunk_index)
121+
IFS="$OLD_IFS"
122+
for s in ${last_chunk_index_array[@]}; do
123+
echo "$s"
124+
done
125+
# chunkIndex should be the last Index
126+
[[ $((${last_chunk_index_array[2]} + 1)) -eq ${last_chunk_index_array[3]} ]] || exit 1
127+
# Save bucketIndexRight, which should be equal to the bucketIndexLeft of the first chunk created in the next run.
128+
bucket_index_right=$(($(echo ${last_chunk_index_array[1]} | awk -F '-' '{print $2}') + 1))
129+
echo $bucket_index_right
130+
131+
rm -f $OUT_DIR/sync_diff.log
132+
export GO_FAILPOINTS="github.com/pingcap/tiflow/sync_diff_inspector/splitter/print-chunk-info=return()"
133+
sync_diff_inspector --config=./config.toml >$OUT_DIR/checkpoint_diff.output
134+
first_chunk_info=$(grep 'print-chunk-info' $OUT_DIR/sync_diff.log | awk -F 'lowerBounds=' '{print $2}' | sed 's/[]["]//g' | sort -n | awk 'NR==1')
135+
echo $first_chunk_info | awk -F '=' '{print $1}' >$OUT_DIR/first_chunk_bound
136+
cat $OUT_DIR/first_chunk_bound
137+
echo $first_chunk_info | awk -F '=' '{print $3}' >$OUT_DIR/first_chunk_index
138+
cat $OUT_DIR/first_chunk_index
139+
# Note: when chunks are created in parallel, the smallest chunk may not appear on the first line, so we sort as before.
140+
check_contains "${last_chunk_bound}" $OUT_DIR/first_chunk_bound
141+
check_contains_regex ".*:${bucket_index_right}-.*:0:.*" $OUT_DIR/first_chunk_index
142+
98143
sed "s/\"127.0.0.1\"#MYSQL_HOST/\"${MYSQL_HOST}\"/g" ./config_base_rand.toml | sed "s/3306#MYSQL_PORT/${MYSQL_PORT}/g" >./config.toml
99144

100145
echo "================test random checkpoint================="

0 commit comments

Comments
 (0)