Skip to content

Commit 9b18ac3

Browse files
ti-chi-botokJiang
andauthored
store: update StoreInfo inside putStoreLocked (#9187) (#9252)
close #9185 store: update StoreInfo inside putStoreLocked Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io> Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: okJiang <819421878@qq.com>
1 parent 9c32f96 commit 9b18ac3

File tree

7 files changed

+78
-53
lines changed

7 files changed

+78
-53
lines changed

pkg/core/basic_cluster.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,10 +182,10 @@ func (bc *BasicCluster) UpdateStoreStatus(storeID uint64) {
182182
}
183183

184184
// PutStore put a store.
185-
func (bc *BasicCluster) PutStore(store *StoreInfo) {
185+
func (bc *BasicCluster) PutStore(store *StoreInfo, opts ...StoreCreateOption) {
186186
bc.Stores.mu.Lock()
187187
defer bc.Stores.mu.Unlock()
188-
bc.Stores.SetStore(store)
188+
bc.Stores.SetStore(store, opts...)
189189
}
190190

191191
// ResetStores resets the store cache.

pkg/core/store.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,10 @@ func (s *StoresInfo) GetStore(storeID uint64) *StoreInfo {
659659
}
660660

661661
// SetStore sets a StoreInfo with storeID.
662-
func (s *StoresInfo) SetStore(store *StoreInfo) {
662+
func (s *StoresInfo) SetStore(store *StoreInfo, opts ...StoreCreateOption) {
663+
if len(opts) > 0 {
664+
store = s.stores[store.GetID()].Clone(opts...)
665+
}
663666
s.stores[store.GetID()] = store
664667
}
665668

pkg/core/store_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,3 +235,30 @@ func newStoreInfoWithDisk(id, used, available, capacity, regionSize uint64) *Sto
235235
)
236236
return store
237237
}
238+
239+
func TestSetStore(t *testing.T) {
240+
store := newStoreInfoWithAvailable(1, 20*units.GiB, 100*units.GiB, 1.4)
241+
storesInfo := NewStoresInfo()
242+
storesInfo.SetStore(store)
243+
re := require.New(t)
244+
re.Equal(store, storesInfo.GetStore(store.GetID()))
245+
246+
opts := []StoreCreateOption{SetStoreState(metapb.StoreState_Up)}
247+
store = store.Clone(opts...)
248+
re.NotEqual(store, storesInfo.GetStore(store.GetID()))
249+
storesInfo.SetStore(store, opts...)
250+
re.Equal(store, storesInfo.GetStore(store.GetID()))
251+
252+
opts = []StoreCreateOption{
253+
SetStoreStats(&pdpb.StoreStats{
254+
Capacity: 100 * units.GiB,
255+
Available: 20 * units.GiB,
256+
UsedSize: 80 * units.GiB,
257+
}),
258+
SetLastHeartbeatTS(time.Now()),
259+
}
260+
store = store.Clone(opts...)
261+
re.NotEqual(store, storesInfo.GetStore(store.GetID()))
262+
storesInfo.SetStore(store, opts...)
263+
re.Equal(store, storesInfo.GetStore(store.GetID()))
264+
}

pkg/mcs/scheduling/server/cluster.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -382,14 +382,9 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
382382
return errors.Errorf("store %v not found", storeID)
383383
}
384384

385-
nowTime := time.Now()
386-
newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime))
387-
388-
if store := c.GetStore(storeID); store != nil {
389-
statistics.UpdateStoreHeartbeatMetrics(store)
390-
}
391-
c.PutStore(newStore)
392-
c.hotStat.Observe(storeID, newStore.GetStoreStats())
385+
statistics.UpdateStoreHeartbeatMetrics(store)
386+
c.PutStore(store, core.SetStoreStats(stats), core.SetLastHeartbeatTS(time.Now()))
387+
c.hotStat.Observe(storeID, stats)
393388
c.hotStat.FilterUnhealthyStore(c)
394389
reportInterval := stats.GetInterval()
395390
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()

pkg/mcs/scheduling/server/meta/watcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (w *Watcher) initializeStoreWatcher() error {
8383
if origin == nil {
8484
w.basicCluster.PutStore(core.NewStoreInfo(store))
8585
} else {
86-
w.basicCluster.PutStore(origin.Clone(core.SetStoreMeta(store)))
86+
w.basicCluster.PutStore(origin, core.SetStoreMeta(store))
8787
}
8888

8989
if store.GetNodeState() == metapb.NodeState_Removed {

pkg/storage/endpoint/meta.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type MetaStorage interface {
3535
LoadStoreMeta(storeID uint64, store *metapb.Store) (bool, error)
3636
SaveStoreMeta(store *metapb.Store) error
3737
SaveStoreWeight(storeID uint64, leader, region float64) error
38-
LoadStores(f func(store *core.StoreInfo)) error
38+
LoadStores(f func(store *core.StoreInfo, opts ...core.StoreCreateOption)) error
3939
DeleteStoreMeta(store *metapb.Store) error
4040
RegionStorage
4141
}
@@ -92,7 +92,7 @@ func (se *StorageEndpoint) SaveStoreWeight(storeID uint64, leader, region float6
9292
}
9393

9494
// LoadStores loads all stores from storage to StoresInfo.
95-
func (se *StorageEndpoint) LoadStores(f func(store *core.StoreInfo)) error {
95+
func (se *StorageEndpoint) LoadStores(putStore func(store *core.StoreInfo, opts ...core.StoreCreateOption)) error {
9696
nextID := uint64(0)
9797
endKey := StorePath(math.MaxUint64)
9898
for {
@@ -120,10 +120,12 @@ func (se *StorageEndpoint) LoadStores(f func(store *core.StoreInfo)) error {
120120
if err != nil {
121121
return err
122122
}
123-
newStoreInfo := core.NewStoreInfo(store, core.SetLeaderWeight(leaderWeight), core.SetRegionWeight(regionWeight))
124123

125124
nextID = store.GetId() + 1
126-
f(newStoreInfo)
125+
putStore(core.NewStoreInfo(store,
126+
core.SetLeaderWeight(leaderWeight),
127+
core.SetRegionWeight(regionWeight),
128+
))
127129
}
128130
if len(res) < MinKVRangeLimit {
129131
return nil

server/cluster/cluster.go

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -855,32 +855,30 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
855855

856856
limit := store.GetStoreLimit()
857857
version := c.opt.GetStoreLimitVersion()
858-
var opt core.StoreCreateOption
858+
opts := make([]core.StoreCreateOption, 0)
859859
if limit == nil || limit.Version() != version {
860860
if version == storelimit.VersionV2 {
861861
limit = storelimit.NewSlidingWindows()
862862
} else {
863863
limit = storelimit.NewStoreRateLimit(0.0)
864864
}
865-
opt = core.SetStoreLimit(limit)
865+
opts = append(opts, core.SetStoreLimit(limit))
866866
}
867867

868868
nowTime := time.Now()
869-
var newStore *core.StoreInfo
870869
// If this cluster has slow stores, we should awaken hibernated regions in other stores.
871870
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
872871
if needAwaken, slowStoreIDs := c.NeedAwakenAllRegionsInStore(storeID); needAwaken {
873872
log.Info("forcely awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs))
874-
newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), core.SetLastAwakenTime(nowTime), opt)
873+
opts = append(opts, core.SetLastAwakenTime(nowTime))
875874
resp.AwakenRegions = &pdpb.AwakenRegions{
876875
AbnormalStores: slowStoreIDs,
877876
}
878-
} else {
879-
newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), opt)
880877
}
881-
} else {
882-
newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), opt)
883878
}
879+
opts = append(opts, core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime))
880+
881+
newStore := store.Clone(opts...)
884882

885883
if newStore.IsLowSpace(c.opt.GetLowSpaceRatio()) {
886884
log.Warn("store does not have enough disk space",
@@ -892,17 +890,15 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
892890
if err := c.storage.SaveStoreMeta(newStore.GetMeta()); err != nil {
893891
log.Error("failed to persist store", zap.Uint64("store-id", storeID), errs.ZapError(err))
894892
} else {
895-
newStore = newStore.Clone(core.SetLastPersistTime(nowTime))
893+
opts = append(opts, core.SetLastPersistTime(nowTime))
896894
}
897895
}
898-
if store := c.core.GetStore(storeID); store != nil {
899-
statistics.UpdateStoreHeartbeatMetrics(store)
900-
}
896+
statistics.UpdateStoreHeartbeatMetrics(store)
897+
c.core.PutStore(newStore, opts...)
901898
// Supply NodeState in the response to help the store handle special cases
902899
// more conveniently, such as avoiding calling `remove_peer` redundantly under
903900
// NodeState_Removing.
904901
resp.State = store.GetNodeState()
905-
c.core.PutStore(newStore)
906902
var (
907903
regions map[uint64]*core.RegionInfo
908904
interval uint64
@@ -1288,8 +1284,8 @@ func (c *RaftCluster) DeleteStoreLabel(storeID uint64, labelKey string) error {
12881284
}
12891285

12901286
// PutStore puts a store.
1291-
func (c *RaftCluster) PutStore(store *metapb.Store) error {
1292-
if err := c.putStoreImpl(store, false); err != nil {
1287+
func (c *RaftCluster) PutStore(store *metapb.Store, opts ...core.StoreCreateOption) error {
1288+
if err := c.putStoreImpl(store, false, opts...); err != nil {
12931289
return err
12941290
}
12951291
c.OnStoreVersionChange()
@@ -1300,7 +1296,7 @@ func (c *RaftCluster) PutStore(store *metapb.Store) error {
13001296
// putStoreImpl puts a store.
13011297
// If 'force' is true, the store's labels will overwrite those labels which already existed in the store.
13021298
// If 'force' is false, the store's labels will merge into those labels which already existed in the store.
1303-
func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool) error {
1299+
func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool, opts ...core.StoreCreateOption) error {
13041300
c.Lock()
13051301
defer c.Unlock()
13061302

@@ -1333,19 +1329,19 @@ func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool) error {
13331329
if !force {
13341330
labels = core.MergeLabels(s.GetLabels(), labels)
13351331
}
1336-
// Update an existed store.
1337-
s = s.Clone(
1332+
opts = append(opts,
13381333
core.SetStoreAddress(store.Address, store.StatusAddress, store.PeerAddress),
13391334
core.SetStoreVersion(store.GitHash, store.Version),
13401335
core.SetStoreLabels(labels),
13411336
core.SetStoreStartTime(store.StartTimestamp),
1342-
core.SetStoreDeployPath(store.DeployPath),
1343-
)
1337+
core.SetStoreDeployPath(store.DeployPath))
1338+
// Update an existed store.
1339+
s = s.Clone(opts...)
13441340
}
13451341
if err := c.checkStoreLabels(s); err != nil {
13461342
return err
13471343
}
1348-
return c.putStoreLocked(s)
1344+
return c.putStoreLocked(s, opts...)
13491345
}
13501346

13511347
func (c *RaftCluster) checkStoreVersion(store *metapb.Store) error {
@@ -1415,12 +1411,15 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro
14151411
return err
14161412
}
14171413
}
1418-
newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline, physicallyDestroyed))
1414+
14191415
log.Warn("store has been offline",
14201416
zap.Uint64("store-id", storeID),
1421-
zap.String("store-address", newStore.GetAddress()),
1422-
zap.Bool("physically-destroyed", newStore.IsPhysicallyDestroyed()))
1423-
err := c.putStoreLocked(newStore)
1417+
zap.String("store-address", store.GetAddress()),
1418+
zap.Bool("physically-destroyed", store.IsPhysicallyDestroyed()))
1419+
err := c.putStoreLocked(
1420+
store.Clone(core.SetStoreState(metapb.StoreState_Offline, physicallyDestroyed)),
1421+
core.SetStoreState(metapb.StoreState_Offline, physicallyDestroyed),
1422+
)
14241423
if err == nil {
14251424
regionSize := float64(c.core.GetStoreRegionSize(storeID))
14261425
c.resetProgress(storeID, store.GetAddress())
@@ -1506,13 +1505,15 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error {
15061505
}
15071506
}
15081507

1509-
newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone))
15101508
log.Warn("store has been Tombstone",
15111509
zap.Uint64("store-id", storeID),
1512-
zap.String("store-address", newStore.GetAddress()),
1510+
zap.String("store-address", store.GetAddress()),
15131511
zap.String("state", store.GetState().String()),
15141512
zap.Bool("physically-destroyed", store.IsPhysicallyDestroyed()))
1515-
err := c.putStoreLocked(newStore)
1513+
err := c.putStoreLocked(
1514+
store.Clone(core.SetStoreState(metapb.StoreState_Tombstone)),
1515+
core.SetStoreState(metapb.StoreState_Tombstone),
1516+
)
15161517
c.onStoreVersionChangeLocked()
15171518
if err == nil {
15181519
// clean up the residual information.
@@ -1627,7 +1628,7 @@ func (c *RaftCluster) UpStore(storeID uint64) error {
16271628
log.Warn("store has been up",
16281629
zap.Uint64("store-id", storeID),
16291630
zap.String("store-address", newStore.GetAddress()))
1630-
err := c.putStoreLocked(newStore)
1631+
err := c.putStoreLocked(newStore, options...)
16311632
if err == nil {
16321633
if exist {
16331634
// persist the store limit
@@ -1665,7 +1666,7 @@ func (c *RaftCluster) ReadyToServe(storeID uint64) error {
16651666
log.Info("store has changed to serving",
16661667
zap.Uint64("store-id", storeID),
16671668
zap.String("store-address", newStore.GetAddress()))
1668-
err := c.putStoreLocked(newStore)
1669+
err := c.putStoreLocked(newStore, core.SetStoreState(metapb.StoreState_Up))
16691670
if err == nil {
16701671
c.resetProgress(storeID, store.GetAddress())
16711672
}
@@ -1683,21 +1684,19 @@ func (c *RaftCluster) SetStoreWeight(storeID uint64, leaderWeight, regionWeight
16831684
return err
16841685
}
16851686

1686-
newStore := store.Clone(
1687+
return c.putStoreLocked(store,
16871688
core.SetLeaderWeight(leaderWeight),
16881689
core.SetRegionWeight(regionWeight),
16891690
)
1690-
1691-
return c.putStoreLocked(newStore)
16921691
}
16931692

1694-
func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error {
1693+
func (c *RaftCluster) putStoreLocked(store *core.StoreInfo, opts ...core.StoreCreateOption) error {
16951694
if c.storage != nil {
16961695
if err := c.storage.SaveStoreMeta(store.GetMeta()); err != nil {
16971696
return err
16981697
}
16991698
}
1700-
c.core.PutStore(store)
1699+
c.core.PutStore(store, opts...)
17011700
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
17021701
c.updateStoreStatistics(store.GetID(), store.IsSlow())
17031702
}
@@ -2273,8 +2272,7 @@ func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error {
22732272
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
22742273
}
22752274

2276-
newStore := store.Clone(core.SetMinResolvedTS(minResolvedTS))
2277-
c.core.PutStore(newStore)
2275+
c.core.PutStore(store, core.SetMinResolvedTS(minResolvedTS))
22782276
return nil
22792277
}
22802278

0 commit comments

Comments
 (0)