Skip to content

Commit 86c4dd0

Browse files
drainer/* Improve performance of dest-type=kafka (#423) (#428)
note binlog.String() may be pretty slow only 1k+ op/second of the benchmark
1 parent 0863544 commit 86c4dd0

File tree

3 files changed

+44
-27
lines changed

3 files changed

+44
-27
lines changed

drainer/executor/bench_kafka_test.go

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,48 @@ import (
88
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog"
99
)
1010

11+
var bytes = make([]byte, 5*(1<<10))
12+
var table = &obinlog.Table{
13+
SchemaName: proto.String("test"),
14+
TableName: proto.String("test"),
15+
ColumnInfo: []*obinlog.ColumnInfo{
16+
&obinlog.ColumnInfo{Name: "id", MysqlType: "int"},
17+
&obinlog.ColumnInfo{Name: "a1", MysqlType: "blob"},
18+
},
19+
Mutations: []*obinlog.TableMutation{
20+
&obinlog.TableMutation{
21+
Type: obinlog.MutationType_Insert.Enum(),
22+
Row: &obinlog.Row{
23+
Columns: []*obinlog.Column{
24+
&obinlog.Column{
25+
Int64Value: proto.Int64(1),
26+
},
27+
&obinlog.Column{
28+
BytesValue: bytes,
29+
},
30+
},
31+
},
32+
},
33+
},
34+
}
35+
36+
// with bytes = 5KB
37+
// BenchmarkBinlogMarshal-4 100000 573941 ns/op
38+
// means only 1742 op/second
39+
func BenchmarkBinlogMarshal(b *testing.B) {
40+
binlog := &obinlog.Binlog{
41+
DmlData: &obinlog.DMLData{
42+
Tables: []*obinlog.Table{table},
43+
},
44+
}
45+
for i := 0; i < b.N; i++ {
46+
binlog.String()
47+
}
48+
}
49+
50+
// with bytes = 5KB
51+
// BenchmarkKafka-4 1000000 42384 ns/op
52+
// means 23593 op/second
1153
func BenchmarkKafka(b *testing.B) {
1254
log.SetLevelByString("error")
1355

@@ -22,31 +64,6 @@ func BenchmarkKafka(b *testing.B) {
2264
b.Fatal(err)
2365
}
2466

25-
bytes := make([]byte, 128)
26-
table := &obinlog.Table{
27-
SchemaName: proto.String("test"),
28-
TableName: proto.String("test"),
29-
ColumnInfo: []*obinlog.ColumnInfo{
30-
{Name: "id", MysqlType: "int"},
31-
{Name: "a1", MysqlType: "blob"},
32-
},
33-
Mutations: []*obinlog.TableMutation{
34-
{
35-
Type: obinlog.MutationType_Insert.Enum(),
36-
Row: &obinlog.Row{
37-
Columns: []*obinlog.Column{
38-
{
39-
Int64Value: proto.Int64(1),
40-
},
41-
{
42-
BytesValue: bytes,
43-
},
44-
},
45-
},
46-
},
47-
},
48-
}
49-
5067
b.ResetTimer()
5168
var arg = []interface{}{table}
5269
var args = [][]interface{}{arg}

drainer/executor/kafka.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (p *kafkaExecutor) Close() error {
139139
}
140140

141141
func (p *kafkaExecutor) saveBinlog(binlog *obinlog.Binlog) error {
142-
log.Debug("save binlog: ", binlog.String())
142+
// log.Debug("save binlog: ", binlog.String())
143143
data, err := binlog.Marshal()
144144
if err != nil {
145145
return errors.Trace(err)

drainer/translator/kafka.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func insertRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, e
148148
return
149149
}
150150

151-
log.Debugf("decodeRow: %+v\n", columnValues)
151+
// log.Debugf("decodeRow: %+v\n", columnValues)
152152
// maybe only the pk column value
153153
if columnValues == nil {
154154
columnValues = make(map[int64]types.Datum)

0 commit comments

Comments
 (0)