Skip to content

Commit 050093d

Browse files
committed
Merge branch 'master' of github.com:tikv/pd into feat/router_grpc_impl
2 parents 9a85795 + 9003262 commit 050093d

File tree

7 files changed

+117
-18
lines changed

7 files changed

+117
-18
lines changed

client/clients/tso/stream_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/pingcap/log"
3030

3131
"github.com/tikv/pd/client/errs"
32+
"github.com/tikv/pd/client/pkg/utils/testutil"
3233
)
3334

3435
const mockStreamURL = "mock:///"
@@ -356,10 +357,11 @@ func (s *testTSOStreamSuite) TestTSOStreamBasic() {
356357

357358
// After an error from the (simulated) RPC stream, the tsoStream should be in a broken status and can't accept
358359
// new request anymore.
359-
err := s.stream.processRequests(1, 2, 3, 1, time.Now(), func(_result tsoRequestResult, _reqKeyspaceGroupID uint32, _err error) {
360-
panic("unreachable")
360+
testutil.Eventually(s.re, func() bool {
361+
return s.stream.processRequests(1, 2, 3, 1, time.Now(), func(_result tsoRequestResult, _reqKeyspaceGroupID uint32, err error) {
362+
s.re.Error(err)
363+
}) != nil
361364
})
362-
s.re.Error(err)
363365
}
364366

365367
func (s *testTSOStreamSuite) testTSOStreamBrokenImpl(err error, pendingRequests int) {

pkg/core/store.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type StoreInfo struct {
6363
pauseLeaderTransferOut atomic.Int64 // not allow to be used as source of transfer leader
6464
slowStoreEvicted atomic.Int64 // this store has been evicted as a slow store, should not transfer leader to it
6565
slowTrendEvicted atomic.Int64 // this store has been evicted as a slow store by trend, should not transfer leader to it
66+
stoppingStoreEvicted atomic.Int64 // this store has been evicted as a stopping store, should not transfer leader to it
6667
leaderCount int
6768
regionCount int
6869
learnerCount int
@@ -137,6 +138,7 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo {
137138
store.pauseLeaderTransferOut.Store(s.pauseLeaderTransferOut.Load())
138139
store.slowStoreEvicted.Store(s.slowStoreEvicted.Load())
139140
store.slowTrendEvicted.Store(s.slowTrendEvicted.Load())
141+
store.stoppingStoreEvicted.Store(s.stoppingStoreEvicted.Load())
140142
for _, opt := range opts {
141143
if opt != nil {
142144
opt(store)
@@ -183,6 +185,7 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo {
183185
store.pauseLeaderTransferOut.Store(s.pauseLeaderTransferOut.Load())
184186
store.slowStoreEvicted.Store(s.slowStoreEvicted.Load())
185187
store.slowTrendEvicted.Store(s.slowTrendEvicted.Load())
188+
store.stoppingStoreEvicted.Store(s.stoppingStoreEvicted.Load())
186189
for _, opt := range opts {
187190
opt(store)
188191
}
@@ -208,7 +211,7 @@ func (s *StoreInfo) EvictedAsSlowStore() bool {
208211

209212
// EvictedAsStoppingStore returns if the store should be evicted as a stopping store.
210213
func (s *StoreInfo) EvictedAsStoppingStore() bool {
211-
return s.rawStats.IsStopping
214+
return s.stoppingStoreEvicted.Load() > 0
212215
}
213216

214217
// IsEvictedAsSlowTrend returns if the store should be evicted as a slow store by trend.

pkg/core/store_option.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func SlowStoreEvicted() StoreCreateOption {
146146
// leader to the store
147147
func StoppingStoreEvicted() StoreCreateOption {
	return func(s *StoreInfo) {
		// Counter-based mark: each call is balanced by one
		// StoppingStoreRecovered call.
		s.stoppingStoreEvicted.Add(1)
	}
}
152152

@@ -175,7 +175,7 @@ func SlowStoreRecovered() StoreCreateOption {
175175
// StoppingStoreRecovered cleans the evicted state of a store.
176176
func StoppingStoreRecovered() StoreCreateOption {
177177
return func(store *StoreInfo) {
178-
store.rawStats.IsStopping = false
178+
store.stoppingStoreEvicted.Add(-1)
179179
}
180180
}
181181

pkg/mcs/resourcemanager/server/keyspace_manager.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,15 @@ func (krgm *keyspaceResourceGroupManager) getServiceLimiter() *serviceLimiter {
278278
return krgm.sl
279279
}
280280

281+
func (krgm *keyspaceResourceGroupManager) getServiceLimit() (float64, bool) {
282+
krgm.RLock()
283+
defer krgm.RUnlock()
284+
if krgm.sl == nil || krgm.sl.ServiceLimit == 0 {
285+
return 0, false
286+
}
287+
return krgm.sl.ServiceLimit, true
288+
}
289+
281290
func (krgm *keyspaceResourceGroupManager) getOrCreateRUTracker(name string) *ruTracker {
282291
rt := krgm.getRUTracker(name)
283292
if rt == nil {
@@ -411,16 +420,16 @@ func (rt *ruTracker) getRUPerSec() float64 {
411420
// - No group exceeds its configured limits
412421
// - Inactive groups (no RU consumption) are skipped to avoid unnecessary computation
413422
func (krgm *keyspaceResourceGroupManager) conciliateFillRates() {
414-
serviceLimiter := krgm.getServiceLimiter()
423+
serviceLimit, isSet := krgm.getServiceLimit()
415424
// No need to conciliate if the service limit is not set or is 0.
416-
if serviceLimiter == nil || serviceLimiter.ServiceLimit == 0 {
425+
if !isSet || serviceLimit == 0 {
417426
return
418427
}
419428
priorityQueues := krgm.getPriorityQueues()
420429
if len(priorityQueues) == 0 {
421430
return
422431
}
423-
remainingServiceLimit := serviceLimiter.ServiceLimit
432+
remainingServiceLimit := serviceLimit
424433
for _, queue := range priorityQueues {
425434
if len(queue) == 0 {
426435
continue

server/cluster/cluster.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1809,8 +1809,9 @@ func (c *RaftCluster) checkStores() {
18091809
)
18101810

18111811
for _, store := range stores {
1812-
isInOffline := c.checkStore(store, stores)
1813-
if store.IsUp() && store.IsLowSpace(c.opt.GetLowSpaceRatio()) {
1812+
storeID := store.GetID()
1813+
isInUp, isInOffline := c.checkStore(storeID)
1814+
if isInUp {
18141815
upStoreCount++
18151816
}
18161817
if isInOffline {
@@ -1826,21 +1827,27 @@ func (c *RaftCluster) checkStores() {
18261827
}
18271828
}
18281829

1829-
func (c *RaftCluster) checkStore(store *core.StoreInfo, stores []*core.StoreInfo) (isInOffline bool) {
1830+
func (c *RaftCluster) checkStore(storeID uint64) (isInUp, isInOffline bool) {
1831+
c.storeStateLock.Lock(uint32(storeID))
1832+
defer c.storeStateLock.Unlock(uint32(storeID))
1833+
1834+
store := c.GetStore(storeID)
1835+
if store == nil {
1836+
log.Warn("store not found when checking, may be deleted", zap.Uint64("store-id", storeID))
1837+
return false, false
1838+
}
1839+
18301840
var (
18311841
regionSize = float64(store.GetRegionSize())
1832-
storeID = store.GetID()
18331842
threshold float64
18341843
)
1835-
c.storeStateLock.Lock(uint32(storeID))
1836-
defer c.storeStateLock.Unlock(uint32(storeID))
18371844
switch store.GetNodeState() {
18381845
case metapb.NodeState_Preparing:
18391846
readyToServe := store.GetUptime() >= c.opt.GetMaxStorePreparingTime() ||
18401847
c.GetTotalRegionCount() < core.InitClusterRegionThreshold
18411848
if !readyToServe && (c.IsPrepared() || (c.IsServiceIndependent(constant.SchedulingServiceName) && c.isStorePrepared())) {
18421849
kr := keyutil.NewKeyRange("", "")
1843-
threshold = c.getThreshold(stores, store, &kr)
1850+
threshold = c.getThreshold(c.GetStores(), store, &kr)
18441851
log.Debug("store preparing threshold", zap.Uint64("store-id", storeID),
18451852
zap.Float64("threshold", threshold),
18461853
zap.Float64("region-size", regionSize))
@@ -1886,7 +1893,11 @@ func (c *RaftCluster) checkStore(store *core.StoreInfo, stores []*core.StoreInfo
18861893
}
18871894
}
18881895
c.progressManager.UpdateProgress(store, regionSize, threshold)
1889-
return
1896+
// When store is preparing or serving, we think it is in up.
1897+
// `checkStore` only may change store state from preparing to serving.
1898+
// So we don't need to get store again.
1899+
isInUp = store.IsUp() && !store.IsLowSpace(c.opt.GetLowSpaceRatio())
1900+
return isInUp, isInOffline
18901901
}
18911902

18921903
func (c *RaftCluster) getThreshold(stores []*core.StoreInfo, store *core.StoreInfo, kr *keyutil.KeyRange) float64 {

server/cluster/cluster_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3921,6 +3921,78 @@ func TestConcurrentStoreStats(t *testing.T) {
39213921
wg.Wait()
39223922
}
39233923

3924+
func TestCheckStoresUpCountWithLowSpace(t *testing.T) {
3925+
re := require.New(t)
3926+
ctx, cancel := context.WithCancel(context.Background())
3927+
defer cancel()
3928+
3929+
_, opt, err := newTestScheduleConfig()
3930+
re.NoError(err)
3931+
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
3932+
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
3933+
cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
3934+
if opt.IsPlacementRulesEnabled() {
3935+
err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
3936+
if err != nil {
3937+
panic(err)
3938+
}
3939+
}
3940+
3941+
// Add stores with low space to the cluster.
3942+
storeCount := uint64(3)
3943+
for i := uint64(1); i <= storeCount; i++ {
3944+
store := &metapb.Store{
3945+
Id: i,
3946+
Address: fmt.Sprintf("mock://tikv-%d:%d", i, i),
3947+
StatusAddress: fmt.Sprintf("mock://tikv-%d:%d", i, i+1),
3948+
Version: "2.0.0",
3949+
DeployPath: getTestDeployPath(i),
3950+
NodeState: metapb.NodeState_Preparing,
3951+
}
3952+
err = cluster.PutMetaStore(store)
3953+
re.NoError(err)
3954+
req := &pdpb.StoreHeartbeatRequest{}
3955+
resp := &pdpb.StoreHeartbeatResponse{}
3956+
req.Stats = &pdpb.StoreStats{
3957+
StoreId: i,
3958+
Capacity: 100 * 1024, // 100 GiB
3959+
Available: 1 * 1024, // 1 GiB
3960+
RegionCount: 1,
3961+
}
3962+
re.NoError(cluster.HandleStoreHeartbeat(req, resp))
3963+
}
3964+
re.Equal(int(storeCount), cluster.GetStoreCount())
3965+
3966+
upStoreCount := 0
3967+
for _, s := range cluster.GetStores() {
3968+
isUp, _ := cluster.checkStore(s.GetID())
3969+
if isUp {
3970+
upStoreCount++
3971+
}
3972+
}
3973+
re.Equal(0, upStoreCount, "upStoreCount should be 0 when store is low space")
3974+
3975+
// Add stores to the cluster.
3976+
for i := uint64(1); i <= storeCount; i++ {
3977+
req := &pdpb.StoreHeartbeatRequest{}
3978+
resp := &pdpb.StoreHeartbeatResponse{}
3979+
req.Stats = &pdpb.StoreStats{
3980+
StoreId: i,
3981+
Capacity: 100 * 1024, // 100 GiB
3982+
Available: 50 * 1024, // 50 GiB
3983+
RegionCount: 1,
3984+
}
3985+
re.NoError(cluster.HandleStoreHeartbeat(req, resp))
3986+
}
3987+
for _, s := range cluster.GetStores() {
3988+
isUp, _ := cluster.checkStore(s.GetID())
3989+
if isUp {
3990+
upStoreCount++
3991+
}
3992+
}
3993+
re.Equal(int(storeCount), upStoreCount, "upStoreCount should be 3 when store is not low space")
3994+
}
3995+
39243996
func waitAddLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
39253997
var res *pdpb.RegionHeartbeatResponse
39263998
testutil.Eventually(re, func() bool {

tests/server/gc/gc_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,9 @@ func TestGCOperations(t *testing.T) {
350350
re.Nil(resp.Header.Error)
351351
re.Equal("b1", resp.GetDeletedBarrierInfo().GetBarrierId())
352352
re.Equal(uint64(20), resp.GetDeletedBarrierInfo().GetBarrierTs())
353-
re.Equal(3600, int(resp.GetDeletedBarrierInfo().GetTtlSeconds()))
353+
// The TTL value decreases over time
354+
re.Greater(resp.GetDeletedBarrierInfo().GetTtlSeconds(), int64(3595))
355+
re.LessOrEqual(resp.GetDeletedBarrierInfo().GetTtlSeconds(), int64(3600))
354356
}
355357

356358
for _, keyspaceID := range []uint32{constant.NullKeyspaceID, ks1.Id} {

0 commit comments

Comments
 (0)