Skip to content

Commit 6752974

Browse files
scheduler: fix scheduler save config (#7108)
close #6897 Signed-off-by: husharp <jinhao.hu@pingcap.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
1 parent 8e844d8 commit 6752974

File tree

5 files changed

+133
-9
lines changed

5 files changed

+133
-9
lines changed

pkg/schedule/schedulers/evict_leader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeade
181181
}
182182
}
183183

184-
// EvictStores returns the IDs of the evict-stores.
184+
// EvictStoreIDs returns the IDs of the evict-stores.
185185
func (s *evictLeaderScheduler) EvictStoreIDs() []uint64 {
186186
return s.conf.getStores()
187187
}

pkg/schedule/schedulers/scheduler.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,16 +124,16 @@ func CreateScheduler(typ string, oc *operator.Controller, storage endpoint.Confi
124124
return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ)
125125
}
126126

127-
s, err := fn(oc, storage, dec, removeSchedulerCb...)
128-
if err != nil {
129-
return nil, err
130-
}
127+
return fn(oc, storage, dec, removeSchedulerCb...)
128+
}
129+
130+
// SaveSchedulerConfig saves the config of the specified scheduler.
131+
func SaveSchedulerConfig(storage endpoint.ConfigStorage, s Scheduler) error {
131132
data, err := s.EncodeConfig()
132133
if err != nil {
133-
return nil, err
134+
return err
134135
}
135-
err = storage.SaveSchedulerConfig(s.GetName(), data)
136-
return s, err
136+
return storage.SaveSchedulerConfig(s.GetName(), data)
137137
}
138138

139139
// FindSchedulerTypeByName finds the type of the specified name.

pkg/schedule/schedulers/scheduler_controller.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,10 @@ func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) er
138138
}
139139

140140
c.schedulerHandlers[name] = scheduler
141+
if err := SaveSchedulerConfig(c.storage, scheduler); err != nil {
142+
log.Error("can not save HTTP scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err))
143+
return err
144+
}
141145
c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args)
142146
return nil
143147
}
@@ -188,6 +192,10 @@ func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error {
188192
c.wg.Add(1)
189193
go c.runScheduler(s)
190194
c.schedulers[s.Scheduler.GetName()] = s
195+
if err := SaveSchedulerConfig(c.storage, scheduler); err != nil {
196+
log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err))
197+
return err
198+
}
191199
c.cluster.GetSchedulerConfig().AddSchedulerCfg(s.Scheduler.GetType(), args)
192200
return nil
193201
}

server/cluster/cluster_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3111,8 +3111,9 @@ func TestPersistScheduler(t *testing.T) {
31113111
// whether the schedulers added or removed in dynamic way are recorded in opt
31123112
_, newOpt, err := newTestScheduleConfig()
31133113
re.NoError(err)
3114-
_, err = schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null")))
3114+
shuffle, err := schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null")))
31153115
re.NoError(err)
3116+
re.NoError(controller.AddScheduler(shuffle))
31163117
// suppose we add a new default enable scheduler
31173118
sc.DefaultSchedulers = append(sc.DefaultSchedulers, sc.SchedulerConfig{Type: "shuffle-region"})
31183119
defer func() {

tests/server/cluster/cluster_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/tikv/pd/pkg/mock/mockid"
3838
sc "github.com/tikv/pd/pkg/schedule/config"
3939
"github.com/tikv/pd/pkg/schedule/operator"
40+
"github.com/tikv/pd/pkg/schedule/schedulers"
4041
"github.com/tikv/pd/pkg/storage"
4142
"github.com/tikv/pd/pkg/syncer"
4243
"github.com/tikv/pd/pkg/tso"
@@ -47,6 +48,7 @@ import (
4748
"github.com/tikv/pd/server/cluster"
4849
"github.com/tikv/pd/server/config"
4950
"github.com/tikv/pd/tests"
51+
"github.com/tikv/pd/tests/server/api"
5052
"google.golang.org/grpc/codes"
5153
"google.golang.org/grpc/status"
5254
)
@@ -1275,6 +1277,119 @@ func TestStaleTermHeartbeat(t *testing.T) {
12751277
re.NoError(err)
12761278
}
12771279

1280+
func TestTransferLeaderForScheduler(t *testing.T) {
1281+
re := require.New(t)
1282+
ctx, cancel := context.WithCancel(context.Background())
1283+
defer cancel()
1284+
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
1285+
tc, err := tests.NewTestCluster(ctx, 2)
1286+
defer tc.Destroy()
1287+
re.NoError(err)
1288+
err = tc.RunInitialServers()
1289+
re.NoError(err)
1290+
tc.WaitLeader()
1291+
// start
1292+
leaderServer := tc.GetServer(tc.GetLeader())
1293+
re.NoError(leaderServer.BootstrapCluster())
1294+
rc := leaderServer.GetServer().GetRaftCluster()
1295+
re.NotNil(rc)
1296+
1297+
storesNum := 2
1298+
grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
1299+
for i := 1; i <= storesNum; i++ {
1300+
store := &metapb.Store{
1301+
Id: uint64(i),
1302+
Address: "127.0.0.1:" + strconv.Itoa(i),
1303+
}
1304+
resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store)
1305+
re.NoError(err)
1306+
re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
1307+
}
1308+
// region heartbeat
1309+
id := leaderServer.GetAllocator()
1310+
putRegionWithLeader(re, rc, id, 1)
1311+
1312+
time.Sleep(time.Second)
1313+
re.True(leaderServer.GetRaftCluster().IsPrepared())
1314+
// Add evict leader scheduler
1315+
api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
1316+
"store_id": 1,
1317+
})
1318+
api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
1319+
"store_id": 2,
1320+
})
1321+
// Check scheduler updated.
1322+
schedulersController := rc.GetCoordinator().GetSchedulersController()
1323+
re.Len(schedulersController.GetSchedulerNames(), 6)
1324+
checkEvictLeaderSchedulerExist(re, schedulersController, true)
1325+
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2})
1326+
1327+
// transfer PD leader to another PD
1328+
tc.ResignLeader()
1329+
rc.Stop()
1330+
tc.WaitLeader()
1331+
leaderServer = tc.GetServer(tc.GetLeader())
1332+
rc1 := leaderServer.GetServer().GetRaftCluster()
1333+
rc1.Start(leaderServer.GetServer())
1334+
re.NoError(err)
1335+
re.NotNil(rc1)
1336+
// region heartbeat
1337+
id = leaderServer.GetAllocator()
1338+
putRegionWithLeader(re, rc1, id, 1)
1339+
time.Sleep(time.Second)
1340+
re.True(leaderServer.GetRaftCluster().IsPrepared())
1341+
// Check scheduler updated.
1342+
schedulersController = rc1.GetCoordinator().GetSchedulersController()
1343+
re.Len(schedulersController.GetSchedulerNames(), 6)
1344+
checkEvictLeaderSchedulerExist(re, schedulersController, true)
1345+
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2})
1346+
1347+
// transfer PD leader back to the previous PD
1348+
tc.ResignLeader()
1349+
rc1.Stop()
1350+
tc.WaitLeader()
1351+
leaderServer = tc.GetServer(tc.GetLeader())
1352+
rc = leaderServer.GetServer().GetRaftCluster()
1353+
rc.Start(leaderServer.GetServer())
1354+
re.NotNil(rc)
1355+
// region heartbeat
1356+
id = leaderServer.GetAllocator()
1357+
putRegionWithLeader(re, rc, id, 1)
1358+
time.Sleep(time.Second)
1359+
re.True(leaderServer.GetRaftCluster().IsPrepared())
1360+
// Check scheduler updated
1361+
schedulersController = rc.GetCoordinator().GetSchedulersController()
1362+
re.Len(schedulersController.GetSchedulerNames(), 6)
1363+
checkEvictLeaderSchedulerExist(re, schedulersController, true)
1364+
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2})
1365+
1366+
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
1367+
}
1368+
1369+
func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) {
1370+
testutil.Eventually(re, func() bool {
1371+
if !exist {
1372+
return sc.GetScheduler(schedulers.EvictLeaderName) == nil
1373+
}
1374+
return sc.GetScheduler(schedulers.EvictLeaderName) != nil
1375+
})
1376+
}
1377+
1378+
func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller, expected []uint64) {
1379+
handler, ok := sc.GetSchedulerHandlers()[schedulers.EvictLeaderName]
1380+
re.True(ok)
1381+
h, ok := handler.(interface {
1382+
EvictStoreIDs() []uint64
1383+
})
1384+
re.True(ok)
1385+
var evictStoreIDs []uint64
1386+
testutil.Eventually(re, func() bool {
1387+
evictStoreIDs = h.EvictStoreIDs()
1388+
return len(evictStoreIDs) == len(expected)
1389+
})
1390+
re.ElementsMatch(evictStoreIDs, expected)
1391+
}
1392+
12781393
func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) {
12791394
for i := 0; i < 3; i++ {
12801395
regionID, err := id.Alloc()

0 commit comments

Comments
 (0)