Skip to content

Commit a9d6e3e

Browse files
Merge branch 'master' into fix_coordinator
2 parents 6281684 + 0adb86f commit a9d6e3e

File tree

23 files changed

+923
-289
lines changed

23 files changed

+923
-289
lines changed

metrics/grafana/pd.json

Lines changed: 124 additions & 131 deletions
Large diffs are not rendered by default.

pkg/mcs/scheduling/server/cluster.go

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,12 +425,75 @@ func (c *Cluster) runCoordinator() {
425425
c.coordinator.RunUntilStop()
426426
}
427427

428+
func (c *Cluster) runMetricsCollectionJob() {
429+
defer logutil.LogPanic()
430+
defer c.wg.Done()
431+
432+
ticker := time.NewTicker(10 * time.Second)
433+
defer ticker.Stop()
434+
435+
for {
436+
select {
437+
case <-c.ctx.Done():
438+
log.Info("metrics are reset")
439+
c.resetMetrics()
440+
log.Info("metrics collection job has been stopped")
441+
return
442+
case <-ticker.C:
443+
c.collectMetrics()
444+
}
445+
}
446+
}
447+
448+
func (c *Cluster) collectMetrics() {
449+
statsMap := statistics.NewStoreStatisticsMap(c.persistConfig)
450+
stores := c.GetStores()
451+
for _, s := range stores {
452+
statsMap.Observe(s)
453+
statsMap.ObserveHotStat(s, c.hotStat.StoresStats)
454+
}
455+
statsMap.Collect()
456+
457+
c.coordinator.GetSchedulersController().CollectSchedulerMetrics()
458+
c.coordinator.CollectHotSpotMetrics()
459+
c.collectClusterMetrics()
460+
}
461+
462+
func (c *Cluster) collectClusterMetrics() {
463+
if c.regionStats == nil {
464+
return
465+
}
466+
c.regionStats.Collect()
467+
c.labelStats.Collect()
468+
// collect hot cache metrics
469+
c.hotStat.CollectMetrics()
470+
}
471+
472+
func (c *Cluster) resetMetrics() {
473+
statistics.Reset()
474+
475+
c.coordinator.GetSchedulersController().ResetSchedulerMetrics()
476+
c.coordinator.ResetHotSpotMetrics()
477+
c.resetClusterMetrics()
478+
}
479+
480+
func (c *Cluster) resetClusterMetrics() {
481+
if c.regionStats == nil {
482+
return
483+
}
484+
c.regionStats.Reset()
485+
c.labelStats.Reset()
486+
// reset hot cache metrics
487+
c.hotStat.ResetMetrics()
488+
}
489+
428490
// StartBackgroundJobs starts background jobs.
429491
func (c *Cluster) StartBackgroundJobs() {
430-
c.wg.Add(3)
492+
c.wg.Add(4)
431493
go c.updateScheduler()
432494
go c.runUpdateStoreStats()
433495
go c.runCoordinator()
496+
go c.runMetricsCollectionJob()
434497
c.running.Store(true)
435498
}
436499

pkg/mcs/scheduling/server/config/config.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,11 @@ func (o *PersistConfig) IsSchedulingHalted() bool {
499499
return o.GetScheduleConfig().HaltScheduling
500500
}
501501

502+
// GetStoresLimit gets the stores' limit.
503+
func (o *PersistConfig) GetStoresLimit() map[uint64]sc.StoreLimitConfig {
504+
return o.GetScheduleConfig().StoreLimit
505+
}
506+
502507
// GetStoreLimitByType returns the limit of a store with a given type.
503508
func (o *PersistConfig) GetStoreLimitByType(storeID uint64, typ storelimit.Type) (returned float64) {
504509
limit := o.GetStoreLimit(storeID)
@@ -620,11 +625,21 @@ func (o *PersistConfig) GetRegionMaxSize() uint64 {
620625
return o.GetStoreConfig().GetRegionMaxSize()
621626
}
622627

623-
// GetRegionMaxKeys returns the region split keys
628+
// GetRegionMaxKeys returns the max region keys
624629
func (o *PersistConfig) GetRegionMaxKeys() uint64 {
625630
return o.GetStoreConfig().GetRegionMaxKeys()
626631
}
627632

633+
// GetRegionSplitSize returns the region split size in MB
634+
func (o *PersistConfig) GetRegionSplitSize() uint64 {
635+
return o.GetStoreConfig().GetRegionSplitSize()
636+
}
637+
638+
// GetRegionSplitKeys returns the region split keys
639+
func (o *PersistConfig) GetRegionSplitKeys() uint64 {
640+
return o.GetStoreConfig().GetRegionSplitKeys()
641+
}
642+
628643
// IsEnableRegionBucket return true if the region bucket is enabled.
629644
func (o *PersistConfig) IsEnableRegionBucket() bool {
630645
return o.GetStoreConfig().IsEnableRegionBucket()

pkg/schedule/config/config_provider.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type SchedulerConfigProvider interface {
4747
SharedConfigProvider
4848

4949
IsSchedulingHalted() bool
50+
GetStoresLimit() map[uint64]StoreLimitConfig
5051

5152
IsSchedulerDisabled(string) bool
5253
AddSchedulerCfg(string, []string)
@@ -137,6 +138,8 @@ type ConfProvider interface {
137138
type StoreConfigProvider interface {
138139
GetRegionMaxSize() uint64
139140
GetRegionMaxKeys() uint64
141+
GetRegionSplitSize() uint64
142+
GetRegionSplitKeys() uint64
140143
CheckRegionSize(uint64, uint64) error
141144
CheckRegionKeys(uint64, uint64) error
142145
IsEnableRegionBucket() bool

pkg/schedule/coordinator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ func (c *Coordinator) PatrolRegions() {
156156
// Note: we reset the ticker here to support updating configuration dynamically.
157157
ticker.Reset(c.cluster.GetCheckerConfig().GetPatrolRegionInterval())
158158
case <-c.ctx.Done():
159+
patrolCheckRegionsGauge.Set(0)
159160
log.Info("patrol regions has been stopped")
160161
return
161162
}

pkg/schedule/schedulers/evict_slow_trend.go

Lines changed: 81 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
package schedulers
1616

1717
import (
18+
"net/http"
1819
"strconv"
20+
"sync/atomic"
1921
"time"
2022

23+
"github.com/gorilla/mux"
2124
"github.com/pingcap/errors"
2225
"github.com/pingcap/failpoint"
2326
"github.com/pingcap/log"
@@ -26,6 +29,8 @@ import (
2629
"github.com/tikv/pd/pkg/schedule/operator"
2730
"github.com/tikv/pd/pkg/schedule/plan"
2831
"github.com/tikv/pd/pkg/storage/endpoint"
32+
"github.com/tikv/pd/pkg/utils/apiutil"
33+
"github.com/unrolled/render"
2934
"go.uber.org/zap"
3035
)
3136

@@ -54,11 +59,28 @@ type evictSlowTrendSchedulerConfig struct {
5459
evictCandidate slowCandidate
5560
// Last chosen candidate for eviction.
5661
lastEvictCandidate slowCandidate
57-
62+
// Duration gap for recovering the candidate, unit: s.
63+
RecoveryDurationGap uint64 `json:"recovery-duration"`
5864
// Only evict one store for now
5965
EvictedStores []uint64 `json:"evict-by-trend-stores"`
6066
}
6167

68+
func initEvictSlowTrendSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowTrendSchedulerConfig {
69+
return &evictSlowTrendSchedulerConfig{
70+
storage: storage,
71+
evictCandidate: slowCandidate{},
72+
lastEvictCandidate: slowCandidate{},
73+
RecoveryDurationGap: defaultRecoveryDurationGap,
74+
EvictedStores: make([]uint64, 0),
75+
}
76+
}
77+
78+
func (conf *evictSlowTrendSchedulerConfig) Clone() *evictSlowTrendSchedulerConfig {
79+
return &evictSlowTrendSchedulerConfig{
80+
RecoveryDurationGap: atomic.LoadUint64(&conf.RecoveryDurationGap),
81+
}
82+
}
83+
6284
func (conf *evictSlowTrendSchedulerConfig) Persist() error {
6385
name := conf.getSchedulerName()
6486
data, err := EncodeConfig(conf)
@@ -116,6 +138,15 @@ func (conf *evictSlowTrendSchedulerConfig) lastCandidateCapturedSecs() uint64 {
116138
return DurationSinceAsSecs(conf.lastEvictCandidate.captureTS)
117139
}
118140

141+
// readyForRecovery checks whether the last cpatured candidate is ready for recovery.
142+
func (conf *evictSlowTrendSchedulerConfig) readyForRecovery() bool {
143+
recoveryDurationGap := atomic.LoadUint64(&conf.RecoveryDurationGap)
144+
failpoint.Inject("transientRecoveryGap", func() {
145+
recoveryDurationGap = 0
146+
})
147+
return conf.lastCandidateCapturedSecs() >= recoveryDurationGap
148+
}
149+
119150
func (conf *evictSlowTrendSchedulerConfig) captureCandidate(id uint64) {
120151
conf.evictCandidate = slowCandidate{
121152
storeID: id,
@@ -162,9 +193,52 @@ func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.Schedule
162193
return oldID, conf.Persist()
163194
}
164195

196+
type evictSlowTrendHandler struct {
197+
rd *render.Render
198+
config *evictSlowTrendSchedulerConfig
199+
}
200+
201+
func newEvictSlowTrendHandler(config *evictSlowTrendSchedulerConfig) http.Handler {
202+
h := &evictSlowTrendHandler{
203+
config: config,
204+
rd: render.New(render.Options{IndentJSON: true}),
205+
}
206+
router := mux.NewRouter()
207+
router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost)
208+
router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet)
209+
return router
210+
}
211+
212+
func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) {
213+
var input map[string]interface{}
214+
if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil {
215+
return
216+
}
217+
recoveryDurationGapFloat, ok := input["recovery-duration"].(float64)
218+
if !ok {
219+
handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error())
220+
return
221+
}
222+
recoveryDurationGap := (uint64)(recoveryDurationGapFloat)
223+
prevRecoveryDurationGap := atomic.LoadUint64(&handler.config.RecoveryDurationGap)
224+
atomic.StoreUint64(&handler.config.RecoveryDurationGap, recoveryDurationGap)
225+
log.Info("evict-slow-trend-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap))
226+
handler.rd.JSON(w, http.StatusOK, nil)
227+
}
228+
229+
func (handler *evictSlowTrendHandler) ListConfig(w http.ResponseWriter, r *http.Request) {
230+
conf := handler.config.Clone()
231+
handler.rd.JSON(w, http.StatusOK, conf)
232+
}
233+
165234
type evictSlowTrendScheduler struct {
166235
*BaseScheduler
167-
conf *evictSlowTrendSchedulerConfig
236+
conf *evictSlowTrendSchedulerConfig
237+
handler http.Handler
238+
}
239+
240+
// ServeHTTP delegates scheduler HTTP requests to the embedded handler.
func (s *evictSlowTrendScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	s.handler.ServeHTTP(w, r)
}
169243

170244
func (s *evictSlowTrendScheduler) GetName() string {
@@ -244,7 +318,7 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
244318
// slow node next time.
245319
log.Info("store evicted by slow trend has been removed", zap.Uint64("store-id", store.GetID()))
246320
storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_removed").Inc()
247-
} else if checkStoreCanRecover(cluster, store, s.conf.lastCandidateCapturedSecs()) {
321+
} else if checkStoreCanRecover(cluster, store) && s.conf.readyForRecovery() {
248322
log.Info("store evicted by slow trend has been recovered", zap.Uint64("store-id", store.GetID()))
249323
storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_recovered").Inc()
250324
} else {
@@ -301,9 +375,11 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
301375
}
302376

303377
func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSlowTrendSchedulerConfig) Scheduler {
378+
handler := newEvictSlowTrendHandler(conf)
304379
return &evictSlowTrendScheduler{
305380
BaseScheduler: NewBaseScheduler(opController),
306381
conf: conf,
382+
handler: handler,
307383
}
308384
}
309385

@@ -453,7 +529,7 @@ func checkStoreSlowerThanOthers(cluster sche.SchedulerCluster, target *core.Stor
453529
return slowerThanStoresNum >= expected
454530
}
455531

456-
func checkStoreCanRecover(cluster sche.SchedulerCluster, target *core.StoreInfo, recoveryGap uint64) bool {
532+
func checkStoreCanRecover(cluster sche.SchedulerCluster, target *core.StoreInfo) bool {
457533
/*
458534
//
459535
// This might not be necessary,
@@ -473,7 +549,7 @@ func checkStoreCanRecover(cluster sche.SchedulerCluster, target *core.StoreInfo,
473549
storeSlowTrendActionStatusGauge.WithLabelValues("recover.judging:got-event").Inc()
474550
}
475551
*/
476-
return checkStoreFasterThanOthers(cluster, target) && checkStoreReadyForRecover(target, recoveryGap)
552+
return checkStoreFasterThanOthers(cluster, target)
477553
}
478554

479555
func checkStoreFasterThanOthers(cluster sche.SchedulerCluster, target *core.StoreInfo) bool {
@@ -507,19 +583,6 @@ func checkStoreFasterThanOthers(cluster sche.SchedulerCluster, target *core.Stor
507583
return fasterThanStores >= expected
508584
}
509585

510-
// checkStoreReadyForRecover checks whether the given target store is ready for recover.
511-
func checkStoreReadyForRecover(target *core.StoreInfo, recoveryGap uint64) bool {
512-
durationGap := uint64(defaultRecoveryDurationGap)
513-
failpoint.Inject("transientRecoveryGap", func() {
514-
durationGap = 0
515-
})
516-
if targetSlowTrend := target.GetSlowTrend(); targetSlowTrend != nil {
517-
// TODO: setting the recovery time in SlowTrend
518-
return recoveryGap >= durationGap
519-
}
520-
return true
521-
}
522-
523586
// DurationSinceAsSecs returns the duration gap since the given startTS, unit: s.
524587
func DurationSinceAsSecs(startTS time.Time) uint64 {
525588
return uint64(time.Since(startTS).Seconds())

pkg/schedule/schedulers/evict_slow_trend_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (suite *evictSlowTrendTestSuite) TestEvictSlowTrendBasicFuncs() {
9393
suite.Equal(*lastCapturedCandidate, es2.conf.evictCandidate)
9494
suite.Equal(es2.conf.candidateCapturedSecs(), uint64(0))
9595
suite.Equal(es2.conf.lastCandidateCapturedSecs(), uint64(0))
96-
suite.False(checkStoreReadyForRecover(store, es2.conf.lastCandidateCapturedSecs()))
96+
suite.False(es2.conf.readyForRecovery())
9797
recoverTS := lastCapturedCandidate.recoverTS
9898
suite.True(recoverTS.After(lastCapturedCandidate.captureTS))
9999
// Pop captured store 1 and mark it has recovered.

pkg/schedule/schedulers/init.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ func schedulersRegister() {
466466
})
467467

468468
RegisterScheduler(EvictSlowTrendType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
469-
conf := &evictSlowTrendSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0), evictCandidate: slowCandidate{}, lastEvictCandidate: slowCandidate{}}
469+
conf := initEvictSlowTrendSchedulerConfig(storage)
470470
if err := decoder(conf); err != nil {
471471
return nil, err
472472
}

0 commit comments

Comments
 (0)