Skip to content

Commit a09717b

Browse files
mcs: support ttl config (#7409)
close #7296 Signed-off-by: Ryan Leung <rleungx@gmail.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
1 parent 9f4803d commit a09717b

File tree

7 files changed

+324
-151
lines changed

7 files changed

+324
-151
lines changed

pkg/mcs/scheduling/server/config/config.go

Lines changed: 192 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"os"
2020
"path/filepath"
2121
"reflect"
22+
"strconv"
2223
"strings"
2324
"sync/atomic"
2425
"time"
@@ -30,6 +31,7 @@ import (
3031
"github.com/pingcap/kvproto/pkg/metapb"
3132
"github.com/pingcap/log"
3233
"github.com/spf13/pflag"
34+
"github.com/tikv/pd/pkg/cache"
3335
"github.com/tikv/pd/pkg/core/constant"
3436
"github.com/tikv/pd/pkg/core/storelimit"
3537
"github.com/tikv/pd/pkg/mcs/utils"
@@ -220,6 +222,7 @@ func (c *Config) Clone() *Config {
220222
// PersistConfig wraps all configurations that need to persist to storage and
221223
// allows to access them safely.
222224
type PersistConfig struct {
225+
ttl *cache.TTLString
223226
// Store the global configuration that is related to the scheduling.
224227
clusterVersion unsafe.Pointer
225228
schedule atomic.Value
@@ -231,14 +234,15 @@ type PersistConfig struct {
231234
}
232235

233236
// NewPersistConfig creates a new PersistConfig instance.
234-
func NewPersistConfig(cfg *Config) *PersistConfig {
237+
func NewPersistConfig(cfg *Config, ttl *cache.TTLString) *PersistConfig {
235238
o := &PersistConfig{}
236239
o.SetClusterVersion(&cfg.ClusterVersion)
237240
o.schedule.Store(&cfg.Schedule)
238241
o.replication.Store(&cfg.Replication)
239242
// storeConfig will be fetched from TiKV by PD API server,
240243
// so we just set an empty value here first.
241244
o.storeConfig.Store(&sc.StoreConfig{})
245+
o.ttl = ttl
242246
return o
243247
}
244248

@@ -329,16 +333,6 @@ func (o *PersistConfig) GetMaxReplicas() int {
329333
return int(o.GetReplicationConfig().MaxReplicas)
330334
}
331335

332-
// GetMaxSnapshotCount returns the max snapshot count.
333-
func (o *PersistConfig) GetMaxSnapshotCount() uint64 {
334-
return o.GetScheduleConfig().MaxSnapshotCount
335-
}
336-
337-
// GetMaxPendingPeerCount returns the max pending peer count.
338-
func (o *PersistConfig) GetMaxPendingPeerCount() uint64 {
339-
return o.GetScheduleConfig().MaxPendingPeerCount
340-
}
341-
342336
// IsPlacementRulesEnabled returns if the placement rules is enabled.
343337
func (o *PersistConfig) IsPlacementRulesEnabled() bool {
344338
return o.GetReplicationConfig().EnablePlacementRules
@@ -354,31 +348,6 @@ func (o *PersistConfig) GetHighSpaceRatio() float64 {
354348
return o.GetScheduleConfig().HighSpaceRatio
355349
}
356350

357-
// GetHotRegionScheduleLimit returns the limit for hot region schedule.
358-
func (o *PersistConfig) GetHotRegionScheduleLimit() uint64 {
359-
return o.GetScheduleConfig().HotRegionScheduleLimit
360-
}
361-
362-
// GetRegionScheduleLimit returns the limit for region schedule.
363-
func (o *PersistConfig) GetRegionScheduleLimit() uint64 {
364-
return o.GetScheduleConfig().RegionScheduleLimit
365-
}
366-
367-
// GetLeaderScheduleLimit returns the limit for leader schedule.
368-
func (o *PersistConfig) GetLeaderScheduleLimit() uint64 {
369-
return o.GetScheduleConfig().LeaderScheduleLimit
370-
}
371-
372-
// GetReplicaScheduleLimit returns the limit for replica schedule.
373-
func (o *PersistConfig) GetReplicaScheduleLimit() uint64 {
374-
return o.GetScheduleConfig().ReplicaScheduleLimit
375-
}
376-
377-
// GetMergeScheduleLimit returns the limit for merge schedule.
378-
func (o *PersistConfig) GetMergeScheduleLimit() uint64 {
379-
return o.GetScheduleConfig().MergeScheduleLimit
380-
}
381-
382351
// GetLeaderSchedulePolicy is to get leader schedule policy.
383352
func (o *PersistConfig) GetLeaderSchedulePolicy() constant.SchedulePolicy {
384353
return constant.StringToSchedulePolicy(o.GetScheduleConfig().LeaderSchedulePolicy)
@@ -419,26 +388,11 @@ func (o *PersistConfig) IsOneWayMergeEnabled() bool {
419388
return o.GetScheduleConfig().EnableOneWayMerge
420389
}
421390

422-
// GetMaxMergeRegionSize returns the max region size.
423-
func (o *PersistConfig) GetMaxMergeRegionSize() uint64 {
424-
return o.GetScheduleConfig().MaxMergeRegionSize
425-
}
426-
427-
// GetMaxMergeRegionKeys returns the max region keys.
428-
func (o *PersistConfig) GetMaxMergeRegionKeys() uint64 {
429-
return o.GetScheduleConfig().MaxMergeRegionKeys
430-
}
431-
432391
// GetRegionScoreFormulaVersion returns the region score formula version.
433392
func (o *PersistConfig) GetRegionScoreFormulaVersion() string {
434393
return o.GetScheduleConfig().RegionScoreFormulaVersion
435394
}
436395

437-
// GetSchedulerMaxWaitingOperator returns the scheduler max waiting operator.
438-
func (o *PersistConfig) GetSchedulerMaxWaitingOperator() uint64 {
439-
return o.GetScheduleConfig().SchedulerMaxWaitingOperator
440-
}
441-
442396
// GetHotRegionCacheHitsThreshold returns the hot region cache hits threshold.
443397
func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int {
444398
return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold)
@@ -474,11 +428,6 @@ func (o *PersistConfig) GetTolerantSizeRatio() float64 {
474428
return o.GetScheduleConfig().TolerantSizeRatio
475429
}
476430

477-
// GetWitnessScheduleLimit returns the limit for region schedule.
478-
func (o *PersistConfig) GetWitnessScheduleLimit() uint64 {
479-
return o.GetScheduleConfig().WitnessScheduleLimit
480-
}
481-
482431
// IsDebugMetricsEnabled returns if debug metrics is enabled.
483432
func (o *PersistConfig) IsDebugMetricsEnabled() bool {
484433
return o.GetScheduleConfig().EnableDebugMetrics
@@ -509,11 +458,6 @@ func (o *PersistConfig) IsRemoveExtraReplicaEnabled() bool {
509458
return o.GetScheduleConfig().EnableRemoveExtraReplica
510459
}
511460

512-
// IsLocationReplacementEnabled returns if location replace is enabled.
513-
func (o *PersistConfig) IsLocationReplacementEnabled() bool {
514-
return o.GetScheduleConfig().EnableLocationReplacement
515-
}
516-
517461
// IsWitnessAllowed returns if the witness is allowed.
518462
func (o *PersistConfig) IsWitnessAllowed() bool {
519463
return o.GetScheduleConfig().EnableWitness
@@ -534,8 +478,87 @@ func (o *PersistConfig) GetStoresLimit() map[uint64]sc.StoreLimitConfig {
534478
return o.GetScheduleConfig().StoreLimit
535479
}
536480

481+
// TTL related methods.
482+
483+
// GetLeaderScheduleLimit returns the limit for leader schedule.
484+
func (o *PersistConfig) GetLeaderScheduleLimit() uint64 {
485+
return o.getTTLUintOr(sc.LeaderScheduleLimitKey, o.GetScheduleConfig().LeaderScheduleLimit)
486+
}
487+
488+
// GetRegionScheduleLimit returns the limit for region schedule.
489+
func (o *PersistConfig) GetRegionScheduleLimit() uint64 {
490+
return o.getTTLUintOr(sc.RegionScheduleLimitKey, o.GetScheduleConfig().RegionScheduleLimit)
491+
}
492+
493+
// GetWitnessScheduleLimit returns the limit for region schedule.
494+
func (o *PersistConfig) GetWitnessScheduleLimit() uint64 {
495+
return o.getTTLUintOr(sc.WitnessScheduleLimitKey, o.GetScheduleConfig().WitnessScheduleLimit)
496+
}
497+
498+
// GetReplicaScheduleLimit returns the limit for replica schedule.
499+
func (o *PersistConfig) GetReplicaScheduleLimit() uint64 {
500+
return o.getTTLUintOr(sc.ReplicaRescheduleLimitKey, o.GetScheduleConfig().ReplicaScheduleLimit)
501+
}
502+
503+
// GetMergeScheduleLimit returns the limit for merge schedule.
504+
func (o *PersistConfig) GetMergeScheduleLimit() uint64 {
505+
return o.getTTLUintOr(sc.MergeScheduleLimitKey, o.GetScheduleConfig().MergeScheduleLimit)
506+
}
507+
508+
// GetHotRegionScheduleLimit returns the limit for hot region schedule.
509+
func (o *PersistConfig) GetHotRegionScheduleLimit() uint64 {
510+
return o.getTTLUintOr(sc.HotRegionScheduleLimitKey, o.GetScheduleConfig().HotRegionScheduleLimit)
511+
}
512+
513+
// GetStoreLimit returns the limit of a store.
514+
func (o *PersistConfig) GetStoreLimit(storeID uint64) (returnSC sc.StoreLimitConfig) {
515+
defer func() {
516+
returnSC.RemovePeer = o.getTTLFloatOr(fmt.Sprintf("remove-peer-%v", storeID), returnSC.RemovePeer)
517+
returnSC.AddPeer = o.getTTLFloatOr(fmt.Sprintf("add-peer-%v", storeID), returnSC.AddPeer)
518+
}()
519+
if limit, ok := o.GetScheduleConfig().StoreLimit[storeID]; ok {
520+
return limit
521+
}
522+
cfg := o.GetScheduleConfig().Clone()
523+
sc := sc.StoreLimitConfig{
524+
AddPeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
525+
RemovePeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
526+
}
527+
v, ok1, err := o.getTTLFloat("default-add-peer")
528+
if err != nil {
529+
log.Warn("failed to parse default-add-peer from PersistOptions's ttl storage", zap.Error(err))
530+
}
531+
canSetAddPeer := ok1 && err == nil
532+
if canSetAddPeer {
533+
returnSC.AddPeer = v
534+
}
535+
536+
v, ok2, err := o.getTTLFloat("default-remove-peer")
537+
if err != nil {
538+
log.Warn("failed to parse default-remove-peer from PersistOptions's ttl storage", zap.Error(err))
539+
}
540+
canSetRemovePeer := ok2 && err == nil
541+
if canSetRemovePeer {
542+
returnSC.RemovePeer = v
543+
}
544+
545+
if canSetAddPeer || canSetRemovePeer {
546+
return returnSC
547+
}
548+
cfg.StoreLimit[storeID] = sc
549+
o.SetScheduleConfig(cfg)
550+
return o.GetScheduleConfig().StoreLimit[storeID]
551+
}
552+
537553
// GetStoreLimitByType returns the limit of a store with a given type.
538554
func (o *PersistConfig) GetStoreLimitByType(storeID uint64, typ storelimit.Type) (returned float64) {
555+
defer func() {
556+
if typ == storelimit.RemovePeer {
557+
returned = o.getTTLFloatOr(fmt.Sprintf("remove-peer-%v", storeID), returned)
558+
} else if typ == storelimit.AddPeer {
559+
returned = o.getTTLFloatOr(fmt.Sprintf("add-peer-%v", storeID), returned)
560+
}
561+
}()
539562
limit := o.GetStoreLimit(storeID)
540563
switch typ {
541564
case storelimit.AddPeer:
@@ -550,20 +573,48 @@ func (o *PersistConfig) GetStoreLimitByType(storeID uint64, typ storelimit.Type)
550573
}
551574
}
552575

553-
// GetStoreLimit returns the limit of a store.
554-
func (o *PersistConfig) GetStoreLimit(storeID uint64) (returnSC sc.StoreLimitConfig) {
555-
if limit, ok := o.GetScheduleConfig().StoreLimit[storeID]; ok {
556-
return limit
576+
// GetMaxSnapshotCount returns the number of the max snapshot which is allowed to send.
577+
func (o *PersistConfig) GetMaxSnapshotCount() uint64 {
578+
return o.getTTLUintOr(sc.MaxSnapshotCountKey, o.GetScheduleConfig().MaxSnapshotCount)
579+
}
580+
581+
// GetMaxPendingPeerCount returns the number of the max pending peers.
582+
func (o *PersistConfig) GetMaxPendingPeerCount() uint64 {
583+
return o.getTTLUintOr(sc.MaxPendingPeerCountKey, o.GetScheduleConfig().MaxPendingPeerCount)
584+
}
585+
586+
// GetMaxMergeRegionSize returns the max region size.
587+
func (o *PersistConfig) GetMaxMergeRegionSize() uint64 {
588+
return o.getTTLUintOr(sc.MaxMergeRegionSizeKey, o.GetScheduleConfig().MaxMergeRegionSize)
589+
}
590+
591+
// GetMaxMergeRegionKeys returns the max number of keys.
592+
// It returns size * 10000 if the key of max-merge-region-Keys doesn't exist.
593+
func (o *PersistConfig) GetMaxMergeRegionKeys() uint64 {
594+
keys, exist, err := o.getTTLUint(sc.MaxMergeRegionKeysKey)
595+
if exist && err == nil {
596+
return keys
557597
}
558-
cfg := o.GetScheduleConfig().Clone()
559-
sc := sc.StoreLimitConfig{
560-
AddPeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
561-
RemovePeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
598+
size, exist, err := o.getTTLUint(sc.MaxMergeRegionSizeKey)
599+
if exist && err == nil {
600+
return size * 10000
562601
}
602+
return o.GetScheduleConfig().GetMaxMergeRegionKeys()
603+
}
563604

564-
cfg.StoreLimit[storeID] = sc
565-
o.SetScheduleConfig(cfg)
566-
return o.GetScheduleConfig().StoreLimit[storeID]
605+
// GetSchedulerMaxWaitingOperator returns the number of the max waiting operators.
606+
func (o *PersistConfig) GetSchedulerMaxWaitingOperator() uint64 {
607+
return o.getTTLUintOr(sc.SchedulerMaxWaitingOperatorKey, o.GetScheduleConfig().SchedulerMaxWaitingOperator)
608+
}
609+
610+
// IsLocationReplacementEnabled returns if location replace is enabled.
611+
func (o *PersistConfig) IsLocationReplacementEnabled() bool {
612+
return o.getTTLBoolOr(sc.EnableLocationReplacement, o.GetScheduleConfig().EnableLocationReplacement)
613+
}
614+
615+
// IsTikvRegionSplitEnabled returns whether tikv split region is disabled.
616+
func (o *PersistConfig) IsTikvRegionSplitEnabled() bool {
617+
return o.getTTLBoolOr(sc.EnableTiKVSplitRegion, o.GetScheduleConfig().EnableTiKVSplitRegion)
567618
}
568619

569620
// SetAllStoresLimit sets all store limit for a given type and rate.
@@ -680,11 +731,6 @@ func (o *PersistConfig) IsRaftKV2() bool {
680731
return o.GetStoreConfig().IsRaftKV2()
681732
}
682733

683-
// IsTikvRegionSplitEnabled returns whether tikv split region is disabled.
684-
func (o *PersistConfig) IsTikvRegionSplitEnabled() bool {
685-
return o.GetScheduleConfig().EnableTiKVSplitRegion
686-
}
687-
688734
// TODO: implement the following methods
689735

690736
// AddSchedulerCfg adds the scheduler configurations.
@@ -710,3 +756,72 @@ func (o *PersistConfig) IsTraceRegionFlow() bool {
710756
func (o *PersistConfig) Persist(storage endpoint.ConfigStorage) error {
711757
return nil
712758
}
759+
760+
func (o *PersistConfig) getTTLUint(key string) (uint64, bool, error) {
761+
stringForm, ok := o.GetTTLData(key)
762+
if !ok {
763+
return 0, false, nil
764+
}
765+
r, err := strconv.ParseUint(stringForm, 10, 64)
766+
return r, true, err
767+
}
768+
769+
func (o *PersistConfig) getTTLUintOr(key string, defaultValue uint64) uint64 {
770+
if v, ok, err := o.getTTLUint(key); ok {
771+
if err == nil {
772+
return v
773+
}
774+
log.Warn("failed to parse "+key+" from PersistOptions's ttl storage", zap.Error(err))
775+
}
776+
return defaultValue
777+
}
778+
779+
func (o *PersistConfig) getTTLBool(key string) (result bool, contains bool, err error) {
780+
stringForm, ok := o.GetTTLData(key)
781+
if !ok {
782+
return
783+
}
784+
result, err = strconv.ParseBool(stringForm)
785+
contains = true
786+
return
787+
}
788+
789+
func (o *PersistConfig) getTTLBoolOr(key string, defaultValue bool) bool {
790+
if v, ok, err := o.getTTLBool(key); ok {
791+
if err == nil {
792+
return v
793+
}
794+
log.Warn("failed to parse "+key+" from PersistOptions's ttl storage", zap.Error(err))
795+
}
796+
return defaultValue
797+
}
798+
799+
func (o *PersistConfig) getTTLFloat(key string) (float64, bool, error) {
800+
stringForm, ok := o.GetTTLData(key)
801+
if !ok {
802+
return 0, false, nil
803+
}
804+
r, err := strconv.ParseFloat(stringForm, 64)
805+
return r, true, err
806+
}
807+
808+
func (o *PersistConfig) getTTLFloatOr(key string, defaultValue float64) float64 {
809+
if v, ok, err := o.getTTLFloat(key); ok {
810+
if err == nil {
811+
return v
812+
}
813+
log.Warn("failed to parse "+key+" from PersistOptions's ttl storage", zap.Error(err))
814+
}
815+
return defaultValue
816+
}
817+
818+
// GetTTLData returns if there is a TTL data for a given key.
819+
func (o *PersistConfig) GetTTLData(key string) (string, bool) {
820+
if o.ttl == nil {
821+
return "", false
822+
}
823+
if result, ok := o.ttl.Get(key); ok {
824+
return result.(string), ok
825+
}
826+
return "", false
827+
}

0 commit comments

Comments
 (0)