Skip to content

Commit a00349b

Browse files
authored
syncer(dm): debounce repeated unhandled-event warnings (pingcap#12579)
close pingcap#12499
1 parent 7d85ef4 commit a00349b

File tree

2 files changed

+51
-3
lines changed

2 files changed

+51
-3
lines changed

dm/syncer/syncer.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/pingcap/tidb/pkg/sessionctx"
3939
"github.com/pingcap/tidb/pkg/util/dbutil"
4040
"github.com/pingcap/tidb/pkg/util/filter"
41+
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
4142
regexprrouter "github.com/pingcap/tidb/pkg/util/regexpr-router"
4243
router "github.com/pingcap/tidb/pkg/util/table-router"
4344
"github.com/pingcap/tiflow/dm/config"
@@ -97,6 +98,9 @@ const (
9798
skipJobIdx = iota
9899
ddlJobIdx
99100
workerJobTSArrayInitSize // size = skip + ddl
101+
102+
unhandledEventSampleInterval = 5 * time.Minute
103+
unhandledEventSampleFirst = 1
100104
)
101105

102106
// waitXIDStatus represents the status for waiting XID event when pause/stop task.
@@ -255,7 +259,8 @@ type Syncer struct {
255259
charsetAndDefaultCollation map[string]string
256260
idAndCollationMap map[int]string
257261

258-
ddlWorker *DDLWorker
262+
ddlWorker *DDLWorker
263+
unhandledEventLogger *zap.Logger
259264
}
260265

261266
// NewSyncer creates a new Syncer.
@@ -307,6 +312,11 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, relay rel
307312
syncer.lastCheckpointFlushedTime = time.Time{}
308313
syncer.relay = relay
309314
syncer.safeMode = sm.NewSafeMode()
315+
syncer.unhandledEventLogger = tidblogutil.SampleLoggerFactory(
316+
unhandledEventSampleInterval,
317+
unhandledEventSampleFirst,
318+
logFields...,
319+
)()
310320

311321
return syncer
312322
}
@@ -343,6 +353,10 @@ func (s *Syncer) closeJobChans() {
343353
s.jobsClosed.Store(true)
344354
}
345355

356+
func (s *Syncer) recordUnhandledEvent(message string, ev interface{}) {
357+
s.unhandledEventLogger.Warn(message, zap.String("type", fmt.Sprintf("%T", ev)))
358+
}
359+
346360
// Type implements Unit.Type.
347361
func (s *Syncer) Type() pb.UnitType {
348362
return pb.UnitType_Sync
@@ -2443,7 +2457,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
24432457
case *replication.TableMapEvent:
24442458
case *replication.FormatDescriptionEvent:
24452459
default:
2446-
s.tctx.L().Warn("unhandled event from transaction payload", zap.String("type", fmt.Sprintf("%T", tpevt)))
2460+
s.recordUnhandledEvent("unhandled event from transaction payload", tpevt)
24472461
}
24482462
}
24492463
if needContinue {
@@ -2452,7 +2466,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
24522466
case *replication.TableMapEvent:
24532467
case *replication.FormatDescriptionEvent:
24542468
default:
2455-
s.tctx.L().Warn("unhandled event", zap.String("type", fmt.Sprintf("%T", ev)))
2469+
s.recordUnhandledEvent("unhandled event", ev)
24562470
}
24572471
if err2 != nil {
24582472
if err := s.handleEventError(err2, startLocation, endLocation, e.Header.EventType == replication.QUERY_EVENT, originSQL); err != nil {

dm/syncer/syncer_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
_ "github.com/go-sql-driver/mysql"
3232
"github.com/pingcap/check"
3333
"github.com/pingcap/failpoint"
34+
pclog "github.com/pingcap/log"
3435
"github.com/pingcap/tidb/pkg/infoschema"
3536
"github.com/pingcap/tidb/pkg/parser"
3637
"github.com/pingcap/tidb/pkg/parser/ast"
@@ -62,6 +63,7 @@ import (
6263
"github.com/pingcap/tiflow/pkg/sqlmodel"
6364
"github.com/stretchr/testify/require"
6465
"go.uber.org/zap"
66+
"go.uber.org/zap/zaptest/observer"
6567
)
6668

6769
var _ = check.Suite(&testSyncerSuite{})
@@ -219,6 +221,38 @@ func (s *testSyncerSuite) TearDownSuite(c *check.C) {
219221
os.RemoveAll(s.cfg.Dir)
220222
}
221223

224+
func (s *testSyncerSuite) TestSampleUnhandledEvents(c *check.C) {
225+
core, logs := observer.New(zap.WarnLevel)
226+
restoreGlobal := pclog.ReplaceGlobals(zap.New(core), nil)
227+
defer restoreGlobal()
228+
229+
cfg := genDefaultSubTaskConfig4Test()
230+
syncer := NewSyncer(cfg, nil, nil)
231+
232+
syncer.recordUnhandledEvent("unhandled event", &replication.RowsQueryEvent{})
233+
syncer.recordUnhandledEvent("unhandled event", &replication.RowsQueryEvent{})
234+
syncer.recordUnhandledEvent("unhandled event", &replication.QueryEvent{})
235+
syncer.recordUnhandledEvent("unhandled event from transaction payload", &replication.QueryEvent{})
236+
237+
entries := logs.All()
238+
c.Assert(entries, check.HasLen, 2)
239+
240+
seen := make(map[string][]map[string]interface{}, len(entries))
241+
for _, entry := range entries {
242+
seen[entry.Message] = append(seen[entry.Message], entry.ContextMap())
243+
}
244+
245+
c.Assert(seen["unhandled event"], check.HasLen, 1)
246+
c.Assert(seen["unhandled event"][0]["type"], check.Equals, "*replication.RowsQueryEvent")
247+
c.Assert(seen["unhandled event from transaction payload"], check.HasLen, 1)
248+
c.Assert(seen["unhandled event from transaction payload"][0]["type"], check.Equals, "*replication.QueryEvent")
249+
250+
syncer.recordUnhandledEvent("unhandled event", &replication.RowsQueryEvent{})
251+
syncer.recordUnhandledEvent("unhandled event", &replication.QueryEvent{})
252+
syncer.recordUnhandledEvent("unhandled event from transaction payload", &replication.QueryEvent{})
253+
c.Assert(logs.All(), check.HasLen, 2)
254+
}
255+
222256
func mockGetServerUnixTS(mock sqlmock.Sqlmock) {
223257
ts := time.Now().Unix()
224258
rows := sqlmock.NewRows([]string{"UNIX_TIMESTAMP()"}).AddRow(strconv.FormatInt(ts, 10))

0 commit comments

Comments
 (0)