@@ -19,7 +19,6 @@ import (
1919 "fmt"
2020 "math"
2121 "math/rand"
22- "net"
2322 "sort"
2423 "strconv"
2524 "strings"
@@ -66,12 +65,9 @@ import (
6665 "github.com/pingcap/tidb/statistics/handle"
6766 "github.com/pingcap/tidb/store/helper"
6867 "github.com/pingcap/tidb/telemetry"
69- "github.com/pingcap/tidb/ttl/cache"
70- "github.com/pingcap/tidb/ttl/sqlbuilder"
7168 "github.com/pingcap/tidb/ttl/ttlworker"
7269 "github.com/pingcap/tidb/types"
7370 "github.com/pingcap/tidb/util"
74- "github.com/pingcap/tidb/util/chunk"
7571 "github.com/pingcap/tidb/util/dbterror"
7672 disttaskutil "github.com/pingcap/tidb/util/disttask"
7773 "github.com/pingcap/tidb/util/domainutil"
@@ -165,6 +161,7 @@ type Domain struct {
165161 historicalStatsWorker * HistoricalStatsWorker
166162 ttlJobManager atomic.Pointer [ttlworker.JobManager ]
167163 runawayManager * resourcegroup.RunawayManager
164+ runawaySyncer * runawaySyncer
168165 resourceGroupsController * rmclient.ResourceGroupsController
169166
170167 serverID uint64
@@ -1243,6 +1240,7 @@ func (do *Domain) Init(
12431240 do .wg .Run (do .infoSyncerKeeper , "infoSyncerKeeper" )
12441241 do .wg .Run (do .globalConfigSyncerKeeper , "globalConfigSyncerKeeper" )
12451242 do .wg .Run (do .runawayRecordFlushLoop , "runawayRecordFlushLoop" )
1243+ do .wg .Run (do .runawayWatchSyncLoop , "runawayWatchSyncLoop" )
12461244 if ! skipRegisterToDashboard {
12471245 do .wg .Run (do .topologySyncerKeeper , "topologySyncerKeeper" )
12481246 }
@@ -1270,255 +1268,6 @@ func (do *Domain) SetOnClose(onClose func()) {
12701268 do .onClose = onClose
12711269}
12721270
// Timing, retention, and batch-size parameters for flushing and
// garbage-collecting runaway query records.
const (
	// runawayRecordFluashInterval is the delay of the timer that triggers a
	// flush of buffered runaway records.
	runawayRecordFluashInterval = time.Second
	// quarantineRecordGCInterval is the tick period of the quarantine-watch GC.
	quarantineRecordGCInterval = 10 * time.Minute
	// runawayRecordGCInterval is the tick period of the runaway-record GC.
	runawayRecordGCInterval = 24 * time.Hour
	// runawayRecordExpiredDuration is how long a runaway record is kept
	// before the GC treats it as expired (one week).
	runawayRecordExpiredDuration = 7 * 24 * time.Hour

	// runawayRecordGCBatchSize is the maximum number of rows removed by a
	// single delete statement.
	runawayRecordGCBatchSize = 100
	// runawayRecordGCSelectBatchSize is the row limit of one scan statement;
	// each scan feeds up to five delete batches.
	runawayRecordGCSelectBatchSize = 5 * runawayRecordGCBatchSize
)
1282-
// systemSchemaCIStr is the case-insensitive name of the "mysql" schema; it is
// the schema argument used when looking up the tables cleaned by
// deleteExpiredRows.
1283- var systemSchemaCIStr = model .NewCIStr ("mysql" )
1284-
1285- func (do * Domain ) deleteExpiredRows (tableName , colName string , expiredDuration time.Duration ) {
1286- if ! do .DDL ().OwnerManager ().IsOwner () {
1287- return
1288- }
1289- failpoint .Inject ("FastRunawayGC" , func () {
1290- expiredDuration = time .Second * 1
1291- })
1292- expiredTime := time .Now ().Add (- expiredDuration )
1293- tbCIStr := model .NewCIStr (tableName )
1294- tbl , err := do .InfoSchema ().TableByName (systemSchemaCIStr , tbCIStr )
1295- if err != nil {
1296- logutil .BgLogger ().Error ("delete system table failed" , zap .String ("table" , tableName ), zap .Error (err ))
1297- return
1298- }
1299- tbInfo := tbl .Meta ()
1300- col := tbInfo .FindPublicColumnByName (colName )
1301- if col == nil {
1302- logutil .BgLogger ().Error ("time column is not public in table" , zap .String ("table" , tableName ), zap .String ("column" , colName ))
1303- return
1304- }
1305- tb , err := cache .NewBasePhysicalTable (systemSchemaCIStr , tbInfo , model .NewCIStr ("" ), col )
1306- if err != nil {
1307- logutil .BgLogger ().Error ("delete system table failed" , zap .String ("table" , tableName ), zap .Error (err ))
1308- return
1309- }
1310- generator , err := sqlbuilder .NewScanQueryGenerator (tb , expiredTime , nil , nil )
1311- if err != nil {
1312- logutil .BgLogger ().Error ("delete system table failed" , zap .String ("table" , tableName ), zap .Error (err ))
1313- return
1314- }
1315- var leftRows [][]types.Datum
1316- for {
1317- sql := ""
1318- if sql , err = generator .NextSQL (leftRows , runawayRecordGCSelectBatchSize ); err != nil {
1319- logutil .BgLogger ().Error ("delete system table failed" , zap .String ("table" , tableName ), zap .Error (err ))
1320- return
1321- }
1322- // to remove
1323- if len (sql ) == 0 {
1324- return
1325- }
1326-
1327- rows , sqlErr := do .execRestrictedSQL (sql , nil )
1328- if sqlErr != nil {
1329- logutil .BgLogger ().Error ("delete system table failed" , zap .String ("table" , tableName ), zap .Error (err ))
1330- return
1331- }
1332- leftRows = make ([][]types.Datum , len (rows ))
1333- for i , row := range rows {
1334- leftRows [i ] = row .GetDatumRow (tb .KeyColumnTypes )
1335- }
1336-
1337- for len (leftRows ) > 0 {
1338- var delBatch [][]types.Datum
1339- if len (leftRows ) < runawayRecordGCBatchSize {
1340- delBatch = leftRows
1341- leftRows = nil
1342- } else {
1343- delBatch = leftRows [0 :runawayRecordGCBatchSize ]
1344- leftRows = leftRows [runawayRecordGCBatchSize :]
1345- }
1346- sql , err := sqlbuilder .BuildDeleteSQL (tb , delBatch , expiredTime )
1347- if err != nil {
1348- logutil .BgLogger ().Error (
1349- "build delete SQL failed when deleting system table" ,
1350- zap .Error (err ),
1351- zap .String ("table" , tb .Schema .O + "." + tb .Name .O ),
1352- )
1353- return
1354- }
1355-
1356- _ , err = do .execRestrictedSQL (sql , nil )
1357- if err != nil {
1358- logutil .BgLogger ().Error (
1359- "delete SQL failed when deleting system table" , zap .Error (err ), zap .String ("SQL" , sql ),
1360- )
1361- }
1362- }
1363- }
1364- }
1365-
1366- func (do * Domain ) runawayRecordFlushLoop () {
1367- defer util .Recover (metrics .LabelDomain , "runawayRecordFlushLoop" , nil , false )
1368-
1369- // this times is used to batch flushing rocords, with 1s duration,
1370- // we can guarantee a watch record can be seen by the user within 1s.
1371- runawayRecordFluashTimer := time .NewTimer (runawayRecordFluashInterval )
1372- runawayRecordGCTicker := time .NewTicker (runawayRecordGCInterval )
1373- quarantineRecordGCTicker := time .NewTicker (quarantineRecordGCInterval )
1374- failpoint .Inject ("FastRunawayGC" , func () {
1375- runawayRecordFluashTimer .Stop ()
1376- runawayRecordGCTicker .Stop ()
1377- quarantineRecordGCTicker .Stop ()
1378- runawayRecordFluashTimer = time .NewTimer (time .Millisecond * 50 )
1379- runawayRecordGCTicker = time .NewTicker (time .Millisecond * 200 )
1380- quarantineRecordGCTicker = time .NewTicker (time .Millisecond * 200 )
1381- })
1382-
1383- fired := false
1384- recordCh := do .RunawayManager ().RunawayRecordChan ()
1385- quarantineRecordCh := do .RunawayManager ().QuarantineRecordChan ()
1386- flushThrehold := do .runawayManager .FlushThreshold ()
1387- records := make ([]* resourcegroup.RunawayRecord , 0 , flushThrehold )
1388- quarantineRecords := make ([]* resourcegroup.QuarantineRecord , 0 )
1389-
1390- flushRunawayRecords := func () {
1391- if len (records ) == 0 {
1392- return
1393- }
1394- sql , params := genRunawayQueriesStmt (records )
1395- if _ , err := do .execRestrictedSQL (sql , params ); err != nil {
1396- logutil .BgLogger ().Error ("flush runaway records failed" , zap .Error (err ), zap .Int ("count" , len (records )))
1397- }
1398- records = records [:0 ]
1399- }
1400- flushQuarantineRecords := func () {
1401- if len (quarantineRecords ) == 0 {
1402- return
1403- }
1404- sql , params := genQuarantineQueriesStmt (quarantineRecords )
1405- if _ , err := do .execRestrictedSQL (sql , params ); err != nil {
1406- logutil .BgLogger ().Error ("flush quarantine records failed" , zap .Error (err ), zap .Int ("count" , len (quarantineRecords )))
1407- }
1408- quarantineRecords = quarantineRecords [:0 ]
1409- }
1410- for {
1411- select {
1412- case <- do .exit :
1413- return
1414- case <- runawayRecordFluashTimer .C :
1415- flushRunawayRecords ()
1416- fired = true
1417- case r := <- quarantineRecordCh :
1418- quarantineRecords = append (quarantineRecords , r )
1419- // we expect quarantine record should not be triggered very often, so always
1420- // flush as soon as possible.
1421- if len (quarantineRecordCh ) == 0 || len (quarantineRecords ) >= flushThrehold {
1422- flushQuarantineRecords ()
1423- }
1424- case r := <- recordCh :
1425- records = append (records , r )
1426- failpoint .Inject ("FastRunawayGC" , func () {
1427- flushRunawayRecords ()
1428- })
1429- if len (records ) >= flushThrehold {
1430- flushRunawayRecords ()
1431- } else if fired {
1432- fired = false
1433- // meet a new record, reset the timer.
1434- runawayRecordFluashTimer .Reset (runawayRecordFluashInterval )
1435- }
1436- case <- runawayRecordGCTicker .C :
1437- go do .deleteExpiredRows ("tidb_runaway_queries" , "time" , runawayRecordExpiredDuration )
1438- case <- quarantineRecordGCTicker .C :
1439- go do .deleteExpiredRows ("tidb_runaway_quarantined_watch" , "end_time" , time .Duration (0 ))
1440- }
1441- }
1442- }
1443-
1444- func (do * Domain ) execRestrictedSQL (sql string , params []interface {}) ([]chunk.Row , error ) {
1445- se , err := do .sysSessionPool .Get ()
1446- defer func () {
1447- do .sysSessionPool .Put (se )
1448- }()
1449- if err != nil {
1450- return nil , errors .Annotate (err , "get session failed" )
1451- }
1452- exec := se .(sqlexec.RestrictedSQLExecutor )
1453- ctx := kv .WithInternalSourceType (context .Background (), kv .InternalTxnStats )
1454- r , _ , err := exec .ExecRestrictedSQL (ctx , []sqlexec.OptionFuncAlias {sqlexec .ExecOptionUseCurSession },
1455- sql , params ... ,
1456- )
1457- return r , err
1458- }
1459-
1460- func genRunawayQueriesStmt (records []* resourcegroup.RunawayRecord ) (string , []interface {}) {
1461- var builder strings.Builder
1462- params := make ([]interface {}, 0 , len (records )* 7 )
1463- builder .WriteString ("insert into mysql.tidb_runaway_queries VALUES " )
1464- for count , r := range records {
1465- if count > 0 {
1466- builder .WriteByte (',' )
1467- }
1468- builder .WriteString ("(%?, %?, %?, %?, %?, %?, %?)" )
1469- params = append (params , r .ResourceGroupName )
1470- params = append (params , r .Time )
1471- params = append (params , r .Match )
1472- params = append (params , r .Action )
1473- params = append (params , r .SQLText )
1474- params = append (params , r .PlanDigest )
1475- params = append (params , r .From )
1476- }
1477- return builder .String (), params
1478- }
1479-
1480- func genQuarantineQueriesStmt (records []* resourcegroup.QuarantineRecord ) (string , []interface {}) {
1481- var builder strings.Builder
1482- params := make ([]interface {}, 0 , len (records )* 7 )
1483- builder .WriteString ("insert into mysql.tidb_runaway_quarantined_watch VALUES " )
1484- for count , r := range records {
1485- if count > 0 {
1486- builder .WriteByte (',' )
1487- }
1488- builder .WriteString ("(%?, %?, %?, %?, %?, %?)" )
1489- params = append (params , r .ResourceGroupName )
1490- params = append (params , r .StartTime )
1491- params = append (params , r .EndTime )
1492- params = append (params , r .Watch )
1493- params = append (params , r .WatchText )
1494- params = append (params , r .Source )
1495- }
1496- return builder .String (), params
1497- }
1498-
1499- func (do * Domain ) initResourceGroupsController (ctx context.Context , pdClient pd.Client ) error {
1500- if pdClient == nil {
1501- logutil .BgLogger ().Warn ("cannot setup up resource controller, not using tikv storage" )
1502- // return nil as unistore doesn't support it
1503- return nil
1504- }
1505-
1506- control , err := rmclient .NewResourceGroupController (ctx , do .ServerID (), pdClient , nil , rmclient .WithMaxWaitDuration (resourcegroup .MaxWaitDuration ))
1507- if err != nil {
1508- return err
1509- }
1510- control .Start (ctx )
1511- serverInfo , err := infosync .GetServerInfo ()
1512- if err != nil {
1513- return err
1514- }
1515- serverAddr := net .JoinHostPort (serverInfo .IP , strconv .Itoa (int (serverInfo .Port )))
1516- do .runawayManager = resourcegroup .NewRunawayManager (control , serverAddr )
1517- do .resourceGroupsController = control
1518- tikv .SetResourceControlInterceptor (control )
1519- return nil
1520- }
1521-
15221271func (do * Domain ) initLogBackup (ctx context.Context , pdClient pd.Client ) error {
15231272 cfg := config .GetGlobalConfig ()
15241273 if pdClient == nil || do .etcdClient == nil {
0 commit comments