Skip to content

Commit 9edb720

Browse files
haiboumichti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#12490
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
1 parent f18090a commit 9edb720

File tree

9 files changed

+633
-4
lines changed

9 files changed

+633
-4
lines changed

cdc/sink/ddlsink/mysql/helper.go

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package mysql
15+
16+
import (
17+
"context"
18+
"database/sql"
19+
"fmt"
20+
"strconv"
21+
"strings"
22+
"time"
23+
24+
"github.com/pingcap/failpoint"
25+
"github.com/pingcap/log"
26+
timodel "github.com/pingcap/tidb/pkg/meta/model"
27+
"github.com/pingcap/tidb/pkg/parser"
28+
"github.com/pingcap/tidb/pkg/parser/ast"
29+
tidbmysql "github.com/pingcap/tidb/pkg/parser/mysql"
30+
"github.com/pingcap/tiflow/cdc/model"
31+
"go.uber.org/zap"
32+
)
33+
34+
func setSessionTimestamp(ctx context.Context, tx *sql.Tx, unixTimestamp float64) error {
35+
_, err := tx.ExecContext(ctx, fmt.Sprintf("SET TIMESTAMP = %s", formatUnixTimestamp(unixTimestamp)))
36+
return err
37+
}
38+
39+
// resetSessionTimestamp clears session @@timestamp to prevent stale values from
40+
// leaking across DDLs using the same session; it's a cheap safety net before
41+
// and after DDL execution.
42+
func resetSessionTimestamp(ctx context.Context, tx *sql.Tx) error {
43+
_, err := tx.ExecContext(ctx, "SET TIMESTAMP = DEFAULT")
44+
return err
45+
}
46+
47+
func formatUnixTimestamp(unixTimestamp float64) string {
48+
return strconv.FormatFloat(unixTimestamp, 'f', 6, 64)
49+
}
50+
51+
func ddlSessionTimestampFromOriginDefault(ddl *model.DDLEvent, timezone string) (float64, bool) {
52+
if ddl == nil || ddl.TableInfo == nil || ddl.TableInfo.TableInfo == nil {
53+
return 0, false
54+
}
55+
targetColumns, err := extractCurrentTimestampDefaultColumns(ddl.Query)
56+
if err != nil || len(targetColumns) == 0 {
57+
return 0, false
58+
}
59+
60+
for _, col := range ddl.TableInfo.Columns {
61+
if col == nil {
62+
continue
63+
}
64+
if _, ok := targetColumns[col.Name.L]; !ok {
65+
continue
66+
}
67+
val := col.GetOriginDefaultValue()
68+
valStr, ok := val.(string)
69+
if !ok || valStr == "" {
70+
continue
71+
}
72+
ts, err := parseOriginDefaultTimestamp(valStr, col, timezone)
73+
if err != nil {
74+
log.Warn("Failed to parse OriginDefaultValue for DDL timestamp",
75+
zap.String("column", col.Name.O),
76+
zap.String("originDefault", valStr),
77+
zap.Error(err))
78+
continue
79+
}
80+
log.Info("Using OriginDefaultValue for DDL timestamp",
81+
zap.String("column", col.Name.O),
82+
zap.String("originDefault", valStr),
83+
zap.Float64("timestamp", ts),
84+
zap.String("timezone", timezone))
85+
return ts, true
86+
}
87+
88+
return 0, false
89+
}
90+
91+
func extractCurrentTimestampDefaultColumns(query string) (map[string]struct{}, error) {
92+
p := parser.New()
93+
stmt, err := p.ParseOneStmt(query, "", "")
94+
if err != nil {
95+
return nil, err
96+
}
97+
98+
cols := make(map[string]struct{})
99+
switch s := stmt.(type) {
100+
case *ast.CreateTableStmt:
101+
for _, col := range s.Cols {
102+
if hasCurrentTimestampDefault(col) {
103+
cols[col.Name.Name.L] = struct{}{}
104+
}
105+
}
106+
case *ast.AlterTableStmt:
107+
for _, spec := range s.Specs {
108+
switch spec.Tp {
109+
case ast.AlterTableAddColumns, ast.AlterTableModifyColumn, ast.AlterTableChangeColumn, ast.AlterTableAlterColumn:
110+
for _, col := range spec.NewColumns {
111+
if hasCurrentTimestampDefault(col) {
112+
cols[col.Name.Name.L] = struct{}{}
113+
}
114+
}
115+
}
116+
}
117+
}
118+
119+
return cols, nil
120+
}
121+
122+
func hasCurrentTimestampDefault(col *ast.ColumnDef) bool {
123+
if col == nil {
124+
return false
125+
}
126+
for _, opt := range col.Options {
127+
if opt.Tp != ast.ColumnOptionDefaultValue {
128+
continue
129+
}
130+
if isCurrentTimestampExpr(opt.Expr) {
131+
return true
132+
}
133+
}
134+
return false
135+
}
136+
137+
func isCurrentTimestampExpr(expr ast.ExprNode) bool {
138+
if expr == nil {
139+
return false
140+
}
141+
switch v := expr.(type) {
142+
case *ast.FuncCallExpr:
143+
return isCurrentTimestampFuncName(v.FnName.L)
144+
case ast.ValueExpr:
145+
return isCurrentTimestampFuncName(strings.ToLower(v.GetString()))
146+
default:
147+
return false
148+
}
149+
}
150+
151+
func isCurrentTimestampFuncName(name string) bool {
152+
switch name {
153+
case ast.CurrentTimestamp, ast.Now, ast.LocalTime, ast.LocalTimestamp:
154+
return true
155+
default:
156+
return false
157+
}
158+
}
159+
160+
func parseOriginDefaultTimestamp(val string, col *timodel.ColumnInfo, timezone string) (float64, error) {
161+
loc, err := resolveOriginDefaultLocation(col, timezone)
162+
if err != nil {
163+
return 0, err
164+
}
165+
return parseTimestampInLocation(val, loc)
166+
}
167+
168+
func resolveOriginDefaultLocation(col *timodel.ColumnInfo, timezone string) (*time.Location, error) {
169+
if col != nil && col.GetType() == tidbmysql.TypeTimestamp && col.Version >= timodel.ColumnInfoVersion1 {
170+
return time.UTC, nil
171+
}
172+
if timezone == "" {
173+
return time.UTC, nil
174+
}
175+
tz := strings.Trim(timezone, "\"")
176+
return time.LoadLocation(tz)
177+
}
178+
179+
func parseTimestampInLocation(val string, loc *time.Location) (float64, error) {
180+
formats := []string{
181+
"2006-01-02 15:04:05",
182+
"2006-01-02 15:04:05.999999",
183+
}
184+
for _, f := range formats {
185+
t, err := time.ParseInLocation(f, val, loc)
186+
if err == nil {
187+
return float64(t.UnixNano()) / float64(time.Second), nil
188+
}
189+
}
190+
return 0, fmt.Errorf("failed to parse timestamp: %s", val)
191+
}
192+
193+
func matchFailpointValue(val failpoint.Value, ddlQuery string) bool {
194+
if val == nil {
195+
return true
196+
}
197+
switch v := val.(type) {
198+
case bool:
199+
return v
200+
case string:
201+
if v == "" {
202+
return true
203+
}
204+
return strings.Contains(strings.ToLower(ddlQuery), strings.ToLower(v))
205+
default:
206+
return true
207+
}
208+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package mysql
15+
16+
import (
17+
"testing"
18+
19+
"github.com/stretchr/testify/require"
20+
)
21+
22+
func TestMatchFailpointValue(t *testing.T) {
23+
ddl := "ALTER TABLE t ADD COLUMN c2 int"
24+
tests := []struct {
25+
name string
26+
val any
27+
want bool
28+
}{
29+
{name: "nil", val: nil, want: true},
30+
{name: "bool-true", val: true, want: true},
31+
{name: "bool-false", val: false, want: false},
32+
{name: "empty-string", val: "", want: true},
33+
{name: "match-string", val: "c2", want: true},
34+
{name: "match-string-case", val: "C2", want: true},
35+
{name: "no-match", val: "d2", want: false},
36+
{name: "unknown-type", val: 123, want: true},
37+
}
38+
39+
for _, tc := range tests {
40+
t.Run(tc.name, func(t *testing.T) {
41+
require.Equal(t, tc.want, matchFailpointValue(tc.val, ddl))
42+
})
43+
}
44+
}

0 commit comments

Comments
 (0)