Skip to content

Commit bebe490

Browse files
committed
syncer(dm): log sampled unhandled-event counts
1 parent 5d46e99 commit bebe490

File tree

2 files changed

+38
-3
lines changed

2 files changed

+38
-3
lines changed

dm/syncer/syncer.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,10 @@ type Syncer struct {
261261

262262
ddlWorker *DDLWorker
263263
unhandledEventLogger *zap.Logger
264+
unhandledEvents struct {
265+
sync.Mutex
266+
counts map[string]map[string]int
267+
}
264268
}
265269

266270
// NewSyncer creates a new Syncer.
@@ -317,6 +321,7 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, relay rel
317321
unhandledEventSampleFirst,
318322
logFields...,
319323
)()
324+
syncer.unhandledEvents.counts = make(map[string]map[string]int)
320325

321326
return syncer
322327
}
@@ -354,7 +359,23 @@ func (s *Syncer) closeJobChans() {
354359
}
355360

356361
func (s *Syncer) recordUnhandledEvent(message string, ev interface{}) {
357-
s.unhandledEventLogger.Warn(message, zap.String("type", fmt.Sprintf("%T", ev)))
362+
eventType := fmt.Sprintf("%T", ev)
363+
364+
s.unhandledEvents.Lock()
365+
eventCounts, ok := s.unhandledEvents.counts[message]
366+
if !ok {
367+
eventCounts = make(map[string]int)
368+
s.unhandledEvents.counts[message] = eventCounts
369+
}
370+
eventCounts[eventType]++
371+
372+
snapshot := make(map[string]int, len(eventCounts))
373+
for event, count := range eventCounts {
374+
snapshot[event] = count
375+
}
376+
s.unhandledEvents.Unlock()
377+
378+
s.unhandledEventLogger.Warn(message, zap.Any("events", snapshot))
358379
}
359380

360381
// Type implements Unit.Type.

dm/syncer/syncer_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,14 +243,28 @@ func (s *testSyncerSuite) TestSampleUnhandledEvents(c *check.C) {
243243
}
244244

245245
c.Assert(seen["unhandled event"], check.HasLen, 1)
246-
c.Assert(seen["unhandled event"][0]["type"], check.Equals, "*replication.RowsQueryEvent")
246+
c.Assert(seen["unhandled event"][0]["events"], check.DeepEquals, map[string]int{
247+
"*replication.RowsQueryEvent": 1,
248+
})
247249
c.Assert(seen["unhandled event from transaction payload"], check.HasLen, 1)
248-
c.Assert(seen["unhandled event from transaction payload"][0]["type"], check.Equals, "*replication.QueryEvent")
250+
c.Assert(seen["unhandled event from transaction payload"][0]["events"], check.DeepEquals, map[string]int{
251+
"*replication.QueryEvent": 1,
252+
})
249253

250254
syncer.recordUnhandledEvent("unhandled event", &replication.RowsQueryEvent{})
251255
syncer.recordUnhandledEvent("unhandled event", &replication.QueryEvent{})
252256
syncer.recordUnhandledEvent("unhandled event from transaction payload", &replication.QueryEvent{})
253257
c.Assert(logs.All(), check.HasLen, 2)
258+
259+
syncer.unhandledEvents.Lock()
260+
defer syncer.unhandledEvents.Unlock()
261+
c.Assert(syncer.unhandledEvents.counts["unhandled event"], check.DeepEquals, map[string]int{
262+
"*replication.QueryEvent": 2,
263+
"*replication.RowsQueryEvent": 3,
264+
})
265+
c.Assert(syncer.unhandledEvents.counts["unhandled event from transaction payload"], check.DeepEquals, map[string]int{
266+
"*replication.QueryEvent": 2,
267+
})
254268
}
255269

256270
func mockGetServerUnixTS(mock sqlmock.Sqlmock) {

0 commit comments

Comments
 (0)