Skip to content

Commit d016fed

Browse files
ti-chi-botHuSharpti-chi-bot[bot]
authored
coordinator: use a healthy region count to start coordinator (#7044) (#7363)
close #6988, close #7016 Signed-off-by: husharp <jinhao.hu@pingcap.com> Co-authored-by: husharp <jinhao.hu@pingcap.com> Co-authored-by: Hu# <jinhao.hu@pingcap.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
1 parent 5a95241 commit d016fed

File tree

11 files changed

+126
-29
lines changed

11 files changed

+126
-29
lines changed

server/cluster/cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
379379
start = time.Now()
380380

381381
// used to load region from kv storage to cache storage.
382-
if err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
382+
if err = storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
383383
return nil, err
384384
}
385385
log.Info("load regions",

server/cluster/cluster_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -978,7 +978,7 @@ func TestRegionSizeChanged(t *testing.T) {
978978
core.WithLeader(region.GetPeers()[2]),
979979
core.SetApproximateSize(curMaxMergeSize-1),
980980
core.SetApproximateKeys(curMaxMergeKeys-1),
981-
core.SetFromHeartbeat(true),
981+
core.SetSource(core.Heartbeat),
982982
)
983983
cluster.processRegionHeartbeat(region)
984984
regionID := region.GetID()
@@ -988,7 +988,7 @@ func TestRegionSizeChanged(t *testing.T) {
988988
core.WithLeader(region.GetPeers()[2]),
989989
core.SetApproximateSize(curMaxMergeSize+1),
990990
core.SetApproximateKeys(curMaxMergeKeys+1),
991-
core.SetFromHeartbeat(true),
991+
core.SetSource(core.Heartbeat),
992992
)
993993
cluster.processRegionHeartbeat(region)
994994
re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))

server/cluster/prepare_checker.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ package cluster
1717
import (
1818
"time"
1919

20+
"github.com/pingcap/log"
2021
"github.com/tikv/pd/pkg/syncutil"
2122
"github.com/tikv/pd/server/core"
23+
"go.uber.org/zap"
2224
)
2325

2426
type prepareChecker struct {
@@ -47,8 +49,16 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
4749
checker.prepared = true
4850
return true
4951
}
52+
notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
53+
totalRegionsCnt := c.GetRegionCount()
54+
if float64(notLoadedFromRegionsCnt) > float64(totalRegionsCnt)*collectFactor {
55+
log.Info("meta not loaded from region number is satisfied, finish prepare checker",
56+
zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
57+
checker.prepared = true
58+
return true
59+
}
5060
// The number of active regions should be more than total region of all stores * collectFactor
51-
if float64(c.GetRegionCount())*collectFactor > float64(checker.sum) {
61+
if float64(totalRegionsCnt)*collectFactor > float64(checker.sum) {
5262
return false
5363
}
5464
for _, store := range c.GetStores() {

server/core/region.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,26 @@ type RegionInfo struct {
7171
queryStats *pdpb.QueryStats
7272
flowRoundDivisor uint64
7373
// buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version.
74-
buckets unsafe.Pointer
75-
fromHeartbeat bool
74+
buckets unsafe.Pointer
75+
// source is used to indicate region's source, such as Storage/Sync/Heartbeat.
76+
source RegionSource
77+
}
78+
79+
// RegionSource is the source of region.
80+
type RegionSource uint32
81+
82+
const (
83+
// Storage means this region's meta info might be stale.
84+
Storage RegionSource = iota
85+
// Sync means this region's meta info is relatively fresher.
86+
Sync
87+
// Heartbeat means this region's meta info is relatively fresher.
88+
Heartbeat
89+
)
90+
91+
// LoadedFromStorage means this region's meta info loaded from storage.
92+
func (r *RegionInfo) LoadedFromStorage() bool {
93+
return r.source == Storage
7694
}
7795

7896
// NewRegionInfo creates RegionInfo with region's meta and leader peer.
@@ -169,6 +187,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
169187
interval: heartbeat.GetInterval(),
170188
replicationStatus: heartbeat.GetReplicationStatus(),
171189
queryStats: heartbeat.GetQueryStats(),
190+
source: Heartbeat,
172191
}
173192

174193
for _, opt := range opts {
@@ -591,11 +610,6 @@ func (r *RegionInfo) IsFlashbackChanged(l *RegionInfo) bool {
591610
return r.meta.IsInFlashback != l.meta.IsInFlashback
592611
}
593612

594-
// IsFromHeartbeat returns whether the region info is from the region heartbeat.
595-
func (r *RegionInfo) IsFromHeartbeat() bool {
596-
return r.fromHeartbeat
597-
}
598-
599613
func (r *RegionInfo) isInvolved(startKey, endKey []byte) bool {
600614
return bytes.Compare(r.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(r.GetEndKey()) > 0 && bytes.Compare(r.GetEndKey(), endKey) <= 0))
601615
}
@@ -704,7 +718,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
704718
if region.IsFlashbackChanged(origin) {
705719
saveCache = true
706720
}
707-
if !origin.IsFromHeartbeat() {
721+
if origin.LoadedFromStorage() {
708722
isNew = true
709723
}
710724
}
@@ -1173,6 +1187,13 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
11731187
return
11741188
}
11751189

1190+
// GetClusterNotFromStorageRegionsCnt gets the total count of regions that not loaded from storage anymore
1191+
func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
1192+
r.t.RLock()
1193+
defer r.t.RUnlock()
1194+
return r.tree.notFromStorageRegionsCnt
1195+
}
1196+
11761197
// GetMetaRegions gets a set of metapb.Region from regionMap
11771198
func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
11781199
r.t.RLock()

server/core/region_option.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -369,10 +369,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
369369
}
370370
}
371371

372-
// SetFromHeartbeat sets if the region info comes from the region heartbeat.
373-
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
372+
// SetSource sets the region info's come from.
373+
func SetSource(source RegionSource) RegionCreateOption {
374374
return func(region *RegionInfo) {
375-
region.fromHeartbeat = fromHeartbeat
375+
region.source = source
376376
}
377377
}
378378

server/core/region_tree.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,17 @@ type regionTree struct {
6262
totalSize int64
6363
totalWriteBytesRate float64
6464
totalWriteKeysRate float64
65+
// count the number of regions that not loaded from storage.
66+
notFromStorageRegionsCnt int
6567
}
6668

6769
func newRegionTree() *regionTree {
6870
return &regionTree{
69-
tree: btree.NewG[*regionItem](defaultBTreeDegree),
70-
totalSize: 0,
71-
totalWriteBytesRate: 0,
72-
totalWriteKeysRate: 0,
71+
tree: btree.NewG[*regionItem](defaultBTreeDegree),
72+
totalSize: 0,
73+
totalWriteBytesRate: 0,
74+
totalWriteKeysRate: 0,
75+
notFromStorageRegionsCnt: 0,
7376
}
7477
}
7578

@@ -124,6 +127,9 @@ func (t *regionTree) update(item *regionItem) []*RegionInfo {
124127
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
125128
t.totalWriteBytesRate += regionWriteBytesRate
126129
t.totalWriteKeysRate += regionWriteKeysRate
130+
if !region.LoadedFromStorage() {
131+
t.notFromStorageRegionsCnt++
132+
}
127133

128134
overlaps := t.overlaps(item)
129135
for _, old := range overlaps {
@@ -142,6 +148,9 @@ func (t *regionTree) update(item *regionItem) []*RegionInfo {
142148
regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate()
143149
t.totalWriteBytesRate -= regionWriteBytesRate
144150
t.totalWriteKeysRate -= regionWriteKeysRate
151+
if !old.LoadedFromStorage() {
152+
t.notFromStorageRegionsCnt--
153+
}
145154
}
146155

147156
return result
@@ -158,6 +167,15 @@ func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
158167
regionWriteBytesRate, regionWriteKeysRate = origin.GetWriteRate()
159168
t.totalWriteBytesRate -= regionWriteBytesRate
160169
t.totalWriteKeysRate -= regionWriteKeysRate
170+
171+
// If the region meta information not loaded from storage anymore, decrease the counter.
172+
if origin.LoadedFromStorage() && !region.LoadedFromStorage() {
173+
t.notFromStorageRegionsCnt++
174+
}
175+
// If the region meta information updated to load from storage, increase the counter.
176+
if !origin.LoadedFromStorage() && region.LoadedFromStorage() {
177+
t.notFromStorageRegionsCnt--
178+
}
161179
}
162180

163181
// remove removes a region if the region is in the tree.
@@ -177,6 +195,9 @@ func (t *regionTree) remove(region *RegionInfo) {
177195
regionWriteBytesRate, regionWriteKeysRate := result.GetWriteRate()
178196
t.totalWriteBytesRate -= regionWriteBytesRate
179197
t.totalWriteKeysRate -= regionWriteKeysRate
198+
if !region.LoadedFromStorage() {
199+
t.notFromStorageRegionsCnt--
200+
}
180201
t.tree.Delete(item)
181202
}
182203

server/grpc_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -937,7 +937,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
937937
lastBind = time.Now()
938938
}
939939

940-
region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true))
940+
region := core.RegionFromHeartbeat(request, flowRoundOption)
941941
if region.GetLeader() == nil {
942942
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
943943
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()

server/region_syncer/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
126126
log.Info("region syncer start load region")
127127
start := time.Now()
128128
err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion)
129-
log.Info("region syncer finished load region", zap.Duration("time-cost", time.Since(start)))
129+
log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start)))
130130
if err != nil {
131131
log.Warn("failed to load regions.", errs.ZapError(err))
132132
}
@@ -207,10 +207,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
207207
core.SetWrittenKeys(stats[i].KeysWritten),
208208
core.SetReadBytes(stats[i].BytesRead),
209209
core.SetReadKeys(stats[i].KeysRead),
210-
core.SetFromHeartbeat(false),
210+
core.SetSource(core.Sync),
211211
)
212212
} else {
213-
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
213+
region = core.NewRegionInfo(r, regionLeader, core.SetSource(core.Sync))
214214
}
215215

216216
origin, err := bc.PreCheckPutRegion(region)

server/storage/endpoint/meta.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.
203203
}
204204

205205
nextID = region.GetId() + 1
206-
overlaps := f(core.NewRegionInfo(region, nil))
206+
overlaps := f(core.NewRegionInfo(region, nil, core.SetSource(core.Storage)))
207207
for _, item := range overlaps {
208208
if err := se.DeleteRegion(item.GetMeta()); err != nil {
209209
return err

server/storage/storage_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ func saveRegions(lb *levelDBBackend, n int, ratio int) error {
407407
}
408408

409409
func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
410+
re := require.New(b)
410411
ctx := context.Background()
411412
dir := b.TempDir()
412413
lb, err := newLevelDBBackend(ctx, dir, nil)
@@ -426,10 +427,8 @@ func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
426427
}()
427428

428429
b.ResetTimer()
429-
err = lb.LoadRegions(ctx, cluster.CheckAndPutRegion)
430-
if err != nil {
431-
b.Fatal(err)
432-
}
430+
err = lb.LoadRegions(context.Background(), cluster.CheckAndPutRegion)
431+
re.NoError(err)
433432
}
434433

435434
var volumes = []struct {

0 commit comments

Comments
 (0)