1515package schedulers
1616
1717import (
18+ "net/http"
1819 "strconv"
20+ "sync/atomic"
1921 "time"
2022
23+ "github.com/gorilla/mux"
2124 "github.com/pingcap/errors"
2225 "github.com/pingcap/failpoint"
2326 "github.com/pingcap/log"
@@ -26,6 +29,8 @@ import (
2629 "github.com/tikv/pd/pkg/schedule/operator"
2730 "github.com/tikv/pd/pkg/schedule/plan"
2831 "github.com/tikv/pd/pkg/storage/endpoint"
32+ "github.com/tikv/pd/pkg/utils/apiutil"
33+ "github.com/unrolled/render"
2934 "go.uber.org/zap"
3035)
3136
@@ -54,11 +59,28 @@ type evictSlowTrendSchedulerConfig struct {
5459 evictCandidate slowCandidate
5560 // Last chosen candidate for eviction.
5661 lastEvictCandidate slowCandidate
57-
62+ // Duration gap for recovering the candidate, unit: s.
63+ RecoveryDurationGap uint64 `json:"recovery-duration"`
5864 // Only evict one store for now
5965 EvictedStores []uint64 `json:"evict-by-trend-stores"`
6066}
6167
68+ func initEvictSlowTrendSchedulerConfig (storage endpoint.ConfigStorage ) * evictSlowTrendSchedulerConfig {
69+ return & evictSlowTrendSchedulerConfig {
70+ storage : storage ,
71+ evictCandidate : slowCandidate {},
72+ lastEvictCandidate : slowCandidate {},
73+ RecoveryDurationGap : defaultRecoveryDurationGap ,
74+ EvictedStores : make ([]uint64 , 0 ),
75+ }
76+ }
77+
78+ func (conf * evictSlowTrendSchedulerConfig ) Clone () * evictSlowTrendSchedulerConfig {
79+ return & evictSlowTrendSchedulerConfig {
80+ RecoveryDurationGap : atomic .LoadUint64 (& conf .RecoveryDurationGap ),
81+ }
82+ }
83+
6284func (conf * evictSlowTrendSchedulerConfig ) Persist () error {
6385 name := conf .getSchedulerName ()
6486 data , err := EncodeConfig (conf )
@@ -116,6 +138,15 @@ func (conf *evictSlowTrendSchedulerConfig) lastCandidateCapturedSecs() uint64 {
116138 return DurationSinceAsSecs (conf .lastEvictCandidate .captureTS )
117139}
118140
141+ // readyForRecovery checks whether the last cpatured candidate is ready for recovery.
142+ func (conf * evictSlowTrendSchedulerConfig ) readyForRecovery () bool {
143+ recoveryDurationGap := atomic .LoadUint64 (& conf .RecoveryDurationGap )
144+ failpoint .Inject ("transientRecoveryGap" , func () {
145+ recoveryDurationGap = 0
146+ })
147+ return conf .lastCandidateCapturedSecs () >= recoveryDurationGap
148+ }
149+
119150func (conf * evictSlowTrendSchedulerConfig ) captureCandidate (id uint64 ) {
120151 conf .evictCandidate = slowCandidate {
121152 storeID : id ,
@@ -162,9 +193,52 @@ func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.Schedule
162193 return oldID , conf .Persist ()
163194}
164195
// evictSlowTrendHandler serves the HTTP endpoints used to inspect and update
// the evict-slow-trend scheduler configuration.
type evictSlowTrendHandler struct {
	rd     *render.Render
	config *evictSlowTrendSchedulerConfig
}
200+
201+ func newEvictSlowTrendHandler (config * evictSlowTrendSchedulerConfig ) http.Handler {
202+ h := & evictSlowTrendHandler {
203+ config : config ,
204+ rd : render .New (render.Options {IndentJSON : true }),
205+ }
206+ router := mux .NewRouter ()
207+ router .HandleFunc ("/config" , h .UpdateConfig ).Methods (http .MethodPost )
208+ router .HandleFunc ("/list" , h .ListConfig ).Methods (http .MethodGet )
209+ return router
210+ }
211+
212+ func (handler * evictSlowTrendHandler ) UpdateConfig (w http.ResponseWriter , r * http.Request ) {
213+ var input map [string ]interface {}
214+ if err := apiutil .ReadJSONRespondError (handler .rd , w , r .Body , & input ); err != nil {
215+ return
216+ }
217+ recoveryDurationGapFloat , ok := input ["recovery-duration" ].(float64 )
218+ if ! ok {
219+ handler .rd .JSON (w , http .StatusInternalServerError , errors .New ("invalid argument for 'recovery-duration'" ).Error ())
220+ return
221+ }
222+ recoveryDurationGap := (uint64 )(recoveryDurationGapFloat )
223+ prevRecoveryDurationGap := atomic .LoadUint64 (& handler .config .RecoveryDurationGap )
224+ atomic .StoreUint64 (& handler .config .RecoveryDurationGap , recoveryDurationGap )
225+ log .Info ("evict-slow-trend-scheduler update 'recovery-duration' - unit: s" , zap .Uint64 ("prev" , prevRecoveryDurationGap ), zap .Uint64 ("cur" , recoveryDurationGap ))
226+ handler .rd .JSON (w , http .StatusOK , nil )
227+ }
228+
229+ func (handler * evictSlowTrendHandler ) ListConfig (w http.ResponseWriter , r * http.Request ) {
230+ conf := handler .config .Clone ()
231+ handler .rd .JSON (w , http .StatusOK , conf )
232+ }
233+
// evictSlowTrendScheduler pairs the scheduler logic with its configuration
// and the HTTP handler that exposes that configuration.
type evictSlowTrendScheduler struct {
	*BaseScheduler
	conf    *evictSlowTrendSchedulerConfig
	handler http.Handler
}
239+
// ServeHTTP delegates scheduler HTTP requests to the embedded config handler.
func (s *evictSlowTrendScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	s.handler.ServeHTTP(w, r)
}
169243
170244func (s * evictSlowTrendScheduler ) GetName () string {
@@ -244,7 +318,7 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
244318 // slow node next time.
245319 log .Info ("store evicted by slow trend has been removed" , zap .Uint64 ("store-id" , store .GetID ()))
246320 storeSlowTrendActionStatusGauge .WithLabelValues ("evict" , "stop_removed" ).Inc ()
247- } else if checkStoreCanRecover (cluster , store , s .conf .lastCandidateCapturedSecs () ) {
321+ } else if checkStoreCanRecover (cluster , store ) && s .conf .readyForRecovery ( ) {
248322 log .Info ("store evicted by slow trend has been recovered" , zap .Uint64 ("store-id" , store .GetID ()))
249323 storeSlowTrendActionStatusGauge .WithLabelValues ("evict" , "stop_recovered" ).Inc ()
250324 } else {
@@ -301,9 +375,11 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
301375}
302376
303377func newEvictSlowTrendScheduler (opController * operator.Controller , conf * evictSlowTrendSchedulerConfig ) Scheduler {
378+ handler := newEvictSlowTrendHandler (conf )
304379 return & evictSlowTrendScheduler {
305380 BaseScheduler : NewBaseScheduler (opController ),
306381 conf : conf ,
382+ handler : handler ,
307383 }
308384}
309385
@@ -453,7 +529,7 @@ func checkStoreSlowerThanOthers(cluster sche.SchedulerCluster, target *core.Stor
453529 return slowerThanStoresNum >= expected
454530}
455531
456- func checkStoreCanRecover (cluster sche.SchedulerCluster , target * core.StoreInfo , recoveryGap uint64 ) bool {
532+ func checkStoreCanRecover (cluster sche.SchedulerCluster , target * core.StoreInfo ) bool {
457533 /*
458534 //
459535 // This might not be necessary,
@@ -473,7 +549,7 @@ func checkStoreCanRecover(cluster sche.SchedulerCluster, target *core.StoreInfo,
473549 storeSlowTrendActionStatusGauge.WithLabelValues("recover.judging:got-event").Inc()
474550 }
475551 */
476- return checkStoreFasterThanOthers (cluster , target ) && checkStoreReadyForRecover ( target , recoveryGap )
552+ return checkStoreFasterThanOthers (cluster , target )
477553}
478554
479555func checkStoreFasterThanOthers (cluster sche.SchedulerCluster , target * core.StoreInfo ) bool {
@@ -507,19 +583,6 @@ func checkStoreFasterThanOthers(cluster sche.SchedulerCluster, target *core.Stor
507583 return fasterThanStores >= expected
508584}
509585
510- // checkStoreReadyForRecover checks whether the given target store is ready for recover.
511- func checkStoreReadyForRecover (target * core.StoreInfo , recoveryGap uint64 ) bool {
512- durationGap := uint64 (defaultRecoveryDurationGap )
513- failpoint .Inject ("transientRecoveryGap" , func () {
514- durationGap = 0
515- })
516- if targetSlowTrend := target .GetSlowTrend (); targetSlowTrend != nil {
517- // TODO: setting the recovery time in SlowTrend
518- return recoveryGap >= durationGap
519- }
520- return true
521- }
522-
523586// DurationSinceAsSecs returns the duration gap since the given startTS, unit: s.
524587func DurationSinceAsSecs (startTS time.Time ) uint64 {
525588 return uint64 (time .Since (startTS ).Seconds ())
0 commit comments