Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/tikv/pd/client

go 1.23.0

replace github.com/pingcap/kvproto => github.com/hujiatao0/kvproto v0.0.0-20250923042047-f6f0fb3d1f87

require (
github.com/BurntSushi/toml v0.3.1
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hujiatao0/kvproto v0.0.0-20250923042047-f6f0fb3d1f87 h1:hmleDBMW/Z/ruGsRmML7hGAgKduokXqfWLX9HOb0V5I=
github.com/hujiatao0/kvproto v0.0.0-20250923042047-f6f0fb3d1f87/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
Expand All @@ -49,8 +51,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20250616075548-d951fb623bb3 h1:OcZxUJEwZzFIqY8AkRIHuEK8U1X5OyLfqAwVnhaKsag=
github.com/pingcap/kvproto v0.0.0-20250616075548-d951fb623bb3/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ go 1.23.0
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch

replace github.com/pingcap/kvproto => github.com/hujiatao0/kvproto v0.0.0-20250923042047-f6f0fb3d1f87

require (
github.com/AlekSi/gocov-xml v1.0.0
github.com/BurntSushi/toml v1.5.0
Expand Down
1,674 changes: 1,665 additions & 9 deletions go.sum

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ func (s *StoreInfo) IsSlow() bool {
return s.IsEvictedAsSlowTrend() || s.rawStats.GetSlowScore() >= slowStoreThreshold
}

// IsStopping checks if the store is in stopping state.
func (s *StoreInfo) IsStopping() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.rawStats.GetIsStopping()
}

// GetSlowTrend returns the slow trend information of the store.
func (s *StoreInfo) GetSlowTrend() *pdpb.SlowTrend {
s.mu.RLock()
Expand Down Expand Up @@ -933,7 +940,7 @@ func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64, direction constant.Dir
s.stores[storeID] = store.Clone(ResumeLeaderTransfer(direction))
}

// SlowStoreEvicted marks a store as a slow store and prevents transferring
// SlowStoreEvicted marks a store as a slow/stopping store and prevents transferring
// leader to the store
func (s *StoresInfo) SlowStoreEvicted(storeID uint64) error {
s.Lock()
Expand Down
4 changes: 4 additions & 0 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@
}
}

func (s *Service) RegionBuckets(stream schedulingpb.Scheduling_RegionBucketsServer) error {

Check failure on line 165 in pkg/mcs/scheduling/server/grpc_service.go

View workflow job for this annotation

GitHub Actions / statics

exported: exported method Service.RegionBuckets should have comment or be unexported (revive)
return nil
}

// StoreHeartbeat implements gRPC SchedulingServer.
func (s *Service) StoreHeartbeat(_ context.Context, request *schedulingpb.StoreHeartbeatRequest) (*schedulingpb.StoreHeartbeatResponse, error) {
c := s.GetCluster()
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ var defaultSchedulersInit = func() SchedulerConfigs {
}
if !kerneltype.IsNextGen() {
defaultSchedulers = append(defaultSchedulers, SchedulerConfig{Type: types.SchedulerTypeCompatibleMap[types.EvictSlowStoreScheduler]})
defaultSchedulers = append(defaultSchedulers, SchedulerConfig{Type: types.SchedulerTypeCompatibleMap[types.EvictStoppingStoreScheduler]})
}
return defaultSchedulers
}
Expand Down
306 changes: 306 additions & 0 deletions pkg/schedule/schedulers/evict_stopping_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
// Copyright 2025 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
"net/http"
"strconv"

"github.com/gorilla/mux"
"github.com/unrolled/render"
"go.uber.org/zap"

"github.com/pingcap/log"

"github.com/tikv/pd/pkg/core"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/keyutil"
)

type stoppingStoreType string

const (
gracefulShutdownStore stoppingStoreType = "graceful-shutdown"
)

type evictStoppingStoreSchedulerConfig struct {
baseDefaultSchedulerConfig

cluster *core.BasicCluster
// EvictedStores stores the stopping stores that are being evicted
EvictedStores []uint64 `json:"evicted-stores"`
// Batch is used to generate multiple operators by one scheduling
Batch int `json:"batch"`
}

func initEvictStoppingStoreSchedulerConfig() *evictStoppingStoreSchedulerConfig {
return &evictStoppingStoreSchedulerConfig{
baseDefaultSchedulerConfig: newBaseDefaultSchedulerConfig(),
EvictedStores: make([]uint64, 0),
Batch: EvictLeaderBatchSize,
}
}

func (conf *evictStoppingStoreSchedulerConfig) persistLocked(updateFn func()) error {
var (
oldEvictedStores = conf.EvictedStores
oldBatch = conf.Batch
)
updateFn()
if err := conf.save(); err != nil {
conf.EvictedStores = oldEvictedStores
conf.Batch = oldBatch
return err
}
return nil
}

func (conf *evictStoppingStoreSchedulerConfig) getStores() []uint64 {
conf.RLock()
defer conf.RUnlock()
return conf.EvictedStores
}

func (*evictStoppingStoreSchedulerConfig) getKeyRangesByID(uint64) []keyutil.KeyRange {
return []keyutil.KeyRange{keyutil.NewKeyRange("", "")}
}

func (conf *evictStoppingStoreSchedulerConfig) getBatch() int {
conf.RLock()
defer conf.RUnlock()
return conf.Batch
}

func (conf *evictStoppingStoreSchedulerConfig) evictStore() uint64 {
conf.RLock()
defer conf.RUnlock()
if len(conf.EvictedStores) == 0 {
return 0
}
return conf.EvictedStores[0]
}

func (conf *evictStoppingStoreSchedulerConfig) setStoreAndPersist(id uint64) error {
conf.Lock()
defer conf.Unlock()
return conf.persistLocked(func() {
conf.EvictedStores = []uint64{id}
})
}

func (conf *evictStoppingStoreSchedulerConfig) clearEvictedAndPersist() (oldID uint64, err error) {
oldID = conf.evictStore()
conf.Lock()
defer conf.Unlock()
if oldID > 0 {
err = conf.persistLocked(func() {
conf.EvictedStores = []uint64{}
})
}
return
}

type evictStoppingStoreHandler struct {
rd *render.Render
config *evictStoppingStoreSchedulerConfig
}

func newEvictStoppingStoreHandler(config *evictStoppingStoreSchedulerConfig) http.Handler {
h := &evictStoppingStoreHandler{
config: config,
rd: render.New(render.Options{IndentJSON: true}),
}
router := mux.NewRouter()
router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost)
router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet)
return router
}

func (handler *evictStoppingStoreHandler) updateConfig(w http.ResponseWriter, r *http.Request) {

Check failure on line 134 in pkg/schedule/schedulers/evict_stopping_store.go

View workflow job for this annotation

GitHub Actions / statics

unused-parameter: parameter 'r' seems to be unused, consider removing or renaming it to match ^_ (revive)
handler.rd.JSON(w, http.StatusOK, "Config updated.")
}

func (handler *evictStoppingStoreHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
handler.config.RLock()
defer handler.config.RUnlock()
conf := &evictStoppingStoreSchedulerConfig{
Batch: handler.config.Batch,
EvictedStores: handler.config.EvictedStores,
}
handler.rd.JSON(w, http.StatusOK, conf)
}

type evictStoppingStoreScheduler struct {
*BaseScheduler
conf *evictStoppingStoreSchedulerConfig
handler http.Handler
}

// ServeHTTP implements the http.Handler interface.
func (s *evictStoppingStoreScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handler.ServeHTTP(w, r)
}

// EncodeConfig implements the Scheduler interface.
func (s *evictStoppingStoreScheduler) EncodeConfig() ([]byte, error) {
return EncodeConfig(s.conf)
}

// ReloadConfig implements the Scheduler interface.
func (s *evictStoppingStoreScheduler) ReloadConfig() error {
s.conf.Lock()
defer s.conf.Unlock()

newCfg := &evictStoppingStoreSchedulerConfig{}
if err := s.conf.load(newCfg); err != nil {
return err
}
if newCfg.Batch == 0 {
newCfg.Batch = EvictLeaderBatchSize
}

s.conf.EvictedStores = newCfg.EvictedStores
s.conf.Batch = newCfg.Batch
return nil
}

// PrepareConfig implements the Scheduler interface.
func (s *evictStoppingStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
evictStore := s.conf.evictStore()
if evictStore != 0 {
if err := cluster.SlowStoreEvicted(evictStore); err != nil {
return err
}
}
return nil
}

// CleanConfig implements the Scheduler interface.
func (s *evictStoppingStoreScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.cleanupEvictLeader(cluster)
}

func (s *evictStoppingStoreScheduler) prepareEvictLeader(cluster sche.SchedulerCluster, storeID uint64) error {
if err := cluster.SlowStoreEvicted(storeID); err != nil {
log.Info("failed to evict stopping store", zap.Uint64("store-id", storeID), zap.Error(err))
return err
}

if err := s.conf.setStoreAndPersist(storeID); err != nil {
log.Info("failed to persist evicted stopping store", zap.Uint64("store-id", storeID), zap.Error(err))
cluster.SlowStoreRecovered(storeID)
return err
}

return nil
}

func (s *evictStoppingStoreScheduler) cleanupEvictLeader(cluster sche.SchedulerCluster) {
evictStoppingStore, err := s.conf.clearEvictedAndPersist()
if err != nil {
log.Info("evict-stopping-store-scheduler persist config failed", zap.Uint64("store-id", evictStoppingStore))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to keep the consistency between storage and memory.

Suggested change
log.Info("evict-stopping-store-scheduler persist config failed", zap.Uint64("store-id", evictStoppingStore))
log.Warn("evict-stopping-store-scheduler persist config failed", zap.Uint64("store-id", evictStoppingStore))
return

}
if evictStoppingStore == 0 {
return
}
cluster.SlowStoreRecovered(evictStoppingStore)
}

func (s *evictStoppingStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator {
return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf)
}

// IsScheduleAllowed implements the Scheduler interface.
func (s *evictStoppingStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
if s.conf.evictStore() != 0 {
allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit()
if !allowed {
operator.IncOperatorLimitCounter(s.GetType(), operator.OpLeader)
}
return allowed
}
return true
}

// Schedule implements the Scheduler interface.
func (s *evictStoppingStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
evictLeaderCounter.Inc()
s.scheduleStoppingStore(cluster)
return s.schedulerEvictLeader(cluster), nil
}

func (s *evictStoppingStoreScheduler) scheduleStoppingStore(cluster sche.SchedulerCluster) {
if s.conf.evictStore() != 0 {
store := cluster.GetStore(s.conf.evictStore())
if store == nil || store.IsRemoved() {
// Previous stopping store had been removed, remove the scheduler and check next time.
log.Info("stopping store has been removed", zap.Uint64("store-id", s.conf.evictStore()))
s.cleanupEvictLeader(cluster)
return
}

// recover stopping store if it's no longer in stopping state.
if !store.IsStopping() {
log.Info("stopping store has been recovered", zap.Uint64("store-id", store.GetID()))
s.cleanupEvictLeader(cluster)
return
}
return
}

var stoppingStore *core.StoreInfo

for _, store := range cluster.GetStores() {
if store.IsRemoved() {
continue
}

if (store.IsPreparing() || store.IsServing()) && store.IsStopping() {
// Do nothing if there is more than one stopping store.
if stoppingStore != nil {
return
}
stoppingStore = store
}
}

if stoppingStore == nil {
return
}

// If there is only one stopping store, evict leaders from that store.
log.Info("detected stopping store, start to evict leaders", zap.Uint64("store-id", stoppingStore.GetID()))
err := s.prepareEvictLeader(cluster, stoppingStore.GetID())
if err != nil {
log.Info("prepare for evicting leader failed", zap.Error(err), zap.Uint64("store-id", stoppingStore.GetID()))
return
}
// Record the stopping store evicted status.
storeIDStr := strconv.FormatUint(stoppingStore.GetID(), 10)
evictedSlowStoreStatusGauge.WithLabelValues(storeIDStr, string(gracefulShutdownStore)).Set(1)
}

// newEvictStoppingStoreScheduler creates a scheduler that detects and evicts stopping stores.
func newEvictStoppingStoreScheduler(opController *operator.Controller, conf *evictStoppingStoreSchedulerConfig) Scheduler {
handler := newEvictStoppingStoreHandler(conf)
return &evictStoppingStoreScheduler{
BaseScheduler: NewBaseScheduler(opController, types.EvictStoppingStoreScheduler, conf),
conf: conf,
handler: handler,
}
}
Loading
Loading