Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 83 additions & 2 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"path"
"reflect"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -97,6 +98,8 @@ const (
skipJobIdx = iota
ddlJobIdx
workerJobTSArrayInitSize // size = skip + ddl

unhandledEventFlushInterval = 5 * time.Minute
)

// waitXIDStatus represents the status for waiting XID event when pause/stop task.
Expand All @@ -108,6 +111,11 @@ const (
waitComplete
)

// unhandledEventLogKey identifies one aggregated "unhandled event" warn log:
// occurrences sharing the same message/event-type pair are counted together
// and emitted as a single log line (with a count) when flushed.
type unhandledEventLogKey struct {
	message string // the warn-log message, e.g. "unhandled event"
	eventType string // %T-formatted binlog event type, e.g. "*replication.RowsQueryEvent"
}

// Syncer can sync your MySQL data to another MySQL database.
type Syncer struct {
sync.RWMutex
Expand Down Expand Up @@ -256,6 +264,11 @@ type Syncer struct {
idAndCollationMap map[int]string

ddlWorker *DDLWorker

unhandledEvents struct {
sync.Mutex
counts map[unhandledEventLogKey]int
}
}

// NewSyncer creates a new Syncer.
Expand Down Expand Up @@ -307,6 +320,7 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, relay rel
syncer.lastCheckpointFlushedTime = time.Time{}
syncer.relay = relay
syncer.safeMode = sm.NewSafeMode()
syncer.unhandledEvents.counts = make(map[unhandledEventLogKey]int)

return syncer
}
Expand Down Expand Up @@ -343,6 +357,71 @@ func (s *Syncer) closeJobChans() {
s.jobsClosed.Store(true)
}

// recordUnhandledEvent bumps the in-memory counter for the given
// message/event-type pair instead of logging immediately; the aggregated
// counts are emitted later by flushUnhandledEvents to avoid log spam.
func (s *Syncer) recordUnhandledEvent(message string, ev interface{}) {
	key := unhandledEventLogKey{
		message:   message,
		eventType: fmt.Sprintf("%T", ev),
	}

	s.unhandledEvents.Lock()
	defer s.unhandledEvents.Unlock()
	s.unhandledEvents.counts[key]++
}

// flushUnhandledEvents emits one warn log per accumulated message/event-type
// pair (with its occurrence count) and resets the counters. Output is sorted
// by message then event type so the log order is deterministic. It is a no-op
// when nothing was recorded since the last flush.
func (s *Syncer) flushUnhandledEvents() {
	s.unhandledEvents.Lock()
	pending := s.unhandledEvents.counts
	if len(pending) == 0 {
		s.unhandledEvents.Unlock()
		return
	}
	// swap in a fresh map so logging below happens outside the lock.
	s.unhandledEvents.counts = make(map[unhandledEventLogKey]int)
	s.unhandledEvents.Unlock()

	keys := make([]unhandledEventLogKey, 0, len(pending))
	for key := range pending {
		keys = append(keys, key)
	}
	sort.Slice(keys, func(i, j int) bool {
		if keys[i].message == keys[j].message {
			return keys[i].eventType < keys[j].eventType
		}
		return keys[i].message < keys[j].message
	})

	for _, key := range keys {
		s.tctx.L().Warn(key.message,
			zap.String("type", key.eventType),
			zap.Int("count", pending[key]),
		)
	}
}

// logUnhandledEventsCronJob periodically flushes the aggregated
// unhandled-event counters to the log. It runs until ctx is canceled,
// performing one final flush on shutdown so buffered counts are not lost.
func (s *Syncer) logUnhandledEventsCronJob(ctx context.Context) {
	defer s.runWg.Done()

	flushTicker := time.NewTicker(unhandledEventFlushInterval)
	defer flushTicker.Stop()

	for {
		select {
		case <-flushTicker.C:
			s.flushUnhandledEvents()
		case <-ctx.Done():
			// emit whatever accumulated before exiting.
			s.flushUnhandledEvents()
			return
		}
	}
}

// Type implements Unit.Type.
func (s *Syncer) Type() pb.UnitType {
return pb.UnitType_Sync
Expand Down Expand Up @@ -1822,6 +1901,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
go s.updateLagCronJob(s.runCtx.Ctx)
s.runWg.Add(1)
go s.updateTSOffsetCronJob(s.runCtx.Ctx)
s.runWg.Add(1)
go s.logUnhandledEventsCronJob(s.runCtx.Ctx)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the SampleLogger from TiDB instead of a dedicated goroutine to aggregate these logs?


// some prepare work before the binlog event loop:
// 1. first we flush checkpoint as needed, so in next resume we won't go to Load unit.
Expand Down Expand Up @@ -2443,7 +2524,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
case *replication.TableMapEvent:
case *replication.FormatDescriptionEvent:
default:
s.tctx.L().Warn("unhandled event from transaction payload", zap.String("type", fmt.Sprintf("%T", tpevt)))
s.recordUnhandledEvent("unhandled event from transaction payload", tpevt)
}
}
if needContinue {
Expand All @@ -2452,7 +2533,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
case *replication.TableMapEvent:
case *replication.FormatDescriptionEvent:
default:
s.tctx.L().Warn("unhandled event", zap.String("type", fmt.Sprintf("%T", ev)))
s.recordUnhandledEvent("unhandled event", ev)
}
if err2 != nil {
if err := s.handleEventError(err2, startLocation, endLocation, e.Header.EventType == replication.QUERY_EVENT, originSQL); err != nil {
Expand Down
29 changes: 29 additions & 0 deletions dm/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"github.com/pingcap/tiflow/pkg/sqlmodel"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)

var _ = check.Suite(&testSyncerSuite{})
Expand Down Expand Up @@ -219,6 +220,34 @@ func (s *testSyncerSuite) TearDownSuite(c *check.C) {
os.RemoveAll(s.cfg.Dir)
}

// TestFlushUnhandledEvents verifies that recorded unhandled events are
// aggregated per message/event-type pair, emitted exactly once per pair
// with the right count, and that a flush with nothing pending logs nothing.
func (s *testSyncerSuite) TestFlushUnhandledEvents(c *check.C) {
	core, recorded := observer.New(zap.WarnLevel)
	cfg := genDefaultSubTaskConfig4Test()
	cfg.FrameworkLogger = zap.New(core)
	syncer := NewSyncer(cfg, nil, nil)

	// two occurrences of one key, a single occurrence of another.
	syncer.recordUnhandledEvent("unhandled event", &replication.RowsQueryEvent{})
	syncer.recordUnhandledEvent("unhandled event", &replication.RowsQueryEvent{})
	syncer.recordUnhandledEvent("unhandled event from transaction payload", &replication.QueryEvent{})
	syncer.flushUnhandledEvents()

	logged := recorded.All()
	c.Assert(logged, check.HasLen, 2)

	fieldsByMessage := make(map[string]map[string]interface{}, len(logged))
	for _, e := range logged {
		fieldsByMessage[e.Message] = e.ContextMap()
	}

	c.Assert(fieldsByMessage["unhandled event"]["type"], check.Equals, "*replication.RowsQueryEvent")
	c.Assert(fieldsByMessage["unhandled event"]["count"], check.Equals, int64(2))
	c.Assert(fieldsByMessage["unhandled event from transaction payload"]["type"], check.Equals, "*replication.QueryEvent")
	c.Assert(fieldsByMessage["unhandled event from transaction payload"]["count"], check.Equals, int64(1))

	// a second flush with no new events must not log anything.
	syncer.flushUnhandledEvents()
	c.Assert(recorded.All(), check.HasLen, 2)
}

func mockGetServerUnixTS(mock sqlmock.Sqlmock) {
ts := time.Now().Unix()
rows := sqlmock.NewRows([]string{"UNIX_TIMESTAMP()"}).AddRow(strconv.FormatInt(ts, 10))
Expand Down