Skip to content

Commit cf4647e

Browse files
authored
stats: do not close the priority queue in DDL handler (#65271)
1 parent 00916d0 commit cf4647e

File tree

5 files changed

+129
-48
lines changed

5 files changed

+129
-48
lines changed

pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,21 @@ import (
3737

3838
// HandleDDLEvent handles DDL events for the priority queue.
3939
func (pq *AnalysisPriorityQueue) HandleDDLEvent(_ context.Context, sctx sessionctx.Context, event *notifier.SchemaChangeEvent) error {
40-
// Check if auto analyze is enabled.
41-
if !vardef.RunAutoAnalyze.Load() {
42-
// Close the priority queue if auto analyze is disabled.
43-
// This ensures proper cleanup of the DDL notifier (mysql.tidb_ddl_notifier) and prevents the queue from remaining
44-
// in an unknown state. When auto analyze is re-enabled, the priority queue can be properly re-initialized.
45-
// NOTE: It is safe to call Close multiple times and it will get the lock internally.
46-
pq.Close()
47-
return nil
48-
}
4940
pq.syncFields.mu.Lock()
5041
defer pq.syncFields.mu.Unlock()
5142
// If the priority queue is not initialized, either retry later or ignore the event, depending on whether auto analyze is enabled.
5243
if !pq.syncFields.initialized {
53-
return notifier.ErrNotReadyRetryLater
44+
// If auto analyze is enabled but the priority queue is not initialized,
45+
// it means the priority queue initialization is not finished yet.
46+
// So we should retry later.
47+
if vardef.RunAutoAnalyze.Load() {
48+
return notifier.ErrNotReadyRetryLater
49+
}
50+
// NOTE: If auto analyze is disabled and the priority queue is not initialized,
51+
// we can just ignore the DDL events.
52+
// SAFETY: Once auto analyze is enabled again, the priority queue will be initialized
53+
// and it will recreate the jobs for all tables.
54+
return nil
5455
}
5556

5657
var err error

pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler_test.go

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1108,7 +1108,7 @@ func TestCreateIndexUnderDDLAnalyzeEnabled(t *testing.T) {
11081108
require.Equal(t, columnInfo[0].Name.L, "c1")
11091109
}
11101110

1111-
func TestTurnOffAutoAnalyze(t *testing.T) {
1111+
func TestTurnOffAutoAnalyzeAfterQueueInit(t *testing.T) {
11121112
store, do := testkit.CreateMockStoreAndDomain(t)
11131113
testKit := testkit.NewTestKit(t, store)
11141114
testKit.MustExec("use test")
@@ -1141,31 +1141,71 @@ func TestTurnOffAutoAnalyze(t *testing.T) {
11411141
require.NoError(t, err)
11421142
require.Equal(t, tableInfo.ID, job.GetTableID())
11431143

1144-
// Truncate table.
1145-
testKit.MustExec("truncate table t")
1144+
// Add a new index on column c1.
1145+
testKit.MustExec("alter table t add index idx1(c1)")
11461146

1147-
// Find the truncate table event.
1148-
truncateTableEvent := statstestutil.FindEvent(h.DDLEventCh(), model.ActionTruncateTable)
1147+
// Find the add index event.
1148+
addIndexEvent := statstestutil.FindEvent(h.DDLEventCh(), model.ActionAddIndex)
1149+
require.NotNil(t, addIndexEvent)
11491150

11501151
// Disable the auto analyze.
11511152
testKit.MustExec("set @@global.tidb_enable_auto_analyze = 0;")
11521153

1153-
// Handle the truncate table event.
1154-
err = statstestutil.HandleDDLEventWithTxn(h, truncateTableEvent)
1154+
// Handle the add index event.
1155+
err = statstestutil.HandleDDLEventWithTxn(h, addIndexEvent)
11551156
require.NoError(t, err)
11561157

1157-
sctx := testKit.Session().(sessionctx.Context)
1158-
// Handle the truncate table event in priority queue.
1159-
require.NoError(t, pq.HandleDDLEvent(ctx, sctx, truncateTableEvent))
1160-
1161-
// Because the auto analyze is turned off, the priority queue should be closed properly.
1162-
_, err = pq.IsEmptyForTest()
1163-
require.ErrorContains(t, err, "priority queue not initialized")
1158+
// Handle the add index event in priority queue.
1159+
require.NoError(t, statsutil.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
1160+
return pq.HandleDDLEvent(ctx, sctx, addIndexEvent)
1161+
}, statsutil.FlagWrapTxn))
11641162

1165-
// The priority queue can be re-initialized after turning on the auto analyze.
1166-
// We manually mock it here.
1167-
require.NoError(t, pq.Initialize(ctx))
11681163
isEmpty, err = pq.IsEmptyForTest()
11691164
require.NoError(t, err)
1170-
require.True(t, isEmpty)
1165+
require.False(t, isEmpty)
1166+
}
1167+
1168+
func TestTurnOffAutoAnalyzeBeforeQueueInit(t *testing.T) {
1169+
store, do := testkit.CreateMockStoreAndDomain(t)
1170+
testKit := testkit.NewTestKit(t, store)
1171+
testKit.MustExec("use test")
1172+
enableAutoAnalyze(t, testKit)
1173+
testKit.MustExec("create table t (c1 int, c2 int, index idx(c1, c2))")
1174+
h := do.StatsHandle()
1175+
statstestutil.HandleNextDDLEventWithTxn(h)
1176+
// Insert some data.
1177+
testKit.MustExec("insert into t values (1,2),(2,2),(6,2),(11,2),(16,2)")
1178+
require.NoError(t, h.DumpStatsDeltaToKV(true))
1179+
require.NoError(t, h.Update(context.Background(), do.InfoSchema()))
1180+
1181+
statistics.AutoAnalyzeMinCnt = 0
1182+
defer func() {
1183+
statistics.AutoAnalyzeMinCnt = 1000
1184+
}()
1185+
1186+
// Disable the auto analyze.
1187+
testKit.MustExec("set @@global.tidb_enable_auto_analyze = 0;")
1188+
1189+
pq := priorityqueue.NewAnalysisPriorityQueue(h)
1190+
defer pq.Close()
1191+
ctx := context.Background()
1192+
1193+
// Add a new index on column c1.
1194+
testKit.MustExec("alter table t add index idx1(c1)")
1195+
1196+
// Find the add index event.
1197+
addIndexEvent := statstestutil.FindEvent(h.DDLEventCh(), model.ActionAddIndex)
1198+
require.NotNil(t, addIndexEvent)
1199+
1200+
// Handle the add index event.
1201+
err := statstestutil.HandleDDLEventWithTxn(h, addIndexEvent)
1202+
require.NoError(t, err)
1203+
1204+
// Handle the add index event in priority queue.
1205+
require.NoError(t, statsutil.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
1206+
return pq.HandleDDLEvent(ctx, sctx, addIndexEvent)
1207+
}, statsutil.FlagWrapTxn))
1208+
1209+
_, err = pq.IsEmptyForTest()
1210+
require.ErrorContains(t, err, "priority queue not initialized")
11711211
}

pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ go_test(
3434
"worker_test.go",
3535
],
3636
flaky = True,
37-
shard_count = 11,
37+
shard_count = 12,
3838
deps = [
3939
":refresher",
4040
"//pkg/parser/ast",

pkg/statistics/handle/autoanalyze/refresher/refresher.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,6 @@ func (r *Refresher) UpdateConcurrency() {
9292
// Usually, this is done by the caller through `util.CallWithSCtx`.
9393
func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool {
9494
parameters := exec.GetAutoAnalyzeParameters(sctx)
95-
err := r.setAutoAnalysisTimeWindow(parameters)
96-
if err != nil {
97-
statslogutil.StatsErrVerboseSampleLogger().Error("Set auto analyze time window failed", zap.Error(err))
98-
return false
99-
}
100-
if !r.isWithinTimeWindow() {
101-
return false
102-
}
10395
currentAutoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[vardef.TiDBAutoAnalyzeRatio])
10496
currentPruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
10597
if !r.jobs.IsInitialized() {
@@ -122,6 +114,20 @@ func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool {
122114
}
123115
}
124116

117+
// NOTE: This check must be done after initializing/rebuilding the queue.
118+
// For example, if a TiDB instance restarts outside the time window and this check ran first, the queue would never be initialized.
119+
// This means the DDL events are not being processed. This would prevent the DDL notifier from moving forward.
120+
// Although this won't cause a correctness issue and it does not affect other handlers, it is still better to avoid this situation.
121+
// We should make sure the queue is always initialized when the current instance is the owner.
122+
err := r.setAutoAnalysisTimeWindow(parameters)
123+
if err != nil {
124+
statslogutil.StatsErrVerboseSampleLogger().Error("Set auto analyze time window failed", zap.Error(err))
125+
return false
126+
}
127+
if !r.isWithinTimeWindow() {
128+
return false
129+
}
130+
125131
// Update the concurrency to the latest value.
126132
r.UpdateConcurrency()
127133
// Check remaining concurrency.

pkg/statistics/handle/autoanalyze/refresher/refresher_test.go

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,38 +49,72 @@ func TestTurnOffAndOnAutoAnalyze(t *testing.T) {
4949
tk.MustExec("insert into t values (5, 4), (5, 5), (6, 6)")
5050
require.NoError(t, handle.DumpStatsDeltaToKV(true))
5151
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
52-
sysProcTracker := dom.SysProcTracker()
53-
r := refresher.NewRefresher(context.Background(), handle, sysProcTracker, dom.DDLNotifier())
54-
defer r.Close()
52+
53+
// This initializes the auto analyze queue.
54+
require.True(t, handle.HandleAutoAnalyze())
5555
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
56-
require.True(t, handle.HandleAutoAnalyze())
5756
return nil
5857
}))
59-
r.WaitAutoAnalyzeFinishedForTest()
6058

59+
// Disable auto analyze.
6160
tk.MustExec("set @@global.tidb_enable_auto_analyze = 0;")
62-
6361
// Add a new index to generate DDL event.
6462
tk.MustExec("alter table t add index idx_b(b);")
6563
// Make sure the mysql.tidb_ddl_notifier is not empty.
6664
rows := tk.MustQuery("select * from mysql.tidb_ddl_notifier").Rows()
6765
require.Greater(t, len(rows), 0)
68-
require.Eventually(t, func() bool {
69-
return !r.IsQueueInitializedForTest()
70-
}, time.Second*5, time.Millisecond*100)
7166

7267
// Make sure the mysql.tidb_ddl_notifier table is empty.
7368
require.Eventually(t, func() bool {
7469
rows := tk.MustQuery("select * from mysql.tidb_ddl_notifier").Rows()
7570
return len(rows) == 0
7671
}, time.Second*5, time.Millisecond*100)
7772

78-
// Enable auto analyze again to make sure the queue is re-initialized and handles the DDL event correctly.
73+
// Make sure the table is added to the auto analyze queue.
74+
snapshot, err := handle.GetPriorityQueueSnapshot()
75+
require.NoError(t, err)
76+
require.Equal(t, 1, len(snapshot.CurrentJobs), "The queue should have one job for table t with the new index idx_b")
77+
78+
// Enable auto analyze again to make sure the queue works.
7979
tk.MustExec("set @@global.tidb_enable_auto_analyze = 1;")
80+
require.True(t, handle.HandleAutoAnalyze())
81+
}
82+
83+
func TestQueueInitializesOutsideTimeWindow(t *testing.T) {
84+
statistics.AutoAnalyzeMinCnt = 0
85+
defer func() {
86+
statistics.AutoAnalyzeMinCnt = 1000
87+
}()
88+
89+
store, dom := testkit.CreateMockStoreAndDomain(t)
90+
tk := testkit.NewTestKit(t, store)
91+
92+
handle := dom.StatsHandle()
93+
now := time.Now()
94+
// Set the auto-analyze time window to a future time.
95+
tk.MustExec("set @@global.tidb_auto_analyze_start_time = ?", now.Add(time.Hour).Format("15:04"))
96+
tk.MustExec("set @@global.tidb_auto_analyze_end_time = ?", now.Add(2*time.Hour).Format("15:04"))
97+
98+
tk.MustExec("use test")
99+
tk.MustExec("create table t (a int, b int, index idx(a))")
100+
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3)")
101+
require.NoError(t, handle.DumpStatsDeltaToKV(true))
102+
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
103+
tk.MustExec("analyze table t")
104+
105+
r := refresher.NewRefresher(context.Background(), handle, dom.SysProcTracker(), dom.DDLNotifier())
106+
defer r.Close()
107+
108+
tk.MustExec("insert into t values (5, 4), (5, 5), (6, 6)")
109+
require.NoError(t, handle.DumpStatsDeltaToKV(true))
110+
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
111+
80112
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
81-
require.True(t, handle.HandleAutoAnalyze())
113+
require.False(t, r.AnalyzeHighestPriorityTables(sctx), "Because it's out of time window, no jobs should be executed")
82114
return nil
83115
}))
116+
r.WaitAutoAnalyzeFinishedForTest()
117+
require.Equal(t, 1, r.Len(), "The job queue should be initialized even if it is out of the time window")
84118
}
85119

86120
func TestChangePruneMode(t *testing.T) {

0 commit comments

Comments
 (0)