Skip to content

Commit ba468df

Browse files
add downstream kafka integrate test (#412) (#426)
* tests/* add downstream kafka integrate test
1 parent 1fc3ea8 commit ba468df

File tree

21 files changed

+1536
-131
lines changed

21 files changed

+1536
-131
lines changed

drainer/config.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,6 @@ func (cfg *Config) adjustConfig() error {
270270
} else if cfg.SyncerCfg.DestDBType == "pb" {
271271
cfg.SyncerCfg.To.BinlogFileDir = cfg.DataDir
272272
log.Infof("use default downstream pb directory: %s", cfg.DataDir)
273-
} else if cfg.SyncerCfg.DestDBType == "kafka" {
274-
cfg.SyncerCfg.To.KafkaAddrs = defaultKafkaAddrs
275-
cfg.SyncerCfg.To.KafkaVersion = defaultKafkaVersion
276273
}
277274
}
278275

@@ -298,7 +295,12 @@ func (cfg *Config) adjustConfig() error {
298295
cfg.SyncerCfg.To.KafkaVersion = defaultKafkaVersion
299296
}
300297
if cfg.SyncerCfg.To.KafkaAddrs == "" {
301-
cfg.SyncerCfg.To.KafkaAddrs = defaultKafkaAddrs
298+
addrs := os.Getenv("KAFKA_ADDRS")
299+
if len(addrs) > 0 {
300+
cfg.SyncerCfg.To.KafkaAddrs = addrs
301+
} else {
302+
cfg.SyncerCfg.To.KafkaAddrs = defaultKafkaAddrs
303+
}
302304
}
303305

304306
if cfg.SyncerCfg.To.KafkaMaxMessages <= 0 {

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,13 @@ require (
6262
github.com/petar/GoLLRB v0.0.0-20130427215148-53be0d36a84c // indirect
6363
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
6464
github.com/pingcap/check v0.0.0-20171206051426-1c287c953996
65+
github.com/pingcap/errors v0.11.0 // indirect
6566
github.com/pingcap/goleveldb v0.0.0-20161010101021-158edde5a354 // indirect
6667
github.com/pingcap/kvproto v0.0.0-20181010074705-0ba3ca8a6e37 // indirect
68+
github.com/pingcap/parser v0.0.0-20181210061630-27e9d3e251d4 // indirect
6769
github.com/pingcap/pd v2.0.5+incompatible
6870
github.com/pingcap/tidb v2.1.0-beta.0.20180823032518-ef6590e1899a+incompatible
69-
github.com/pingcap/tidb-tools v0.0.0-20180913093950-ff0ddbf51b57
71+
github.com/pingcap/tidb-tools v2.1.1-0.20181130053235-0206fdab9ef8+incompatible
7072
github.com/pingcap/tipb v0.0.0-20180711115030-4141907f6909
7173
github.com/pkg/errors v0.8.0 // indirect
7274
github.com/pmezard/go-difflib v1.0.0 // indirect

tests/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ This folder contains all tests which relies on external service such as TiDB.
1313
2. The following programs must be installed:
1414

1515
- `mysql`(the CLI client)
16+
- `kafka` working on default port 9092
1617

1718
3. The user executing the tests must have permission to create the folder
1819

tests/_utils/run_drainer

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ echo "[$(date)] <<<<<< START IN TEST ${TEST_NAME-} FOR: $config >>>>>>" >> "$OUT
1313

1414
if [ -f "$config" ]
1515
then
16-
drainer -config $config -log-file $OUT_DIR/drainer.log >> $OUT_DIR/drainer.log 2>&1
16+
drainer -config $config -log-file $OUT_DIR/drainer.log $* >> $OUT_DIR/drainer.log 2>&1
1717
else
18-
drainer -log-file $OUT_DIR/drainer.log >> $OUT_DIR/drainer.log 2>&1
18+
drainer -log-file $OUT_DIR/drainer.log $* >> $OUT_DIR/drainer.log 2>&1
1919
fi

tests/binlog/binlog.go

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package main
22

33
import (
4-
"database/sql"
54
"flag"
65
"os"
76

@@ -24,32 +23,6 @@ func main() {
2423
os.Exit(2)
2524
}
2625

27-
TableSQLs := []string{`
28-
create table ptest(
29-
a int primary key,
30-
b double NOT NULL DEFAULT 2.0,
31-
c varchar(10) NOT NULL,
32-
d time unique
33-
);
34-
`,
35-
`
36-
create table itest(
37-
a int,
38-
b double NOT NULL DEFAULT 2.0,
39-
c varchar(10) NOT NULL,
40-
d time unique,
41-
PRIMARY KEY(a, b)
42-
);
43-
`,
44-
`
45-
create table ntest(
46-
a int,
47-
b double NOT NULL DEFAULT 2.0,
48-
c varchar(10) NOT NULL,
49-
d time unique
50-
);
51-
`}
52-
5326
sourceDB, err := util.CreateDB(cfg.SourceDBCfg)
5427
if err != nil {
5528
log.Fatal(err)
@@ -62,23 +35,5 @@ create table ntest(
6235
}
6336
defer util.CloseDB(targetDB)
6437

65-
// run the simple test case
66-
dailytest.RunCase(&cfg.DiffConfig, sourceDB, targetDB)
67-
68-
dailytest.RunTest(&cfg.DiffConfig, sourceDB, targetDB, func(src *sql.DB) {
69-
// generate insert/update/delete sqls and execute
70-
dailytest.RunDailyTest(cfg.SourceDBCfg, TableSQLs, cfg.WorkerCount, cfg.JobCount, cfg.Batch)
71-
})
72-
73-
dailytest.RunTest(&cfg.DiffConfig, sourceDB, targetDB, func(src *sql.DB) {
74-
// truncate test data
75-
dailytest.TruncateTestTable(cfg.SourceDBCfg, TableSQLs)
76-
})
77-
78-
dailytest.RunTest(&cfg.DiffConfig, sourceDB, targetDB, func(src *sql.DB) {
79-
// drop test table
80-
dailytest.DropTestTable(cfg.SourceDBCfg, TableSQLs)
81-
})
82-
83-
log.Info("test pass!!!")
38+
dailytest.Run(sourceDB, targetDB, &cfg.DiffConfig, cfg.WorkerCount, cfg.JobCount, cfg.Batch)
8439
}

tests/dailytest/dailytest.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package dailytest
2+
3+
import (
4+
"database/sql"
5+
6+
"github.com/ngaut/log"
7+
"github.com/pingcap/tidb-binlog/diff"
8+
)
9+
10+
func Run(sourceDB *sql.DB, targetDB *sql.DB, diffCfg *diff.Config, workerCount int, jobCount int, batch int) {
11+
12+
TableSQLs := []string{`
13+
create table ptest(
14+
a int primary key,
15+
b double NOT NULL DEFAULT 2.0,
16+
c varchar(10) NOT NULL,
17+
d time unique
18+
);
19+
`,
20+
`
21+
create table itest(
22+
a int,
23+
b double NOT NULL DEFAULT 2.0,
24+
c varchar(10) NOT NULL,
25+
d time unique,
26+
PRIMARY KEY(a, b)
27+
);
28+
`,
29+
`
30+
create table ntest(
31+
a int,
32+
b double NOT NULL DEFAULT 2.0,
33+
c varchar(10) NOT NULL,
34+
d time unique
35+
);
36+
`}
37+
38+
// run the simple test case
39+
RunCase(diffCfg, sourceDB, targetDB)
40+
41+
RunTest(diffCfg, sourceDB, targetDB, func(src *sql.DB) {
42+
// generate insert/update/delete sqls and execute
43+
RunDailyTest(sourceDB, TableSQLs, workerCount, jobCount, batch)
44+
})
45+
46+
RunTest(diffCfg, sourceDB, targetDB, func(src *sql.DB) {
47+
// truncate test data
48+
TruncateTestTable(sourceDB, TableSQLs)
49+
})
50+
51+
RunTest(diffCfg, sourceDB, targetDB, func(src *sql.DB) {
52+
// drop test table
53+
DropTestTable(sourceDB, TableSQLs)
54+
})
55+
56+
log.Info("test pass!!!")
57+
58+
}

tests/dailytest/exector.go

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package dailytest
22

33
import (
4+
"database/sql"
45
"fmt"
56
"sync"
67

78
"github.com/ngaut/log"
8-
"github.com/pingcap/tidb-binlog/tests/util"
99
)
1010

1111
// RunDailyTest generates insert/update/delete sqls and execute
12-
func RunDailyTest(dbCfg util.DBConfig, tableSQLs []string, workerCount int, jobCount int, batch int) {
12+
func RunDailyTest(db *sql.DB, tableSQLs []string, workerCount int, jobCount int, batch int) {
1313
var wg sync.WaitGroup
1414
wg.Add(len(tableSQLs))
1515

@@ -23,32 +23,20 @@ func RunDailyTest(dbCfg util.DBConfig, tableSQLs []string, workerCount int, jobC
2323
log.Fatal(err)
2424
}
2525

26-
dbs, err := createDBs(dbCfg, workerCount)
26+
err = execSQL(db, tableSQLs[i])
2727
if err != nil {
2828
log.Fatal(err)
2929
}
30-
defer closeDBs(dbs)
3130

32-
err = execSQL(dbs[0], tableSQLs[i])
33-
if err != nil {
34-
log.Fatal(err)
35-
}
36-
37-
doProcess(table, dbs, jobCount, workerCount, batch)
31+
doProcess(table, db, jobCount, workerCount, batch)
3832
}(i)
3933
}
4034

4135
wg.Wait()
4236
}
4337

4438
// TruncateTestTable truncates test data
45-
func TruncateTestTable(dbCfg util.DBConfig, tableSQLs []string) {
46-
db, err := util.CreateDB(dbCfg)
47-
if err != nil {
48-
log.Fatal(err)
49-
}
50-
defer util.CloseDB(db)
51-
39+
func TruncateTestTable(db *sql.DB, tableSQLs []string) {
5240
for i := range tableSQLs {
5341
table := newTable()
5442
err := parseTableSQL(table, tableSQLs[i])
@@ -64,13 +52,7 @@ func TruncateTestTable(dbCfg util.DBConfig, tableSQLs []string) {
6452
}
6553

6654
// DropTestTable drops test table
67-
func DropTestTable(dbCfg util.DBConfig, tableSQLs []string) {
68-
db, err := util.CreateDB(dbCfg)
69-
if err != nil {
70-
log.Fatal(err)
71-
}
72-
defer util.CloseDB(db)
73-
55+
func DropTestTable(db *sql.DB, tableSQLs []string) {
7456
for i := range tableSQLs {
7557
table := newTable()
7658
err := parseTableSQL(table, tableSQLs[i])

tests/dailytest/job.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,15 @@ func doWait(doneChan chan struct{}, start time.Time, jobCount int, workerCount i
9696
close(doneChan)
9797
}
9898

99-
func doDMLProcess(table *table, dbs []*sql.DB, jobCount int, workerCount int, batch int) {
99+
func doDMLProcess(table *table, db *sql.DB, jobCount int, workerCount int, batch int) {
100100
jobChan := make(chan struct{}, 16*workerCount)
101101
doneChan := make(chan struct{}, workerCount)
102102

103103
start := time.Now()
104104
go addJobs(jobCount, jobChan)
105105

106106
for i := 0; i < workerCount; i++ {
107-
go doJob(table, dbs[i], batch, jobChan, doneChan)
107+
go doJob(table, db, batch, jobChan, doneChan)
108108
}
109109

110110
doWait(doneChan, start, jobCount, workerCount)
@@ -149,12 +149,12 @@ func doDDLProcess(table *table, db *sql.DB) {
149149
execSqls(db, []string{sql}, [][]interface{}{{}})
150150
}
151151

152-
func doProcess(table *table, dbs []*sql.DB, jobCount int, workerCount int, batch int) {
152+
func doProcess(table *table, db *sql.DB, jobCount int, workerCount int, batch int) {
153153
if len(table.columns) <= 2 {
154154
log.Fatal("column count must > 2, and the first and second column are for primary key")
155155
}
156156

157-
doDMLProcess(table, dbs, jobCount/2, workerCount, batch)
158-
doDDLProcess(table, dbs[0])
159-
doDMLProcess(table, dbs, jobCount/2, workerCount, batch)
157+
doDMLProcess(table, db, jobCount/2, workerCount, batch)
158+
doDDLProcess(table, db)
159+
doDMLProcess(table, db, jobCount/2, workerCount, batch)
160160
}

tests/flash/flash.go

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package main
22

33
import (
4-
"database/sql"
54
"flag"
65
"fmt"
76
"os"
@@ -27,32 +26,6 @@ func main() {
2726
os.Exit(2)
2827
}
2928

30-
TableSQLs := []string{`
31-
create table ptest(
32-
a int primary key,
33-
b double NOT NULL DEFAULT 2.0,
34-
c varchar(10) NOT NULL,
35-
d time unique
36-
);
37-
`,
38-
`
39-
create table itest(
40-
a int,
41-
b double NOT NULL DEFAULT 2.0,
42-
c varchar(10) NOT NULL,
43-
d time unique,
44-
PRIMARY KEY(a, b)
45-
);
46-
`,
47-
`
48-
create table ntest(
49-
a int,
50-
b double NOT NULL DEFAULT 2.0,
51-
c varchar(10) NOT NULL,
52-
d time unique
53-
);
54-
`}
55-
5629
sourceDB, err := util.CreateDB(cfg.SourceDBCfg)
5730
if err != nil {
5831
log.Fatal(err)
@@ -84,23 +57,5 @@ create table ntest(
8457
log.Fatal(err)
8558
}
8659

87-
// run the simple test case
88-
dailytest.RunCase(&cfg.DiffConfig, sourceDB, targetDB)
89-
90-
dailytest.RunTest(&cfg.DiffConfig, sourceDB, targetDB, func(src *sql.DB) {
91-
// generate insert/update/delete sqls and execute
92-
dailytest.RunDailyTest(cfg.SourceDBCfg, TableSQLs, cfg.WorkerCount, cfg.JobCount, cfg.Batch)
93-
})
94-
95-
dailytest.RunTest(&cfg.DiffConfig, sourceDB, targetDB, func(src *sql.DB) {
96-
// truncate test data
97-
dailytest.TruncateTestTable(cfg.SourceDBCfg, TableSQLs)
98-
})
99-
100-
dailytest.RunTest(&cfg.DiffConfig, sourceDB, targetDB, func(src *sql.DB) {
101-
// drop test table
102-
dailytest.DropTestTable(cfg.SourceDBCfg, TableSQLs)
103-
})
104-
105-
log.Info("test pass!!!")
60+
dailytest.Run(sourceDB, targetDB, &cfg.DiffConfig, cfg.WorkerCount, cfg.JobCount, cfg.Batch)
10661
}

tests/kafka/drainer.toml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# drainer meta data directory path
2+
data-dir = "/tmp/tidb_binlog_test/data.drainer"
3+
4+
# a comma separated list of PD endpoints
5+
pd-urls = "http://127.0.0.1:2379"
6+
7+
# syncer Configuration.
8+
[syncer]
9+
10+
# disable sync these schema
11+
ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql"
12+
13+
# number of binlog events in a transaction batch
14+
txn-batch = 1
15+
16+
# work count to execute binlogs
17+
worker-count = 1
18+
19+
disable-dispatch = false
20+
21+
# safe mode will split update to delete and insert
22+
safe-mode = false
23+
24+
# downstream storage, equal to --dest-db-type
25+
# valid values are "mysql", "pb", "tidb", "flash", "kafka"
26+
db-type = "kafka"
27+
28+
# Time and size limits for flash batch write
29+
# time-limit = "30s"
30+
# size-limit = "100000"
31+
[syncer.to.checkpoint]
32+
#schema = "tidb_binlog"
33+
34+
[syncer.to]
35+
topic-name = "binlog_test_topic"

0 commit comments

Comments
 (0)