Skip to content

Commit 77d6f5b

Browse files
ti-chi-botHuSharpti-chi-bot[bot]
authored
scheduler: fix scheduler save config (#7108) (#7162)
close #6897 Signed-off-by: husharp <jinhao.hu@pingcap.com> Co-authored-by: husharp <jinhao.hu@pingcap.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
1 parent d499de1 commit 77d6f5b

File tree

6 files changed

+193
-9
lines changed

6 files changed

+193
-9
lines changed

server/cluster/coordinator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,10 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string)
647647
c.wg.Add(1)
648648
go c.runScheduler(s)
649649
c.schedulers[s.GetName()] = s
650+
if err := schedule.SaveSchedulerConfig(c.cluster.storage, scheduler); err != nil {
651+
log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err))
652+
return err
653+
}
650654
c.cluster.opt.AddSchedulerCfg(s.GetType(), args)
651655
return nil
652656
}

server/cluster/coordinator_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -826,8 +826,9 @@ func TestPersistScheduler(t *testing.T) {
826826
// whether the schedulers added or removed in dynamic way are recorded in opt
827827
_, newOpt, err := newTestScheduleConfig()
828828
re.NoError(err)
829-
_, err = schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null")))
829+
shuffle, err := schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null")))
830830
re.NoError(err)
831+
re.NoError(co.addScheduler(shuffle))
831832
// suppose we add a new default enable scheduler
832833
config.DefaultSchedulers = append(config.DefaultSchedulers, config.SchedulerConfig{Type: "shuffle-region"})
833834
defer func() {

server/schedule/scheduler.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -118,17 +118,16 @@ func CreateScheduler(typ string, opController *OperatorController, storage endpo
118118
if !ok {
119119
return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ)
120120
}
121+
return fn(opController, storage, dec)
122+
}
121123

122-
s, err := fn(opController, storage, dec)
123-
if err != nil {
124-
return nil, err
125-
}
124+
// SaveSchedulerConfig saves the config of the specified scheduler.
125+
func SaveSchedulerConfig(storage endpoint.ConfigStorage, s Scheduler) error {
126126
data, err := s.EncodeConfig()
127127
if err != nil {
128-
return nil, err
128+
return err
129129
}
130-
err = storage.SaveScheduleConfig(s.GetName(), data)
131-
return s, err
130+
return storage.SaveScheduleConfig(s.GetName(), data)
132131
}
133132

134133
// FindSchedulerTypeByName finds the type of the specified name.

server/schedulers/evict_leader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *ev
205205
}
206206
}
207207

208-
// EvictStores returns the IDs of the evict-stores.
208+
// EvictStoreIDs returns the IDs of the evict-stores.
209209
func (s *evictLeaderScheduler) EvictStoreIDs() []uint64 {
210210
return s.conf.getStores()
211211
}

tests/server/api/testutil.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright 2023 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package api
16+
17+
import (
18+
"bytes"
19+
"encoding/json"
20+
"fmt"
21+
"io"
22+
"net/http"
23+
24+
"github.com/stretchr/testify/require"
25+
)
26+
27+
const schedulersPrefix = "/pd/api/v1/schedulers"
28+
29+
// dialClient used to dial http request.
30+
var dialClient = &http.Client{
31+
Transport: &http.Transport{
32+
DisableKeepAlives: true,
33+
},
34+
}
35+
36+
// MustAddScheduler adds a scheduler with HTTP API.
37+
func MustAddScheduler(
38+
re *require.Assertions, serverAddr string,
39+
schedulerName string, args map[string]interface{},
40+
) {
41+
request := map[string]interface{}{
42+
"name": schedulerName,
43+
}
44+
for arg, val := range args {
45+
request[arg] = val
46+
}
47+
data, err := json.Marshal(request)
48+
re.NoError(err)
49+
httpReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s", serverAddr, schedulersPrefix), bytes.NewBuffer(data))
50+
re.NoError(err)
51+
// Send request.
52+
resp, err := dialClient.Do(httpReq)
53+
re.NoError(err)
54+
defer resp.Body.Close()
55+
data, err = io.ReadAll(resp.Body)
56+
re.NoError(err)
57+
re.Equal(http.StatusOK, resp.StatusCode, string(data))
58+
}

tests/server/cluster/cluster_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@ import (
4343
"github.com/tikv/pd/server/id"
4444
syncer "github.com/tikv/pd/server/region_syncer"
4545
"github.com/tikv/pd/server/schedule/operator"
46+
"github.com/tikv/pd/server/schedulers"
4647
"github.com/tikv/pd/server/storage"
4748
"github.com/tikv/pd/server/tso"
4849
"github.com/tikv/pd/tests"
50+
"github.com/tikv/pd/tests/server/api"
4951
"google.golang.org/grpc/codes"
5052
"google.golang.org/grpc/status"
5153
)
@@ -1280,6 +1282,126 @@ func TestStaleTermHeartbeat(t *testing.T) {
12801282
re.NoError(err)
12811283
}
12821284

1285+
func TestTransferLeaderForScheduler(t *testing.T) {
1286+
re := require.New(t)
1287+
ctx, cancel := context.WithCancel(context.Background())
1288+
defer cancel()
1289+
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`))
1290+
tc, err := tests.NewTestCluster(ctx, 2)
1291+
defer tc.Destroy()
1292+
re.NoError(err)
1293+
err = tc.RunInitialServers()
1294+
re.NoError(err)
1295+
tc.WaitLeader()
1296+
// start
1297+
leaderServer := tc.GetServer(tc.GetLeader())
1298+
re.NoError(leaderServer.BootstrapCluster())
1299+
rc := leaderServer.GetServer().GetRaftCluster()
1300+
re.NotNil(rc)
1301+
1302+
storesNum := 2
1303+
grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
1304+
for i := 1; i <= storesNum; i++ {
1305+
store := &metapb.Store{
1306+
Id: uint64(i),
1307+
Address: "127.0.0.1:" + strconv.Itoa(i),
1308+
}
1309+
resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store)
1310+
re.NoError(err)
1311+
re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
1312+
}
1313+
// region heartbeat
1314+
id := leaderServer.GetAllocator()
1315+
putRegionWithLeader(re, rc, id, 1)
1316+
1317+
time.Sleep(time.Second)
1318+
re.True(leaderServer.GetRaftCluster().IsPrepared())
1319+
// Add evict leader scheduler
1320+
api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
1321+
"store_id": 1,
1322+
})
1323+
api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
1324+
"store_id": 2,
1325+
})
1326+
// Check scheduler updated.
1327+
re.Len(rc.GetSchedulers(), 5)
1328+
checkEvictLeaderSchedulerExist(re, rc, true)
1329+
checkEvictLeaderStoreIDs(re, rc, []uint64{1, 2})
1330+
1331+
// transfer PD leader to another PD
1332+
tc.ResignLeader()
1333+
rc.Stop()
1334+
tc.WaitLeader()
1335+
leaderServer = tc.GetServer(tc.GetLeader())
1336+
rc1 := leaderServer.GetServer().GetRaftCluster()
1337+
rc1.Start(leaderServer.GetServer())
1338+
re.NoError(err)
1339+
re.NotNil(rc1)
1340+
// region heartbeat
1341+
id = leaderServer.GetAllocator()
1342+
putRegionWithLeader(re, rc1, id, 1)
1343+
time.Sleep(time.Second)
1344+
re.True(leaderServer.GetRaftCluster().IsPrepared())
1345+
// Check scheduler updated.
1346+
re.Len(rc1.GetSchedulers(), 5)
1347+
checkEvictLeaderSchedulerExist(re, rc, true)
1348+
checkEvictLeaderStoreIDs(re, rc, []uint64{1, 2})
1349+
1350+
// transfer PD leader back to the previous PD
1351+
tc.ResignLeader()
1352+
rc1.Stop()
1353+
tc.WaitLeader()
1354+
leaderServer = tc.GetServer(tc.GetLeader())
1355+
rc = leaderServer.GetServer().GetRaftCluster()
1356+
rc.Start(leaderServer.GetServer())
1357+
re.NotNil(rc)
1358+
// region heartbeat
1359+
id = leaderServer.GetAllocator()
1360+
putRegionWithLeader(re, rc, id, 1)
1361+
time.Sleep(time.Second)
1362+
re.True(leaderServer.GetRaftCluster().IsPrepared())
1363+
// Check scheduler updated
1364+
re.Len(rc.GetSchedulers(), 5)
1365+
checkEvictLeaderSchedulerExist(re, rc, true)
1366+
checkEvictLeaderStoreIDs(re, rc, []uint64{1, 2})
1367+
1368+
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker"))
1369+
}
1370+
1371+
func checkEvictLeaderSchedulerExist(re *require.Assertions, rc *cluster.RaftCluster, exist bool) {
1372+
isExistScheduler := func(rc *cluster.RaftCluster, name string) bool {
1373+
s := rc.GetSchedulers()
1374+
for _, scheduler := range s {
1375+
if scheduler == name {
1376+
return true
1377+
}
1378+
}
1379+
return false
1380+
}
1381+
1382+
testutil.Eventually(re, func() bool {
1383+
if !exist {
1384+
return !isExistScheduler(rc, schedulers.EvictLeaderName)
1385+
}
1386+
return isExistScheduler(rc, schedulers.EvictLeaderName)
1387+
})
1388+
}
1389+
1390+
func checkEvictLeaderStoreIDs(re *require.Assertions, rc *cluster.RaftCluster, expected []uint64) {
1391+
handler, ok := rc.GetSchedulerHandlers()[schedulers.EvictLeaderName]
1392+
re.True(ok)
1393+
h, ok := handler.(interface {
1394+
EvictStoreIDs() []uint64
1395+
})
1396+
re.True(ok)
1397+
var evictStoreIDs []uint64
1398+
testutil.Eventually(re, func() bool {
1399+
evictStoreIDs = h.EvictStoreIDs()
1400+
return len(evictStoreIDs) == len(expected)
1401+
})
1402+
re.ElementsMatch(evictStoreIDs, expected)
1403+
}
1404+
12831405
func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) {
12841406
for i := 0; i < 3; i++ {
12851407
regionID, err := id.Alloc()

0 commit comments

Comments
 (0)