@@ -216,16 +216,26 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
216216 if useSessionTimestamp {
217217 // set the session timestamp to match upstream DDL execution time
218218 if err := setSessionTimestamp (ctx , tx , ddlTimestamp ); err != nil {
219- log .Error ("Fail to set session timestamp for DDL" ,
220- zap .Float64 ("timestamp" , ddlTimestamp ),
221- zap .Uint64 ("startTs" , ddl .StartTs ),
222- zap .Uint64 ("commitTs" , ddl .CommitTs ),
223- zap .String ("query" , ddl .Query ),
224- zap .Error (err ))
225- if rbErr := tx .Rollback (); rbErr != nil {
226- log .Error ("Failed to rollback" , zap .String ("changefeed" , m .id .ID ), zap .Error (rbErr ))
219+ if isIgnorableSessionTimestampErr (err ) {
220+ log .Warn ("Fail to set session timestamp for DDL, continue without session timestamp" ,
221+ zap .Float64 ("timestamp" , ddlTimestamp ),
222+ zap .Uint64 ("startTs" , ddl .StartTs ),
223+ zap .Uint64 ("commitTs" , ddl .CommitTs ),
224+ zap .String ("query" , ddl .Query ),
225+ zap .Error (err ))
226+ useSessionTimestamp = false
227+ } else {
228+ log .Error ("Fail to set session timestamp for DDL" ,
229+ zap .Float64 ("timestamp" , ddlTimestamp ),
230+ zap .Uint64 ("startTs" , ddl .StartTs ),
231+ zap .Uint64 ("commitTs" , ddl .CommitTs ),
232+ zap .String ("query" , ddl .Query ),
233+ zap .Error (err ))
234+ if rbErr := tx .Rollback (); rbErr != nil {
235+ log .Error ("Failed to rollback" , zap .String ("changefeed" , m .id .ID ), zap .Error (rbErr ))
236+ }
237+ return err
227238 }
228- return err
229239 }
230240 }
231241
@@ -252,11 +262,18 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
252262 if useSessionTimestamp {
253263 // reset session timestamp after DDL execution to avoid affecting subsequent operations
254264 if err := resetSessionTimestamp (ctx , tx ); err != nil {
255- log .Error ("Failed to reset session timestamp after DDL execution" , zap .Error (err ))
256- if rbErr := tx .Rollback (); rbErr != nil {
257- log .Error ("Failed to rollback" , zap .String ("sql" , ddl .Query ), zap .Error (rbErr ))
265+ if isIgnorableSessionTimestampErr (err ) {
266+ log .Warn ("Failed to reset session timestamp after DDL execution, continue" ,
267+ zap .String ("namespace" , m .id .Namespace ),
268+ zap .String ("changefeed" , m .id .ID ),
269+ zap .Error (err ))
270+ } else {
271+ log .Error ("Failed to reset session timestamp after DDL execution" , zap .Error (err ))
272+ if rbErr := tx .Rollback (); rbErr != nil {
273+ log .Error ("Failed to rollback" , zap .String ("sql" , ddl .Query ), zap .Error (rbErr ))
274+ }
275+ return errors .WrapError (errors .ErrMySQLTxnError , errors .WithMessage (err , fmt .Sprintf ("Query info: %s; " , ddl .Query )))
258276 }
259- return errors .WrapError (errors .ErrMySQLTxnError , errors .WithMessage (err , fmt .Sprintf ("Query info: %s; " , ddl .Query )))
260277 }
261278 }
262279
0 commit comments