
Commit fd062fc

okjiang and lhy1024 authored
[cp-v7.5.6] schedulers: add reload config function (#9871)
ref #9866

Signed-off-by: lhy1024 <admin@liudos.us>
Signed-off-by: okjiang <819421878@qq.com>
Co-authored-by: lhy1024 <admin@liudos.us>
1 parent a3685c3 commit fd062fc

29 files changed: +570, -215 lines

pkg/schedule/schedulers/balance_leader.go

Lines changed: 11 additions & 11 deletions
@@ -67,16 +67,16 @@ var (
 )
 
 type balanceLeaderSchedulerConfig struct {
-	mu      syncutil.RWMutex
+	syncutil.RWMutex
 	storage endpoint.ConfigStorage
 	Ranges  []core.KeyRange `json:"ranges"`
 	// Batch is used to generate multiple operators by one scheduling
 	Batch int `json:"batch"`
 }
 
 func (conf *balanceLeaderSchedulerConfig) Update(data []byte) (int, interface{}) {
-	conf.mu.Lock()
-	defer conf.mu.Unlock()
+	conf.Lock()
+	defer conf.Unlock()
 
 	oldc, _ := json.Marshal(conf)
 
@@ -109,8 +109,8 @@ func (conf *balanceLeaderSchedulerConfig) validate() bool {
 }
 
 func (conf *balanceLeaderSchedulerConfig) Clone() *balanceLeaderSchedulerConfig {
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	ranges := make([]core.KeyRange, len(conf.Ranges))
 	copy(ranges, conf.Ranges)
 	return &balanceLeaderSchedulerConfig{
@@ -217,14 +217,14 @@ func (l *balanceLeaderScheduler) GetType() string {
 }
 
 func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) {
-	l.conf.mu.RLock()
-	defer l.conf.mu.RUnlock()
+	l.conf.RLock()
+	defer l.conf.RUnlock()
 	return EncodeConfig(l.conf)
 }
 
 func (l *balanceLeaderScheduler) ReloadConfig() error {
-	l.conf.mu.Lock()
-	defer l.conf.mu.Unlock()
+	l.conf.Lock()
+	defer l.conf.Unlock()
 	cfgData, err := l.conf.storage.LoadSchedulerConfig(l.GetName())
 	if err != nil {
 		return err
@@ -342,8 +342,8 @@ func (cs *candidateStores) resortStoreWithPos(pos int) {
 }
 
 func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
-	l.conf.mu.RLock()
-	defer l.conf.mu.RUnlock()
+	l.conf.RLock()
+	defer l.conf.RUnlock()
 	basePlan := plan.NewBalanceSchedulerPlan()
 	var collector *plan.Collector
 	if dryRun {
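
This same edit repeats across every scheduler the commit touches: the named mu syncutil.RWMutex field becomes an embedded mutex, so conf.Lock() replaces conf.mu.Lock(), and the new ReloadConfig method re-reads the persisted JSON config from storage under the write lock. Below is a minimal, self-contained sketch of that pattern; schedulerConfig and memStorage are illustrative stand-ins built on the standard library's sync.RWMutex, not PD's actual types.

package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// schedulerConfig embeds the RWMutex directly, mirroring the commit's move
// from a named `mu syncutil.RWMutex` field to an embedded one.
type schedulerConfig struct {
	sync.RWMutex
	Batch int `json:"batch"`
}

// memStorage is a toy stand-in for endpoint.ConfigStorage in this sketch.
type memStorage map[string]string

func (s memStorage) LoadSchedulerConfig(name string) (string, error) {
	return s[name], nil
}

// ReloadConfig re-reads the persisted config under the write lock, so
// whoever takes over scheduling sees the last persisted state.
func (c *schedulerConfig) ReloadConfig(storage memStorage, name string) error {
	c.Lock()
	defer c.Unlock()
	data, err := storage.LoadSchedulerConfig(name)
	if err != nil {
		return err
	}
	if data == "" {
		return nil // nothing persisted yet; keep the in-memory defaults
	}
	return json.Unmarshal([]byte(data), c)
}

func main() {
	storage := memStorage{"balance-leader-scheduler": `{"batch": 8}`}
	conf := &schedulerConfig{Batch: 4}
	if err := conf.ReloadConfig(storage, "balance-leader-scheduler"); err != nil {
		panic(err)
	}
	conf.RLock()
	fmt.Println("batch after reload:", conf.Batch) // prints 8
	conf.RUnlock()
}

Holding the write lock across the whole load-and-decode means readers such as Schedule and EncodeConfig, which take the read lock, never observe a half-applied config.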

pkg/schedule/schedulers/balance_region.go

Lines changed: 1 addition & 0 deletions
@@ -51,6 +51,7 @@ var (
 type balanceRegionSchedulerConfig struct {
 	Name   string          `json:"name"`
 	Ranges []core.KeyRange `json:"ranges"`
+	// TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler.
 }
 
 type balanceRegionScheduler struct {

pkg/schedule/schedulers/balance_witness.go

Lines changed: 11 additions & 11 deletions
@@ -53,16 +53,16 @@ const (
 )
 
 type balanceWitnessSchedulerConfig struct {
-	mu      syncutil.RWMutex
+	syncutil.RWMutex
 	storage endpoint.ConfigStorage
 	Ranges  []core.KeyRange `json:"ranges"`
 	// Batch is used to generate multiple operators by one scheduling
 	Batch int `json:"batch"`
 }
 
 func (conf *balanceWitnessSchedulerConfig) Update(data []byte) (int, interface{}) {
-	conf.mu.Lock()
-	defer conf.mu.Unlock()
+	conf.Lock()
+	defer conf.Unlock()
 
 	oldc, _ := json.Marshal(conf)
 
@@ -95,8 +95,8 @@ func (conf *balanceWitnessSchedulerConfig) validate() bool {
 }
 
 func (conf *balanceWitnessSchedulerConfig) Clone() *balanceWitnessSchedulerConfig {
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	ranges := make([]core.KeyRange, len(conf.Ranges))
 	copy(ranges, conf.Ranges)
 	return &balanceWitnessSchedulerConfig{
@@ -205,14 +205,14 @@ func (b *balanceWitnessScheduler) GetType() string {
 }
 
 func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) {
-	b.conf.mu.RLock()
-	defer b.conf.mu.RUnlock()
+	b.conf.RLock()
+	defer b.conf.RUnlock()
 	return EncodeConfig(b.conf)
 }
 
 func (b *balanceWitnessScheduler) ReloadConfig() error {
-	b.conf.mu.Lock()
-	defer b.conf.mu.Unlock()
+	b.conf.Lock()
+	defer b.conf.Unlock()
 	cfgData, err := b.conf.storage.LoadSchedulerConfig(b.GetName())
 	if err != nil {
 		return err
@@ -238,8 +238,8 @@ func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluste
 }
 
 func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
-	b.conf.mu.RLock()
-	defer b.conf.mu.RUnlock()
+	b.conf.RLock()
+	defer b.conf.RUnlock()
 	basePlan := plan.NewBalanceSchedulerPlan()
 	var collector *plan.Collector
 	if dryRun {

pkg/schedule/schedulers/base_scheduler.go

Lines changed: 4 additions & 4 deletions
@@ -94,8 +94,8 @@ func (s *BaseScheduler) GetNextInterval(interval time.Duration) time.Duration {
 	return intervalGrow(interval, MaxScheduleInterval, exponentialGrowth)
 }
 
-// Prepare does some prepare work
-func (s *BaseScheduler) Prepare(cluster sche.SchedulerCluster) error { return nil }
+// PrepareConfig does some prepare work
+func (s *BaseScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { return nil }
 
-// Cleanup does some cleanup work
-func (s *BaseScheduler) Cleanup(cluster sche.SchedulerCluster) {}
+// CleanConfig does some cleanup work
+func (s *BaseScheduler) CleanConfig(cluster sche.SchedulerCluster) {}
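
The rename keeps the established default-method pattern: BaseScheduler provides no-op PrepareConfig/CleanConfig, and only schedulers whose config carries side effects, such as evict-leader below, override them. A hedged sketch of that Go embedding pattern, with all type names illustrative rather than PD's:

package main

import "fmt"

type Cluster struct{}

// Scheduler is a trimmed-down stand-in for PD's scheduler interface.
type Scheduler interface {
	PrepareConfig(*Cluster) error
	CleanConfig(*Cluster)
}

// BaseScheduler supplies the no-op defaults every scheduler inherits.
type BaseScheduler struct{}

func (*BaseScheduler) PrepareConfig(*Cluster) error { return nil }
func (*BaseScheduler) CleanConfig(*Cluster)         {}

// balanceScheduler keeps the inherited no-ops: nothing to pause or resume.
type balanceScheduler struct{ BaseScheduler }

// evictScheduler overrides PrepareConfig because its config has side effects.
type evictScheduler struct{ BaseScheduler }

func (*evictScheduler) PrepareConfig(*Cluster) error {
	fmt.Println("pausing leader transfer for configured stores")
	return nil
}

func main() {
	for _, s := range []Scheduler{&balanceScheduler{}, &evictScheduler{}} {
		_ = s.PrepareConfig(&Cluster{})
		s.CleanConfig(&Cluster{})
	}
}

Because Go promotes methods from embedded types, balanceScheduler satisfies the interface without writing either method, while evictScheduler's own PrepareConfig shadows the promoted default.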

pkg/schedule/schedulers/evict_leader.go

Lines changed: 32 additions & 48 deletions
@@ -57,16 +57,16 @@ var (
 )
 
 type evictLeaderSchedulerConfig struct {
-	mu                syncutil.RWMutex
+	syncutil.RWMutex
 	storage           endpoint.ConfigStorage
 	StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"`
 	cluster           *core.BasicCluster
 	removeSchedulerCb func(string) error
 }
 
 func (conf *evictLeaderSchedulerConfig) getStores() []uint64 {
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	stores := make([]uint64, 0, len(conf.StoreIDWithRanges))
 	for storeID := range conf.StoreIDWithRanges {
 		stores = append(stores, storeID)
@@ -90,15 +90,15 @@ func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
 	if err != nil {
 		return err
 	}
-	conf.mu.Lock()
-	defer conf.mu.Unlock()
+	conf.Lock()
+	defer conf.Unlock()
 	conf.StoreIDWithRanges[id] = ranges
 	return nil
 }
 
 func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	storeIDWithRanges := make(map[uint64][]core.KeyRange)
 	for id, ranges := range conf.StoreIDWithRanges {
 		storeIDWithRanges[id] = append(storeIDWithRanges[id], ranges...)
@@ -110,8 +110,8 @@ func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
 
 func (conf *evictLeaderSchedulerConfig) Persist() error {
 	name := conf.getSchedulerName()
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	data, err := EncodeConfig(conf)
 	failpoint.Inject("persistFail", func() {
 		err = errors.New("fail to persist")
@@ -127,8 +127,8 @@ func (conf *evictLeaderSchedulerConfig) getSchedulerName() string {
 }
 
 func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	ranges := conf.StoreIDWithRanges[id]
 	res := make([]string, 0, len(ranges)*2)
 	for index := range ranges {
@@ -138,8 +138,8 @@ func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
 }
 
 func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last bool) {
-	conf.mu.Lock()
-	defer conf.mu.Unlock()
+	conf.Lock()
+	defer conf.Unlock()
 	_, exists := conf.StoreIDWithRanges[id]
 	succ, last = false, false
 	if exists {
@@ -152,15 +152,15 @@ func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last
 }
 
 func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) {
-	conf.mu.Lock()
-	defer conf.mu.Unlock()
+	conf.Lock()
+	defer conf.Unlock()
 	conf.cluster.PauseLeaderTransfer(id)
 	conf.StoreIDWithRanges[id] = keyRange
 }
 
 func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange {
-	conf.mu.RLock()
-	defer conf.mu.RUnlock()
+	conf.RLock()
+	defer conf.RUnlock()
 	if ranges, exist := conf.StoreIDWithRanges[id]; exist {
 		return ranges
 	}
@@ -203,14 +203,14 @@ func (s *evictLeaderScheduler) GetType() string {
 }
 
 func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {
-	s.conf.mu.RLock()
-	defer s.conf.mu.RUnlock()
+	s.conf.RLock()
+	defer s.conf.RUnlock()
 	return EncodeConfig(s.conf)
 }
 
 func (s *evictLeaderScheduler) ReloadConfig() error {
-	s.conf.mu.Lock()
-	defer s.conf.mu.Unlock()
+	s.conf.Lock()
+	defer s.conf.Unlock()
 	cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName())
 	if err != nil {
 		return err
@@ -227,25 +227,9 @@ func (s *evictLeaderScheduler) ReloadConfig() error {
 	return nil
 }
 
-// pauseAndResumeLeaderTransfer checks the old and new store IDs, and pause or resume the leader transfer.
-func pauseAndResumeLeaderTransfer(cluster *core.BasicCluster, old, new map[uint64][]core.KeyRange) {
-	for id := range old {
-		if _, ok := new[id]; ok {
-			continue
-		}
-		cluster.ResumeLeaderTransfer(id)
-	}
-	for id := range new {
-		if _, ok := old[id]; ok {
-			continue
-		}
-		cluster.PauseLeaderTransfer(id)
-	}
-}
-
-func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
-	s.conf.mu.RLock()
-	defer s.conf.mu.RUnlock()
+func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
+	s.conf.RLock()
+	defer s.conf.RUnlock()
 	var res error
 	for id := range s.conf.StoreIDWithRanges {
 		if err := cluster.PauseLeaderTransfer(id); err != nil {
@@ -255,9 +239,9 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
 	return res
 }
 
-func (s *evictLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) {
-	s.conf.mu.RLock()
-	defer s.conf.mu.RUnlock()
+func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
+	s.conf.RLock()
+	defer s.conf.RUnlock()
 	for id := range s.conf.StoreIDWithRanges {
 		cluster.ResumeLeaderTransfer(id)
 	}
@@ -386,15 +370,15 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
 	idFloat, ok := input["store_id"].(float64)
 	if ok {
 		id = (uint64)(idFloat)
-		handler.config.mu.RLock()
+		handler.config.RLock()
 		if _, exists = handler.config.StoreIDWithRanges[id]; !exists {
 			if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil {
-				handler.config.mu.RUnlock()
+				handler.config.RUnlock()
 				handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
 				return
 			}
 		}
-		handler.config.mu.RUnlock()
+		handler.config.RUnlock()
 		args = append(args, strconv.FormatUint(id, 10))
 	}
 
@@ -407,9 +391,9 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
 
 	err := handler.config.BuildWithArgs(args)
 	if err != nil {
-		handler.config.mu.Lock()
+		handler.config.Lock()
 		handler.config.cluster.ResumeLeaderTransfer(id)
-		handler.config.mu.Unlock()
+		handler.config.Unlock()
 		handler.rd.JSON(w, http.StatusBadRequest, err.Error())
 		return
 	}
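
Note that the -227,25 hunk above removes the pauseAndResumeLeaderTransfer helper from this file; the hunks shown here only delete it, and whether it is relocated to one of the other changed files is not visible in this excerpt. Its logic, recoverable from the removed lines, is a two-way set difference over store IDs: stores dropped from the config resume leader transfer, newly added stores pause it. A standalone sketch of that logic against a toy cluster type:

package main

import "fmt"

type cluster struct{}

func (*cluster) PauseLeaderTransfer(id uint64)  { fmt.Println("pause", id) }
func (*cluster) ResumeLeaderTransfer(id uint64) { fmt.Println("resume", id) }

// pauseAndResume mirrors the removed helper: resume stores that vanished
// from the config, pause stores that newly appeared in it.
func pauseAndResume(c *cluster, old, new map[uint64][]string) {
	for id := range old {
		if _, ok := new[id]; ok {
			continue
		}
		c.ResumeLeaderTransfer(id) // dropped from config
	}
	for id := range new {
		if _, ok := old[id]; ok {
			continue
		}
		c.PauseLeaderTransfer(id) // newly configured
	}
}

func main() {
	before := map[uint64][]string{1: nil, 2: nil}
	after := map[uint64][]string{2: nil, 3: nil}
	pauseAndResume(&cluster{}, before, after)
	// prints "resume 1" and "pause 3" (map iteration order may vary)
}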
