Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,10 @@ func (bc *BasicCluster) UpdateStoreStatus(storeID uint64) {
}

// PutStore put a store.
func (bc *BasicCluster) PutStore(store *StoreInfo) {
func (bc *BasicCluster) PutStore(store *StoreInfo, opts ...StoreCreateOption) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.SetStore(store)
bc.Stores.SetStore(store, opts...)
}

// ResetStores resets the store cache.
Expand Down
5 changes: 4 additions & 1 deletion pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,10 @@ func (s *StoresInfo) GetStore(storeID uint64) *StoreInfo {
}

// SetStore sets a StoreInfo with storeID.
func (s *StoresInfo) SetStore(store *StoreInfo) {
func (s *StoresInfo) SetStore(store *StoreInfo, opts ...StoreCreateOption) {
if len(opts) > 0 {
store = s.stores[store.GetID()].Clone(opts...)
}
s.stores[store.GetID()] = store
}

Expand Down
27 changes: 27 additions & 0 deletions pkg/core/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,30 @@ func newStoreInfoWithDisk(id, used, available, capacity, regionSize uint64) *Sto
)
return store
}

func TestSetStore(t *testing.T) {
store := newStoreInfoWithAvailable(1, 20*units.GiB, 100*units.GiB, 1.4)
storesInfo := NewStoresInfo()
storesInfo.SetStore(store)
re := require.New(t)
re.Equal(store, storesInfo.GetStore(store.GetID()))

opts := []StoreCreateOption{SetStoreState(metapb.StoreState_Up)}
store = store.Clone(opts...)
re.NotEqual(store, storesInfo.GetStore(store.GetID()))
storesInfo.SetStore(store, opts...)
re.Equal(store, storesInfo.GetStore(store.GetID()))

opts = []StoreCreateOption{
SetStoreStats(&pdpb.StoreStats{
Capacity: 100 * units.GiB,
Available: 20 * units.GiB,
UsedSize: 80 * units.GiB,
}),
SetLastHeartbeatTS(time.Now()),
}
store = store.Clone(opts...)
re.NotEqual(store, storesInfo.GetStore(store.GetID()))
storesInfo.SetStore(store, opts...)
re.Equal(store, storesInfo.GetStore(store.GetID()))
}
11 changes: 3 additions & 8 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,14 +382,9 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
return errors.Errorf("store %v not found", storeID)
}

nowTime := time.Now()
newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime))

if store := c.GetStore(storeID); store != nil {
statistics.UpdateStoreHeartbeatMetrics(store)
}
c.PutStore(newStore)
c.hotStat.Observe(storeID, newStore.GetStoreStats())
statistics.UpdateStoreHeartbeatMetrics(store)
c.PutStore(store, core.SetStoreStats(stats), core.SetLastHeartbeatTS(time.Now()))
c.hotStat.Observe(storeID, stats)
c.hotStat.FilterUnhealthyStore(c)
reportInterval := stats.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (w *Watcher) initializeStoreWatcher() error {
if origin == nil {
w.basicCluster.PutStore(core.NewStoreInfo(store))
} else {
w.basicCluster.PutStore(origin.Clone(core.SetStoreMeta(store)))
w.basicCluster.PutStore(origin, core.SetStoreMeta(store))
}

if store.GetNodeState() == metapb.NodeState_Removed {
Expand Down
10 changes: 6 additions & 4 deletions pkg/storage/endpoint/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type MetaStorage interface {
LoadStoreMeta(storeID uint64, store *metapb.Store) (bool, error)
SaveStoreMeta(store *metapb.Store) error
SaveStoreWeight(storeID uint64, leader, region float64) error
LoadStores(f func(store *core.StoreInfo)) error
LoadStores(f func(store *core.StoreInfo, opts ...core.StoreCreateOption)) error
DeleteStoreMeta(store *metapb.Store) error
RegionStorage
}
Expand Down Expand Up @@ -92,7 +92,7 @@ func (se *StorageEndpoint) SaveStoreWeight(storeID uint64, leader, region float6
}

// LoadStores loads all stores from storage to StoresInfo.
func (se *StorageEndpoint) LoadStores(f func(store *core.StoreInfo)) error {
func (se *StorageEndpoint) LoadStores(putStore func(store *core.StoreInfo, opts ...core.StoreCreateOption)) error {
nextID := uint64(0)
endKey := StorePath(math.MaxUint64)
for {
Expand Down Expand Up @@ -120,10 +120,12 @@ func (se *StorageEndpoint) LoadStores(f func(store *core.StoreInfo)) error {
if err != nil {
return err
}
newStoreInfo := core.NewStoreInfo(store, core.SetLeaderWeight(leaderWeight), core.SetRegionWeight(regionWeight))

nextID = store.GetId() + 1
f(newStoreInfo)
putStore(core.NewStoreInfo(store,
core.SetLeaderWeight(leaderWeight),
core.SetRegionWeight(regionWeight),
))
}
if len(res) < MinKVRangeLimit {
return nil
Expand Down
72 changes: 35 additions & 37 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,32 +855,30 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest

limit := store.GetStoreLimit()
version := c.opt.GetStoreLimitVersion()
var opt core.StoreCreateOption
opts := make([]core.StoreCreateOption, 0)
if limit == nil || limit.Version() != version {
if version == storelimit.VersionV2 {
limit = storelimit.NewSlidingWindows()
} else {
limit = storelimit.NewStoreRateLimit(0.0)
}
opt = core.SetStoreLimit(limit)
opts = append(opts, core.SetStoreLimit(limit))
}

nowTime := time.Now()
var newStore *core.StoreInfo
// If this cluster has slow stores, we should awaken hibernated regions in other stores.
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
if needAwaken, slowStoreIDs := c.NeedAwakenAllRegionsInStore(storeID); needAwaken {
log.Info("forcely awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs))
newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), core.SetLastAwakenTime(nowTime), opt)
opts = append(opts, core.SetLastAwakenTime(nowTime))
resp.AwakenRegions = &pdpb.AwakenRegions{
AbnormalStores: slowStoreIDs,
}
} else {
newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), opt)
}
} else {
newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), opt)
}
opts = append(opts, core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime))

newStore := store.Clone(opts...)

if newStore.IsLowSpace(c.opt.GetLowSpaceRatio()) {
log.Warn("store does not have enough disk space",
Expand All @@ -892,17 +890,15 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
if err := c.storage.SaveStoreMeta(newStore.GetMeta()); err != nil {
log.Error("failed to persist store", zap.Uint64("store-id", storeID), errs.ZapError(err))
} else {
newStore = newStore.Clone(core.SetLastPersistTime(nowTime))
opts = append(opts, core.SetLastPersistTime(nowTime))
}
}
if store := c.core.GetStore(storeID); store != nil {
statistics.UpdateStoreHeartbeatMetrics(store)
}
statistics.UpdateStoreHeartbeatMetrics(store)
c.core.PutStore(newStore, opts...)
// Supply NodeState in the response to help the store handle special cases
// more conveniently, such as avoiding calling `remove_peer` redundantly under
// NodeState_Removing.
resp.State = store.GetNodeState()
c.core.PutStore(newStore)
var (
regions map[uint64]*core.RegionInfo
interval uint64
Expand Down Expand Up @@ -1288,8 +1284,8 @@ func (c *RaftCluster) DeleteStoreLabel(storeID uint64, labelKey string) error {
}

// PutStore puts a store.
func (c *RaftCluster) PutStore(store *metapb.Store) error {
if err := c.putStoreImpl(store, false); err != nil {
func (c *RaftCluster) PutStore(store *metapb.Store, opts ...core.StoreCreateOption) error {
if err := c.putStoreImpl(store, false, opts...); err != nil {
return err
}
c.OnStoreVersionChange()
Expand All @@ -1300,7 +1296,7 @@ func (c *RaftCluster) PutStore(store *metapb.Store) error {
// putStoreImpl puts a store.
// If 'force' is true, the store's labels will overwrite those labels which already existed in the store.
// If 'force' is false, the store's labels will merge into those labels which already existed in the store.
func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool) error {
func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool, opts ...core.StoreCreateOption) error {
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -1333,19 +1329,19 @@ func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool) error {
if !force {
labels = core.MergeLabels(s.GetLabels(), labels)
}
// Update an existed store.
s = s.Clone(
opts = append(opts,
core.SetStoreAddress(store.Address, store.StatusAddress, store.PeerAddress),
core.SetStoreVersion(store.GitHash, store.Version),
core.SetStoreLabels(labels),
core.SetStoreStartTime(store.StartTimestamp),
core.SetStoreDeployPath(store.DeployPath),
)
core.SetStoreDeployPath(store.DeployPath))
// Update an existed store.
s = s.Clone(opts...)
}
if err := c.checkStoreLabels(s); err != nil {
return err
}
return c.putStoreLocked(s)
return c.putStoreLocked(s, opts...)
}

func (c *RaftCluster) checkStoreVersion(store *metapb.Store) error {
Expand Down Expand Up @@ -1415,12 +1411,15 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro
return err
}
}
newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline, physicallyDestroyed))

log.Warn("store has been offline",
zap.Uint64("store-id", storeID),
zap.String("store-address", newStore.GetAddress()),
zap.Bool("physically-destroyed", newStore.IsPhysicallyDestroyed()))
err := c.putStoreLocked(newStore)
zap.String("store-address", store.GetAddress()),
zap.Bool("physically-destroyed", store.IsPhysicallyDestroyed()))
err := c.putStoreLocked(
store.Clone(core.SetStoreState(metapb.StoreState_Offline, physicallyDestroyed)),
core.SetStoreState(metapb.StoreState_Offline, physicallyDestroyed),
)
if err == nil {
regionSize := float64(c.core.GetStoreRegionSize(storeID))
c.resetProgress(storeID, store.GetAddress())
Expand Down Expand Up @@ -1506,13 +1505,15 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error {
}
}

newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone))
log.Warn("store has been Tombstone",
zap.Uint64("store-id", storeID),
zap.String("store-address", newStore.GetAddress()),
zap.String("store-address", store.GetAddress()),
zap.String("state", store.GetState().String()),
zap.Bool("physically-destroyed", store.IsPhysicallyDestroyed()))
err := c.putStoreLocked(newStore)
err := c.putStoreLocked(
store.Clone(core.SetStoreState(metapb.StoreState_Tombstone)),
core.SetStoreState(metapb.StoreState_Tombstone),
)
c.onStoreVersionChangeLocked()
if err == nil {
// clean up the residual information.
Expand Down Expand Up @@ -1627,7 +1628,7 @@ func (c *RaftCluster) UpStore(storeID uint64) error {
log.Warn("store has been up",
zap.Uint64("store-id", storeID),
zap.String("store-address", newStore.GetAddress()))
err := c.putStoreLocked(newStore)
err := c.putStoreLocked(newStore, options...)
if err == nil {
if exist {
// persist the store limit
Expand Down Expand Up @@ -1665,7 +1666,7 @@ func (c *RaftCluster) ReadyToServe(storeID uint64) error {
log.Info("store has changed to serving",
zap.Uint64("store-id", storeID),
zap.String("store-address", newStore.GetAddress()))
err := c.putStoreLocked(newStore)
err := c.putStoreLocked(newStore, core.SetStoreState(metapb.StoreState_Up))
if err == nil {
c.resetProgress(storeID, store.GetAddress())
}
Expand All @@ -1683,21 +1684,19 @@ func (c *RaftCluster) SetStoreWeight(storeID uint64, leaderWeight, regionWeight
return err
}

newStore := store.Clone(
return c.putStoreLocked(store,
core.SetLeaderWeight(leaderWeight),
core.SetRegionWeight(regionWeight),
)

return c.putStoreLocked(newStore)
}

func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error {
func (c *RaftCluster) putStoreLocked(store *core.StoreInfo, opts ...core.StoreCreateOption) error {
if c.storage != nil {
if err := c.storage.SaveStoreMeta(store.GetMeta()); err != nil {
return err
}
}
c.core.PutStore(store)
c.core.PutStore(store, opts...)
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
c.updateStoreStatistics(store.GetID(), store.IsSlow())
}
Expand Down Expand Up @@ -2273,8 +2272,7 @@ func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error {
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
}

newStore := store.Clone(core.SetMinResolvedTS(minResolvedTS))
c.core.PutStore(newStore)
c.core.PutStore(store, core.SetMinResolvedTS(minResolvedTS))
return nil
}

Expand Down