Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
916b7db
feat: add comprehensive DDL timestamp handling for sink
haiboumich Dec 5, 2025
275df5b
sink(ticdc): align MySQL DDL timestamp handling with upstream default…
haiboumich Jan 12, 2026
f8c9cd3
tiflow: gate DDL session timestamp on origin_default
haiboumich Jan 13, 2026
8002d41
tiflow: update DDL timestamp handling
haiboumich Jan 13, 2026
ef3aeec
ticdc: revert log
haiboumich Jan 13, 2026
50a83e9
ticdc: refine code
haiboumich Jan 13, 2026
d635098
ticdc: refine code
haiboumich Jan 13, 2026
caff434
ticdc: update log
haiboumich Jan 13, 2026
d837830
Removed //go:build intest and // +build intest
haiboumich Jan 15, 2026
5ba109a
Made the targeted‑skip variant: we now ignore SET TIMESTAMP / reset f…
haiboumich Jan 16, 2026
e0b4f36
Added a MySQL‑only integration test that reproduces the ALTER TABLE …
haiboumich Jan 16, 2026
dc7a1ab
Revert "Made the targeted‑skip variant: we now ignore SET TIMESTAMP /…
haiboumich Jan 16, 2026
67267c6
Merge branch 'pingcap:master' into fix-handle-alter-add-default_curre…
haiboumich Jan 16, 2026
980163d
add more dml before and after adding column for integration test
haiboumich Jan 16, 2026
29d6e56
added a pre-DDL SET TIMESTAMP = DEFAULT so every DDL starts from a cl…
haiboumich Jan 19, 2026
83c5cb5
fix ut
haiboumich Jan 19, 2026
20e2425
sink/mysql: make ddl timestamp leak test deterministic
haiboumich Jan 19, 2026
952bafa
avoid system tz noise
haiboumich Jan 20, 2026
3d6848b
deliberate no pre ddl reset
haiboumich Jan 20, 2026
ece8374
failpoint the right place
haiboumich Jan 20, 2026
0dd9d3c
uncomment pre-ddl
haiboumich Jan 20, 2026
6f42244
sink(ticdc): improve DDL timestamp failpoint coverage
haiboumich Jan 22, 2026
fff1040
sink(ticdc): add failpoint coverage for DDL timestamp reset
haiboumich Jan 22, 2026
76a6e4b
chore: reorder
haiboumich Jan 22, 2026
445aaae
sink(ticdc): guard DDL timestamp helper for nil table info
haiboumich Jan 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions cdc/sink/ddlsink/mysql/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"
"time"

dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/errno"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
tidbmysql "github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"go.uber.org/zap"
)

func setSessionTimestamp(ctx context.Context, tx *sql.Tx, unixTimestamp float64) error {
_, err := tx.ExecContext(ctx, fmt.Sprintf("SET TIMESTAMP = %s", formatUnixTimestamp(unixTimestamp)))
return err
}

func resetSessionTimestamp(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, "SET TIMESTAMP = DEFAULT")
return err
}

func isIgnorableSessionTimestampErr(err error) bool {
if err == nil {
return false
}
mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError)
if !ok {
return false
}
switch mysqlErr.Number {
case uint16(errno.ErrWrongValueForVar),
uint16(errno.ErrTruncatedWrongValue),
uint16(errno.ErrUnknownSystemVariable):
return true
default:
return false
}
}

func formatUnixTimestamp(unixTimestamp float64) string {
return strconv.FormatFloat(unixTimestamp, 'f', 6, 64)
}

func ddlSessionTimestampFromOriginDefault(ddl *model.DDLEvent, timezone string) (float64, bool) {
if ddl == nil || ddl.TableInfo == nil {
return 0, false
}
targetColumns, err := extractCurrentTimestampDefaultColumns(ddl.Query)
if err != nil || len(targetColumns) == 0 {
return 0, false
}

for _, col := range ddl.TableInfo.Columns {
if _, ok := targetColumns[col.Name.L]; !ok {
continue
}
val := col.GetOriginDefaultValue()
valStr, ok := val.(string)
if !ok || valStr == "" {
continue
}
ts, err := parseOriginDefaultTimestamp(valStr, col, timezone)
if err != nil {
log.Warn("Failed to parse OriginDefaultValue for DDL timestamp",
zap.String("column", col.Name.O),
zap.String("originDefault", valStr),
zap.Error(err))
continue
}
log.Info("Using OriginDefaultValue for DDL timestamp",
zap.String("column", col.Name.O),
zap.String("originDefault", valStr),
zap.Float64("timestamp", ts),
zap.String("timezone", timezone))
return ts, true
}

return 0, false
}

func extractCurrentTimestampDefaultColumns(query string) (map[string]struct{}, error) {
p := parser.New()
stmt, err := p.ParseOneStmt(query, "", "")
if err != nil {
return nil, err
}

cols := make(map[string]struct{})
switch s := stmt.(type) {
case *ast.CreateTableStmt:
for _, col := range s.Cols {
if hasCurrentTimestampDefault(col) {
cols[col.Name.Name.L] = struct{}{}
}
}
case *ast.AlterTableStmt:
for _, spec := range s.Specs {
switch spec.Tp {
case ast.AlterTableAddColumns, ast.AlterTableModifyColumn, ast.AlterTableChangeColumn, ast.AlterTableAlterColumn:
for _, col := range spec.NewColumns {
if hasCurrentTimestampDefault(col) {
cols[col.Name.Name.L] = struct{}{}
}
}
}
}
}

return cols, nil
}

func hasCurrentTimestampDefault(col *ast.ColumnDef) bool {
if col == nil {
return false
}
for _, opt := range col.Options {
if opt.Tp != ast.ColumnOptionDefaultValue {
continue
}
if isCurrentTimestampExpr(opt.Expr) {
return true
}
}
return false
}

func isCurrentTimestampExpr(expr ast.ExprNode) bool {
if expr == nil {
return false
}
switch v := expr.(type) {
case *ast.FuncCallExpr:
return isCurrentTimestampFuncName(v.FnName.L)
case ast.ValueExpr:
return isCurrentTimestampFuncName(strings.ToLower(v.GetString()))
default:
return false
}
}

func isCurrentTimestampFuncName(name string) bool {
switch name {
case ast.CurrentTimestamp, ast.Now, ast.LocalTime, ast.LocalTimestamp:
return true
default:
return false
}
}

func parseOriginDefaultTimestamp(val string, col *timodel.ColumnInfo, timezone string) (float64, error) {
loc, err := resolveOriginDefaultLocation(col, timezone)
if err != nil {
return 0, err
}
return parseTimestampInLocation(val, loc)
}

func resolveOriginDefaultLocation(col *timodel.ColumnInfo, timezone string) (*time.Location, error) {
if col != nil && col.GetType() == tidbmysql.TypeTimestamp && col.Version >= timodel.ColumnInfoVersion1 {
return time.UTC, nil
}
if timezone == "" {
return time.UTC, nil
}
tz := strings.Trim(timezone, "\"")
return time.LoadLocation(tz)
}

func parseTimestampInLocation(val string, loc *time.Location) (float64, error) {
formats := []string{
"2006-01-02 15:04:05",
"2006-01-02 15:04:05.999999",
}
for _, f := range formats {
t, err := time.ParseInLocation(f, val, loc)
if err == nil {
return float64(t.UnixNano()) / float64(time.Second), nil
}
}
return 0, fmt.Errorf("failed to parse timestamp: %s", val)
}
73 changes: 69 additions & 4 deletions cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,44 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
return err
}

ddlTimestamp, useSessionTimestamp := ddlSessionTimestampFromOriginDefault(ddl, m.cfg.Timezone)

if useSessionTimestamp {
// set the session timestamp to match upstream DDL execution time
if err := setSessionTimestamp(ctx, tx, ddlTimestamp); err != nil {
if isIgnorableSessionTimestampErr(err) {
log.Warn("Fail to set session timestamp for DDL, continue without session timestamp",
zap.Float64("timestamp", ddlTimestamp),
zap.Uint64("startTs", ddl.StartTs),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("query", ddl.Query),
zap.Error(err))
useSessionTimestamp = false
} else {
log.Error("Fail to set session timestamp for DDL",
zap.Float64("timestamp", ddlTimestamp),
zap.Uint64("startTs", ddl.StartTs),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("query", ddl.Query),
zap.Error(err))
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("changefeed", m.id.ID), zap.Error(rbErr))
}
return err
}
}
}

if _, err = tx.ExecContext(ctx, ddl.Query); err != nil {
log.Error("Failed to ExecContext", zap.Any("err", err), zap.Any("query", ddl.Query))
if useSessionTimestamp {
if tsErr := resetSessionTimestamp(ctx, tx); tsErr != nil {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If resetSessionTimestamp failed, does it affect subsequent ddl?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i think resetSessionTimestamp fails, the session timestamp can leak and affect later DDLs. I added a pre-DDL SET TIMESTAMP = DEFAULT so every DDL starts from a clean session timestam

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the resetSessionTimestamp is duplicated after ddl is executed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i think it is just a safeguard with low cost

log.Warn("Failed to reset session timestamp after DDL execution failure",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Error(tsErr))
}
}
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback",
zap.String("namespace", m.id.Namespace),
Expand All @@ -222,13 +259,41 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
return err
}

if useSessionTimestamp {
// reset session timestamp after DDL execution to avoid affecting subsequent operations
if err := resetSessionTimestamp(ctx, tx); err != nil {
if isIgnorableSessionTimestampErr(err) {
log.Warn("Failed to reset session timestamp after DDL execution, continue",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Error(err))
} else {
log.Error("Failed to reset session timestamp after DDL execution", zap.Error(err))
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("sql", ddl.Query), zap.Error(rbErr))
}
return errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query)))
}
}
}

if err = tx.Commit(); err != nil {
return errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query)))
}

log.Info("Exec DDL succeeded",
zap.String("namespace", m.id.Namespace), zap.String("changefeed", m.id.ID),
zap.Duration("duration", time.Since(start)), zap.String("sql", ddl.Query))
logFields := []zap.Field{
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Duration("duration", time.Since(start)),
zap.String("sql", ddl.Query),
}

if useSessionTimestamp {
logFields = append(logFields, zap.Float64("sessionTimestamp", ddlTimestamp))
}

log.Info("Exec DDL succeeded", logFields...)

return nil
}

Expand Down Expand Up @@ -344,7 +409,7 @@ func getDDLCreateTime(ctx context.Context, db *sql.DB) string {
// getDDLStateFromTiDB retrieves the ddl job status of the ddl query from downstream tidb based on the ddl query and the approximate ddl create time.
func getDDLStateFromTiDB(ctx context.Context, db *sql.DB, ddl string, createTime string) (timodel.JobState, error) {
// ddlCreateTime and createTime are both based on UTC timezone of downstream
showJobs := fmt.Sprintf(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY FROM information_schema.ddl_jobs
showJobs := fmt.Sprintf(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY FROM information_schema.ddl_jobs
WHERE CREATE_TIME >= "%s" AND QUERY = "%s";`, createTime, ddl)
//nolint:rowserrcheck
jobsRows, err := db.QueryContext(ctx, showJobs)
Expand Down
Loading