Skip to content

Commit 7f61b0f

Browse files
committed
sink/mysql: align DDL time defaults with origin_default (pingcap#12490)
close pingcap#11368
1 parent 6c53c68 commit 7f61b0f

File tree

10 files changed

+609
-5
lines changed

10 files changed

+609
-5
lines changed

cdc/entry/schema/snapshot.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -790,10 +790,10 @@ func (s *snapshot) truncateTable(id int64, tbInfo *model.TableInfo, currentTs ui
790790
s.doCreateTable(tbInfo, currentTs)
791791
tag := negative(currentTs)
792792
// when the table is a partition table, we have to record all partition ids
793-
if old.IsPartitionTable() {
793+
if old.GetPartitionInfo() != nil {
794794
newPi := tbInfo.GetPartitionInfo()
795795
oldPi := old.GetPartitionInfo()
796-
newPartitionIDMap := make(map[int64]struct{}, len(newPi.NewPartitionIDs))
796+
newPartitionIDMap := make(map[int64]struct{}, len(newPi.Definitions))
797797
for _, partition := range newPi.Definitions {
798798
newPartitionIDMap[partition.ID] = struct{}{}
799799
}

cdc/sinkv2/ddlsink/mysql/helper.go

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package mysql
15+
16+
import (
17+
"context"
18+
"database/sql"
19+
"fmt"
20+
"strconv"
21+
"strings"
22+
"time"
23+
24+
"github.com/pingcap/failpoint"
25+
"github.com/pingcap/log"
26+
"github.com/pingcap/tidb/parser"
27+
"github.com/pingcap/tidb/parser/ast"
28+
timodel "github.com/pingcap/tidb/parser/model"
29+
tidbmysql "github.com/pingcap/tidb/parser/mysql"
30+
"github.com/pingcap/tiflow/cdc/model"
31+
"go.uber.org/zap"
32+
)
33+
34+
func setSessionTimestamp(ctx context.Context, tx *sql.Tx, unixTimestamp float64) error {
35+
_, err := tx.ExecContext(ctx, fmt.Sprintf("SET TIMESTAMP = %s", formatUnixTimestamp(unixTimestamp)))
36+
return err
37+
}
38+
39+
// resetSessionTimestamp clears session @@timestamp to prevent stale values from
40+
// leaking across DDLs using the same session; it's a cheap safety net before
41+
// and after DDL execution.
42+
func resetSessionTimestamp(ctx context.Context, tx *sql.Tx) error {
43+
_, err := tx.ExecContext(ctx, "SET TIMESTAMP = DEFAULT")
44+
return err
45+
}
46+
47+
func formatUnixTimestamp(unixTimestamp float64) string {
48+
return strconv.FormatFloat(unixTimestamp, 'f', 6, 64)
49+
}
50+
51+
func ddlSessionTimestampFromOriginDefault(ddl *model.DDLEvent, timezone string) (float64, bool) {
52+
if ddl == nil || ddl.TableInfo == nil || ddl.TableInfo.TableInfo == nil {
53+
return 0, false
54+
}
55+
targetColumns, err := extractCurrentTimestampDefaultColumns(ddl.Query)
56+
if err != nil || len(targetColumns) == 0 {
57+
return 0, false
58+
}
59+
60+
for _, col := range ddl.TableInfo.Columns {
61+
if col == nil {
62+
continue
63+
}
64+
if _, ok := targetColumns[col.Name.L]; !ok {
65+
continue
66+
}
67+
val := col.GetOriginDefaultValue()
68+
valStr, ok := val.(string)
69+
if !ok || valStr == "" {
70+
continue
71+
}
72+
ts, err := parseOriginDefaultTimestamp(valStr, col, timezone)
73+
if err != nil {
74+
log.Warn("Failed to parse OriginDefaultValue for DDL timestamp",
75+
zap.String("column", col.Name.O),
76+
zap.String("originDefault", valStr),
77+
zap.Error(err))
78+
continue
79+
}
80+
log.Info("Using OriginDefaultValue for DDL timestamp",
81+
zap.String("column", col.Name.O),
82+
zap.String("originDefault", valStr),
83+
zap.Float64("timestamp", ts),
84+
zap.String("timezone", timezone))
85+
return ts, true
86+
}
87+
88+
return 0, false
89+
}
90+
91+
func extractCurrentTimestampDefaultColumns(query string) (map[string]struct{}, error) {
92+
p := parser.New()
93+
stmt, err := p.ParseOneStmt(query, "", "")
94+
if err != nil {
95+
return nil, err
96+
}
97+
98+
cols := make(map[string]struct{})
99+
switch s := stmt.(type) {
100+
case *ast.CreateTableStmt:
101+
for _, col := range s.Cols {
102+
if hasCurrentTimestampDefault(col) {
103+
cols[col.Name.Name.L] = struct{}{}
104+
}
105+
}
106+
case *ast.AlterTableStmt:
107+
for _, spec := range s.Specs {
108+
switch spec.Tp {
109+
case ast.AlterTableAddColumns, ast.AlterTableModifyColumn, ast.AlterTableChangeColumn, ast.AlterTableAlterColumn:
110+
for _, col := range spec.NewColumns {
111+
if hasCurrentTimestampDefault(col) {
112+
cols[col.Name.Name.L] = struct{}{}
113+
}
114+
}
115+
}
116+
}
117+
}
118+
119+
return cols, nil
120+
}
121+
122+
func hasCurrentTimestampDefault(col *ast.ColumnDef) bool {
123+
if col == nil {
124+
return false
125+
}
126+
for _, opt := range col.Options {
127+
if opt.Tp != ast.ColumnOptionDefaultValue {
128+
continue
129+
}
130+
if isCurrentTimestampExpr(opt.Expr) {
131+
return true
132+
}
133+
}
134+
return false
135+
}
136+
137+
func isCurrentTimestampExpr(expr ast.ExprNode) bool {
138+
if expr == nil {
139+
return false
140+
}
141+
switch v := expr.(type) {
142+
case *ast.FuncCallExpr:
143+
return isCurrentTimestampFuncName(v.FnName.L)
144+
case ast.ValueExpr:
145+
return isCurrentTimestampFuncName(strings.ToLower(v.GetString()))
146+
default:
147+
return false
148+
}
149+
}
150+
151+
func isCurrentTimestampFuncName(name string) bool {
152+
switch name {
153+
case ast.CurrentTimestamp, ast.Now, ast.LocalTime, ast.LocalTimestamp:
154+
return true
155+
default:
156+
return false
157+
}
158+
}
159+
160+
func parseOriginDefaultTimestamp(val string, col *timodel.ColumnInfo, timezone string) (float64, error) {
161+
loc, err := resolveOriginDefaultLocation(col, timezone)
162+
if err != nil {
163+
return 0, err
164+
}
165+
return parseTimestampInLocation(val, loc)
166+
}
167+
168+
func resolveOriginDefaultLocation(col *timodel.ColumnInfo, timezone string) (*time.Location, error) {
169+
if col != nil && col.GetType() == tidbmysql.TypeTimestamp && col.Version >= timodel.ColumnInfoVersion1 {
170+
return time.UTC, nil
171+
}
172+
if timezone == "" {
173+
return time.UTC, nil
174+
}
175+
tz := strings.Trim(timezone, "\"")
176+
return time.LoadLocation(tz)
177+
}
178+
179+
func parseTimestampInLocation(val string, loc *time.Location) (float64, error) {
180+
formats := []string{
181+
"2006-01-02 15:04:05",
182+
"2006-01-02 15:04:05.999999",
183+
}
184+
for _, f := range formats {
185+
t, err := time.ParseInLocation(f, val, loc)
186+
if err == nil {
187+
return float64(t.UnixNano()) / float64(time.Second), nil
188+
}
189+
}
190+
return 0, fmt.Errorf("failed to parse timestamp: %s", val)
191+
}
192+
193+
func matchFailpointValue(val failpoint.Value, ddlQuery string) bool {
194+
if val == nil {
195+
return true
196+
}
197+
switch v := val.(type) {
198+
case bool:
199+
return v
200+
case string:
201+
if v == "" {
202+
return true
203+
}
204+
return strings.Contains(strings.ToLower(ddlQuery), strings.ToLower(v))
205+
default:
206+
return true
207+
}
208+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package mysql
15+
16+
import (
17+
"testing"
18+
19+
"github.com/stretchr/testify/require"
20+
)
21+
22+
func TestMatchFailpointValue(t *testing.T) {
23+
ddl := "ALTER TABLE t ADD COLUMN c2 int"
24+
tests := []struct {
25+
name string
26+
val any
27+
want bool
28+
}{
29+
{name: "nil", val: nil, want: true},
30+
{name: "bool-true", val: true, want: true},
31+
{name: "bool-false", val: false, want: false},
32+
{name: "empty-string", val: "", want: true},
33+
{name: "match-string", val: "c2", want: true},
34+
{name: "match-string-case", val: "C2", want: true},
35+
{name: "no-match", val: "d2", want: false},
36+
{name: "unknown-type", val: 123, want: true},
37+
}
38+
39+
for _, tc := range tests {
40+
t.Run(tc.name, func(t *testing.T) {
41+
require.Equal(t, tc.want, matchFailpointValue(tc.val, ddl))
42+
})
43+
}
44+
}

cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,65 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
195195
}
196196
}
197197

198+
// Reset session timestamp before DDL to avoid leaking from pooled connections.
199+
if err := resetSessionTimestamp(ctx, tx); err != nil {
200+
log.Error("Failed to reset session timestamp before DDL execution",
201+
zap.String("namespace", m.id.Namespace),
202+
zap.String("changefeed", m.id.ID),
203+
zap.Error(err))
204+
if rbErr := tx.Rollback(); rbErr != nil {
205+
log.Error("Failed to rollback", zap.String("changefeed", m.id.ID), zap.Error(rbErr))
206+
}
207+
return err
208+
}
209+
210+
ddlTimestamp, useSessionTimestamp := ddlSessionTimestampFromOriginDefault(ddl, m.cfg.Timezone)
211+
skipSetTimestamp := false
212+
failpoint.Inject("MySQLSinkSkipSetSessionTimestamp", func(val failpoint.Value) {
213+
skipSetTimestamp = matchFailpointValue(val, ddl.Query)
214+
})
215+
skipResetAfterDDL := false
216+
failpoint.Inject("MySQLSinkSkipResetSessionTimestampAfterDDL", func(val failpoint.Value) {
217+
skipResetAfterDDL = matchFailpointValue(val, ddl.Query)
218+
})
219+
220+
if useSessionTimestamp && skipSetTimestamp {
221+
log.Warn("Skip setting session timestamp due to failpoint",
222+
zap.String("namespace", m.id.Namespace),
223+
zap.String("changefeed", m.id.ID),
224+
zap.String("query", ddl.Query))
225+
}
226+
if useSessionTimestamp && !skipSetTimestamp {
227+
// set the session timestamp to match upstream DDL execution time
228+
if err := setSessionTimestamp(ctx, tx, ddlTimestamp); err != nil {
229+
log.Error("Fail to set session timestamp for DDL",
230+
zap.Float64("timestamp", ddlTimestamp),
231+
zap.Uint64("startTs", ddl.StartTs),
232+
zap.Uint64("commitTs", ddl.CommitTs),
233+
zap.String("query", ddl.Query),
234+
zap.Error(err))
235+
if rbErr := tx.Rollback(); rbErr != nil {
236+
log.Error("Failed to rollback", zap.String("changefeed", m.id.ID), zap.Error(rbErr))
237+
}
238+
return err
239+
}
240+
}
241+
198242
if _, err = tx.ExecContext(ctx, ddl.Query); err != nil {
243+
log.Error("Failed to ExecContext", zap.Any("err", err), zap.Any("query", ddl.Query))
244+
if useSessionTimestamp {
245+
if skipResetAfterDDL {
246+
log.Warn("Skip resetting session timestamp after DDL execution failure due to failpoint",
247+
zap.String("namespace", m.id.Namespace),
248+
zap.String("changefeed", m.id.ID),
249+
zap.String("query", ddl.Query))
250+
} else if tsErr := resetSessionTimestamp(ctx, tx); tsErr != nil {
251+
log.Warn("Failed to reset session timestamp after DDL execution failure",
252+
zap.String("namespace", m.id.Namespace),
253+
zap.String("changefeed", m.id.ID),
254+
zap.Error(tsErr))
255+
}
256+
}
199257
if rbErr := tx.Rollback(); rbErr != nil {
200258
log.Error("Failed to rollback", zap.String("sql", ddl.Query),
201259
zap.String("namespace", m.id.Namespace),
@@ -204,6 +262,22 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
204262
return err
205263
}
206264

265+
if useSessionTimestamp {
266+
// reset session timestamp after DDL execution to avoid affecting subsequent operations
267+
if skipResetAfterDDL {
268+
log.Warn("Skip resetting session timestamp after DDL execution due to failpoint",
269+
zap.String("namespace", m.id.Namespace),
270+
zap.String("changefeed", m.id.ID),
271+
zap.String("query", ddl.Query))
272+
} else if err := resetSessionTimestamp(ctx, tx); err != nil {
273+
log.Error("Failed to reset session timestamp after DDL execution", zap.Error(err))
274+
if rbErr := tx.Rollback(); rbErr != nil {
275+
log.Error("Failed to rollback", zap.String("sql", ddl.Query), zap.Error(rbErr))
276+
}
277+
return errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query)))
278+
}
279+
}
280+
207281
if err = tx.Commit(); err != nil {
208282
log.Error("Failed to exec DDL", zap.String("sql", ddl.Query),
209283
zap.Duration("duration", time.Since(start)),
@@ -212,10 +286,16 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
212286
return errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query)))
213287
}
214288

215-
log.Info("Exec DDL succeeded", zap.String("sql", ddl.Query),
289+
logFields := []zap.Field{
290+
zap.String("sql", ddl.Query),
216291
zap.Duration("duration", time.Since(start)),
217292
zap.String("namespace", m.id.Namespace),
218-
zap.String("changefeed", m.id.ID))
293+
zap.String("changefeed", m.id.ID),
294+
}
295+
if useSessionTimestamp {
296+
logFields = append(logFields, zap.Float64("sessionTimestamp", ddlTimestamp))
297+
}
298+
log.Info("Exec DDL succeeded", logFields...)
219299
return nil
220300
}
221301

cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,12 @@ func TestWriteDDLEvent(t *testing.T) {
4949
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
5050
mock.ExpectBegin()
5151
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
52+
mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1))
5253
mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int").WillReturnResult(sqlmock.NewResult(1, 1))
5354
mock.ExpectCommit()
5455
mock.ExpectBegin()
5556
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
57+
mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1))
5658
mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int").
5759
WillReturnError(&dmysql.MySQLError{
5860
Number: uint16(infoschema.ErrColumnExists.Code()),

0 commit comments

Comments
 (0)