Skip to content

Commit 52966d8

Browse files
ti-chi-botHuSharp
andauthored
coordinator: use a healthy region count to start coordinator (#7044) (#7364)
close #6988, close #7016 Signed-off-by: husharp <jinhao.hu@pingcap.com> Co-authored-by: husharp <jinhao.hu@pingcap.com> Co-authored-by: Hu# <jinhao.hu@pingcap.com>
1 parent 85f1525 commit 52966d8

File tree

11 files changed

+126
-29
lines changed

11 files changed

+126
-29
lines changed

pkg/core/region.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,26 @@ type RegionInfo struct {
7373
queryStats *pdpb.QueryStats
7474
flowRoundDivisor uint64
7575
// buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version.
76-
buckets unsafe.Pointer
77-
fromHeartbeat bool
76+
buckets unsafe.Pointer
77+
// source is used to indicate region's source, such as Storage/Sync/Heartbeat.
78+
source RegionSource
79+
}
80+
81+
// RegionSource is the source of region.
82+
type RegionSource uint32
83+
84+
const (
85+
// Storage means this region's meta info might be stale.
86+
Storage RegionSource = iota
87+
// Sync means this region's meta info is relatively fresher.
88+
Sync
89+
// Heartbeat means this region's meta info is relatively fresher.
90+
Heartbeat
91+
)
92+
93+
// LoadedFromStorage reports whether this region's meta info was loaded from storage.
94+
func (r *RegionInfo) LoadedFromStorage() bool {
95+
return r.source == Storage
7896
}
7997

8098
// NewRegionInfo creates RegionInfo with region's meta and leader peer.
@@ -171,6 +189,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
171189
interval: heartbeat.GetInterval(),
172190
replicationStatus: heartbeat.GetReplicationStatus(),
173191
queryStats: heartbeat.GetQueryStats(),
192+
source: Heartbeat,
174193
}
175194

176195
for _, opt := range opts {
@@ -633,11 +652,6 @@ func (r *RegionInfo) IsFlashbackChanged(l *RegionInfo) bool {
633652
return r.meta.FlashbackStartTs != l.meta.FlashbackStartTs || r.meta.IsInFlashback != l.meta.IsInFlashback
634653
}
635654

636-
// IsFromHeartbeat returns whether the region info is from the region heartbeat.
637-
func (r *RegionInfo) IsFromHeartbeat() bool {
638-
return r.fromHeartbeat
639-
}
640-
641655
func (r *RegionInfo) isInvolved(startKey, endKey []byte) bool {
642656
return bytes.Compare(r.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(r.GetEndKey()) > 0 && bytes.Compare(r.GetEndKey(), endKey) <= 0))
643657
}
@@ -677,7 +691,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
677691
}
678692
saveKV, saveCache, isNew = true, true, true
679693
} else {
680-
if !origin.IsFromHeartbeat() {
694+
if origin.LoadedFromStorage() {
681695
isNew = true
682696
}
683697
r := region.GetRegionEpoch()
@@ -1275,6 +1289,13 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
12751289
return
12761290
}
12771291

1292+
// GetClusterNotFromStorageRegionsCnt gets the total count of regions that are no longer loaded from storage.
1293+
func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
1294+
r.t.RLock()
1295+
defer r.t.RUnlock()
1296+
return r.tree.notFromStorageRegionsCnt
1297+
}
1298+
12781299
// GetMetaRegions gets a set of metapb.Region from regionMap
12791300
func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
12801301
r.t.RLock()

pkg/core/region_option.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -374,10 +374,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
374374
}
375375
}
376376

377-
// SetFromHeartbeat sets if the region info comes from the region heartbeat.
378-
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
377+
// SetSource sets the source that the region info comes from.
378+
func SetSource(source RegionSource) RegionCreateOption {
379379
return func(region *RegionInfo) {
380-
region.fromHeartbeat = fromHeartbeat
380+
region.source = source
381381
}
382382
}
383383

pkg/core/region_tree.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,17 @@ type regionTree struct {
6161
totalSize int64
6262
totalWriteBytesRate float64
6363
totalWriteKeysRate float64
64+
// notFromStorageRegionsCnt counts the regions that were not loaded from storage.
65+
notFromStorageRegionsCnt int
6466
}
6567

6668
func newRegionTree() *regionTree {
6769
return &regionTree{
68-
tree: btree.NewG[*regionItem](defaultBTreeDegree),
69-
totalSize: 0,
70-
totalWriteBytesRate: 0,
71-
totalWriteKeysRate: 0,
70+
tree: btree.NewG[*regionItem](defaultBTreeDegree),
71+
totalSize: 0,
72+
totalWriteBytesRate: 0,
73+
totalWriteKeysRate: 0,
74+
notFromStorageRegionsCnt: 0,
7275
}
7376
}
7477

@@ -112,6 +115,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
112115
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
113116
t.totalWriteBytesRate += regionWriteBytesRate
114117
t.totalWriteKeysRate += regionWriteKeysRate
118+
if !region.LoadedFromStorage() {
119+
t.notFromStorageRegionsCnt++
120+
}
115121

116122
if !withOverlaps {
117123
overlaps = t.overlaps(item)
@@ -133,6 +139,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
133139
regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate()
134140
t.totalWriteBytesRate -= regionWriteBytesRate
135141
t.totalWriteKeysRate -= regionWriteKeysRate
142+
if !old.LoadedFromStorage() {
143+
t.notFromStorageRegionsCnt--
144+
}
136145
}
137146

138147
return result
@@ -149,6 +158,15 @@ func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
149158
regionWriteBytesRate, regionWriteKeysRate = origin.GetWriteRate()
150159
t.totalWriteBytesRate -= regionWriteBytesRate
151160
t.totalWriteKeysRate -= regionWriteKeysRate
161+
162+
// If the region meta information is no longer loaded from storage, increase the counter.
163+
if origin.LoadedFromStorage() && !region.LoadedFromStorage() {
164+
t.notFromStorageRegionsCnt++
165+
}
166+
// If the region meta information changes to being loaded from storage, decrease the counter.
167+
if !origin.LoadedFromStorage() && region.LoadedFromStorage() {
168+
t.notFromStorageRegionsCnt--
169+
}
152170
}
153171

154172
// remove removes a region if the region is in the tree.
@@ -168,6 +186,9 @@ func (t *regionTree) remove(region *RegionInfo) {
168186
regionWriteBytesRate, regionWriteKeysRate := result.GetWriteRate()
169187
t.totalWriteBytesRate -= regionWriteBytesRate
170188
t.totalWriteKeysRate -= regionWriteKeysRate
189+
if !region.LoadedFromStorage() {
190+
t.notFromStorageRegionsCnt--
191+
}
171192
t.tree.Delete(item)
172193
}
173194

pkg/storage/endpoint/meta.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.
203203
}
204204

205205
nextID = region.GetId() + 1
206-
overlaps := f(core.NewRegionInfo(region, nil))
206+
overlaps := f(core.NewRegionInfo(region, nil, core.SetSource(core.Storage)))
207207
for _, item := range overlaps {
208208
if err := se.DeleteRegion(item.GetMeta()); err != nil {
209209
return err

pkg/storage/storage_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ func saveRegions(lb *levelDBBackend, n int, ratio int) error {
407407
}
408408

409409
func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
410+
re := require.New(b)
410411
ctx := context.Background()
411412
dir := b.TempDir()
412413
lb, err := newLevelDBBackend(ctx, dir, nil)
@@ -426,10 +427,8 @@ func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
426427
}()
427428

428429
b.ResetTimer()
429-
err = lb.LoadRegions(ctx, cluster.CheckAndPutRegion)
430-
if err != nil {
431-
b.Fatal(err)
432-
}
430+
err = lb.LoadRegions(context.Background(), cluster.CheckAndPutRegion)
431+
re.NoError(err)
433432
}
434433

435434
var volumes = []struct {

server/cluster/cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
491491
start = time.Now()
492492

493493
// used to load region from kv storage to cache storage.
494-
if err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
494+
if err = storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
495495
return nil, err
496496
}
497497
log.Info("load regions",

server/cluster/cluster_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -989,7 +989,7 @@ func TestRegionSizeChanged(t *testing.T) {
989989
core.WithLeader(region.GetPeers()[2]),
990990
core.SetApproximateSize(curMaxMergeSize-1),
991991
core.SetApproximateKeys(curMaxMergeKeys-1),
992-
core.SetFromHeartbeat(true),
992+
core.SetSource(core.Heartbeat),
993993
)
994994
cluster.processRegionHeartbeat(region)
995995
regionID := region.GetID()
@@ -999,7 +999,7 @@ func TestRegionSizeChanged(t *testing.T) {
999999
core.WithLeader(region.GetPeers()[2]),
10001000
core.SetApproximateSize(curMaxMergeSize+1),
10011001
core.SetApproximateKeys(curMaxMergeKeys+1),
1002-
core.SetFromHeartbeat(true),
1002+
core.SetSource(core.Heartbeat),
10031003
)
10041004
cluster.processRegionHeartbeat(region)
10051005
re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))

server/cluster/prepare_checker.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ package cluster
1717
import (
1818
"time"
1919

20+
"github.com/pingcap/log"
2021
"github.com/tikv/pd/pkg/core"
2122
"github.com/tikv/pd/pkg/utils/syncutil"
23+
"go.uber.org/zap"
2224
)
2325

2426
type prepareChecker struct {
@@ -47,8 +49,16 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
4749
checker.prepared = true
4850
return true
4951
}
52+
notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
53+
totalRegionsCnt := c.GetRegionCount()
54+
if float64(notLoadedFromRegionsCnt) > float64(totalRegionsCnt)*collectFactor {
55+
log.Info("meta not loaded from region number is satisfied, finish prepare checker",
56+
zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
57+
checker.prepared = true
58+
return true
59+
}
5060
// The number of active regions should be more than total region of all stores * collectFactor
51-
if float64(c.GetRegionCount())*collectFactor > float64(checker.sum) {
61+
if float64(totalRegionsCnt)*collectFactor > float64(checker.sum) {
5262
return false
5363
}
5464
for _, store := range c.GetStores() {

server/grpc_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
812812
lastBind = time.Now()
813813
}
814814

815-
region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true))
815+
region := core.RegionFromHeartbeat(request, flowRoundOption)
816816
if region.GetLeader() == nil {
817817
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
818818
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()

server/region_syncer/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
126126
log.Info("region syncer start load region")
127127
start := time.Now()
128128
err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion)
129-
log.Info("region syncer finished load region", zap.Duration("time-cost", time.Since(start)))
129+
log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start)))
130130
if err != nil {
131131
log.Warn("failed to load regions", errs.ZapError(err))
132132
}
@@ -207,10 +207,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
207207
core.SetWrittenKeys(stats[i].KeysWritten),
208208
core.SetReadBytes(stats[i].BytesRead),
209209
core.SetReadKeys(stats[i].KeysRead),
210-
core.SetFromHeartbeat(false),
210+
core.SetSource(core.Sync),
211211
)
212212
} else {
213-
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
213+
region = core.NewRegionInfo(r, regionLeader, core.SetSource(core.Sync))
214214
}
215215

216216
origin, _, err := bc.PreCheckPutRegion(region)

0 commit comments

Comments
 (0)