@@ -43,9 +43,11 @@ import (
 	"github.com/tikv/pd/server/id"
 	syncer "github.com/tikv/pd/server/region_syncer"
 	"github.com/tikv/pd/server/schedule/operator"
+	"github.com/tikv/pd/server/schedulers"
 	"github.com/tikv/pd/server/storage"
 	"github.com/tikv/pd/server/tso"
 	"github.com/tikv/pd/tests"
+	"github.com/tikv/pd/tests/server/api"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -1280,6 +1282,126 @@ func TestStaleTermHeartbeat(t *testing.T) {
 	re.NoError(err)
 }
 
+func TestTransferLeaderForScheduler(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
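+	// Speed up the coordinator loop via failpoint (assumed to shorten its tick
+	// interval) so the schedulers are loaded promptly in the test.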
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`))
+	tc, err := tests.NewTestCluster(ctx, 2)
+	defer tc.Destroy()
+	re.NoError(err)
+	err = tc.RunInitialServers()
+	re.NoError(err)
+	tc.WaitLeader()
+	// Bootstrap the cluster on the current PD leader.
+	leaderServer := tc.GetServer(tc.GetLeader())
+	re.NoError(leaderServer.BootstrapCluster())
+	rc := leaderServer.GetServer().GetRaftCluster()
+	re.NotNil(rc)
+
+	storesNum := 2
+	grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
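+	// Register the stores through the gRPC PutStore API so region leaders can
+	// be placed on them.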
+	for i := 1; i <= storesNum; i++ {
+		store := &metapb.Store{
+			Id:      uint64(i),
+			Address: "127.0.0.1:" + strconv.Itoa(i),
+		}
+		resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store)
+		re.NoError(err)
+		re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
+	}
+	// region heartbeat
+	id := leaderServer.GetAllocator()
+	putRegionWithLeader(re, rc, id, 1)
+
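+	// Give the coordinator a moment to process the heartbeats before checking
+	// that the cluster is prepared.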
+	time.Sleep(time.Second)
+	re.True(leaderServer.GetRaftCluster().IsPrepared())
+	// Add evict leader scheduler
+	api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
+		"store_id": 1,
+	})
+	api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
+		"store_id": 2,
+	})
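+	// Adding the scheduler twice does not create two schedulers: both store
+	// IDs end up as configs of the single evict-leader scheduler, which the
+	// store-ID check below asserts.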
+	// Check the schedulers are updated: the defaults plus one evict-leader.
+	re.Len(rc.GetSchedulers(), 6)
+	checkEvictLeaderSchedulerExist(re, rc, true)
+	checkEvictLeaderStoreIDs(re, rc, []uint64{1, 2})
+
+	// transfer PD leader to another PD
+	tc.ResignLeader()
+	rc.Stop()
+	tc.WaitLeader()
+	leaderServer = tc.GetServer(tc.GetLeader())
+	rc1 := leaderServer.GetServer().GetRaftCluster()
+	re.NotNil(rc1)
+	err = rc1.Start(leaderServer.GetServer())
+	re.NoError(err)
+	// region heartbeat
+	id = leaderServer.GetAllocator()
+	putRegionWithLeader(re, rc1, id, 1)
+	time.Sleep(time.Second)
+	re.True(leaderServer.GetRaftCluster().IsPrepared())
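+	// The new leader should have reloaded the evict-leader scheduler and its
+	// store IDs from storage.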
+	// Check scheduler updated.
+	re.Len(rc1.GetSchedulers(), 6)
+	checkEvictLeaderSchedulerExist(re, rc1, true)
+	checkEvictLeaderStoreIDs(re, rc1, []uint64{1, 2})
+
+	// transfer PD leader back to the previous PD
+	tc.ResignLeader()
+	rc1.Stop()
+	tc.WaitLeader()
+	leaderServer = tc.GetServer(tc.GetLeader())
+	rc = leaderServer.GetServer().GetRaftCluster()
+	re.NotNil(rc)
+	re.NoError(rc.Start(leaderServer.GetServer()))
+	// region heartbeat
+	id = leaderServer.GetAllocator()
+	putRegionWithLeader(re, rc, id, 1)
+	time.Sleep(time.Second)
+	re.True(leaderServer.GetRaftCluster().IsPrepared())
+	// Check scheduler updated.
+	re.Len(rc.GetSchedulers(), 6)
+	checkEvictLeaderSchedulerExist(re, rc, true)
+	checkEvictLeaderStoreIDs(re, rc, []uint64{1, 2})
+
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker"))
+}
+
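+// checkEvictLeaderSchedulerExist waits until the presence of the evict-leader
+// scheduler matches the expectation.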
+func checkEvictLeaderSchedulerExist(re *require.Assertions, rc *cluster.RaftCluster, exist bool) {
+	isSchedulerExist := func(rc *cluster.RaftCluster, name string) bool {
+		for _, scheduler := range rc.GetSchedulers() {
+			if scheduler == name {
+				return true
+			}
+		}
+		return false
+	}
+
+	testutil.Eventually(re, func() bool {
+		return isSchedulerExist(rc, schedulers.EvictLeaderName) == exist
+	})
+}
+
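+// checkEvictLeaderStoreIDs waits until the evict-leader scheduler reports the
+// expected set of store IDs.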
+func checkEvictLeaderStoreIDs(re *require.Assertions, rc *cluster.RaftCluster, expected []uint64) {
+	handler, ok := rc.GetSchedulerHandlers()[schedulers.EvictLeaderName]
+	re.True(ok)
+	// Assert an anonymous interface so the test does not depend on the
+	// handler's concrete type.
+	h, ok := handler.(interface {
+		EvictStoreIDs() []uint64
+	})
+	re.True(ok)
+	var evictStoreIDs []uint64
+	testutil.Eventually(re, func() bool {
+		evictStoreIDs = h.EvictStoreIDs()
+		return len(evictStoreIDs) == len(expected)
+	})
+	re.ElementsMatch(evictStoreIDs, expected)
+}
+
 func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) {
 	for i := 0; i < 3; i++ {
 		regionID, err := id.Alloc()