Skip to content

Commit 9162286

Browse files
authored
ttl: fix some memory leak in TTL (#45512) (#45514)
close #45510
1 parent 3e492da commit 9162286

File tree

3 files changed

+35
-6
lines changed

3 files changed

+35
-6
lines changed

session/session.go

Lines changed: 12 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -3584,8 +3584,10 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
35843584
// attachStatsCollector attaches the stats collector in the dom for the session
35853585
func attachStatsCollector(s *session, dom *domain.Domain) *session {
35863586
if dom.StatsHandle() != nil && dom.StatsUpdating() {
3587-
s.statsCollector = dom.StatsHandle().NewSessionStatsCollector()
3588-
if GetIndexUsageSyncLease() > 0 {
3587+
if s.statsCollector == nil {
3588+
s.statsCollector = dom.StatsHandle().NewSessionStatsCollector()
3589+
}
3590+
if s.idxUsageCollector == nil && GetIndexUsageSyncLease() > 0 {
35893591
s.idxUsageCollector = dom.StatsHandle().NewSessionIndexUsageCollector()
35903592
}
35913593
}
@@ -3595,9 +3597,14 @@ func attachStatsCollector(s *session, dom *domain.Domain) *session {
35953597

35963598
// detachStatsCollector removes the stats collector in the session
35973599
func detachStatsCollector(s *session) *session {
3598-
s.statsCollector = nil
3599-
s.idxUsageCollector = nil
3600-
3600+
if s.statsCollector != nil {
3601+
s.statsCollector.Delete()
3602+
s.statsCollector = nil
3603+
}
3604+
if s.idxUsageCollector != nil {
3605+
s.idxUsageCollector.Delete()
3606+
s.idxUsageCollector = nil
3607+
}
36013608
return s
36023609
}
36033610

ttl/ttlworker/task_manager.go

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -306,6 +306,7 @@ loop:
306306
err = idleWorker.Schedule(task.ttlScanTask)
307307
if err != nil {
308308
logger.Warn("fail to schedule task", zap.Error(err))
309+
task.cancel()
309310
continue
310311
}
311312

@@ -457,6 +458,8 @@ func (m *taskManager) checkFinishedTask(se session.Session, now time.Time) {
457458
stillRunningTasks = append(stillRunningTasks, task)
458459
continue
459460
}
461+
// Cancel the task to release its inner context and avoid a memory leak.
462+
task.cancel()
460463
err := m.reportTaskFinished(se, now, task)
461464
if err != nil {
462465
logutil.Logger(m.ctx).Error("fail to report finished task", zap.Error(err))
@@ -579,6 +582,11 @@ type runningScanTask struct {
579582
result *ttlScanTaskExecResult
580583
}
581584

585+
// Context returns the context of the task. It is currently only used by tests.
586+
func (t *runningScanTask) Context() context.Context {
587+
return t.ctx
588+
}
589+
582590
func (t *runningScanTask) finished() bool {
583591
return t.result != nil && t.statistics.TotalRows.Load() == t.statistics.ErrorRows.Load()+t.statistics.SuccessRows.Load()
584592
}

ttl/ttlworker/task_manager_integration_test.go

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -131,6 +131,7 @@ func TestParallelSchedule(t *testing.T) {
131131
require.NoError(t, isc.Update(sessionFactory()))
132132
now := time.Now()
133133
scheduleWg := sync.WaitGroup{}
134+
finishTasks := make([]func(), 0, 4)
134135
for i := 0; i < 4; i++ {
135136
workers := []ttlworker.Worker{}
136137
for j := 0; j < 4; j++ {
@@ -139,21 +140,34 @@ func TestParallelSchedule(t *testing.T) {
139140
workers = append(workers, scanWorker)
140141
}
141142

142-
m := ttlworker.NewTaskManager(context.Background(), nil, isc, fmt.Sprintf("task-manager-%d", i), store)
143+
managerID := fmt.Sprintf("task-manager-%d", i)
144+
m := ttlworker.NewTaskManager(context.Background(), nil, isc, managerID, store)
143145
m.SetScanWorkers4Test(workers)
144146
scheduleWg.Add(1)
145147
go func() {
146148
se := sessionFactory()
147149
m.RescheduleTasks(se, now)
148150
scheduleWg.Done()
149151
}()
152+
finishTasks = append(finishTasks, func() {
153+
se := sessionFactory()
154+
for _, task := range m.GetRunningTasks() {
155+
require.Nil(t, task.Context().Err(), fmt.Sprintf("%s %d", managerID, task.ScanID))
156+
task.SetResult(nil)
157+
m.CheckFinishedTask(se, time.Now())
158+
require.NotNil(t, task.Context().Err(), fmt.Sprintf("%s %d", managerID, task.ScanID))
159+
}
160+
})
150161
}
151162
scheduleWg.Wait()
152163
// all tasks should have been scheduled
153164
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("16"))
154165
for i := 0; i < 4; i++ {
155166
sql := fmt.Sprintf("select count(1) from mysql.tidb_ttl_task where status = 'running' AND owner_id = 'task-manager-%d'", i)
156167
tk.MustQuery(sql).Check(testkit.Rows("4"))
168+
finishTasks[i]()
169+
sql = fmt.Sprintf("select count(1) from mysql.tidb_ttl_task where status = 'finished' AND owner_id = 'task-manager-%d'", i)
170+
tk.MustQuery(sql).Check(testkit.Rows("4"))
157171
}
158172
}
159173

0 commit comments

Comments
 (0)