-
Notifications
You must be signed in to change notification settings - Fork 759
coordinator: use a healthy region count to start coordinator #7044
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
accb06b
46406e7
d161acb
6be6f17
1f16cda
68a1107
c67ed32
67b4081
7bae37c
382816d
f89619a
4eebfe3
e90fa56
cc2d832
664427a
8a0c520
c88260d
32f856f
be7296f
99dc596
0663294
183649a
45d2cc5
6281684
a9d6e3e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -71,8 +71,36 @@ type RegionInfo struct { | |
| queryStats *pdpb.QueryStats | ||
| flowRoundDivisor uint64 | ||
| // buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version. | ||
| buckets unsafe.Pointer | ||
| fromHeartbeat bool | ||
| buckets unsafe.Pointer | ||
| // source is used to indicate region's source, such as FromHeartbeat/FromSync/FromStorage. | ||
| source RegionSource | ||
| } | ||
|
|
||
| // RegionSource is the source of region. | ||
| type RegionSource uint32 | ||
|
|
||
| const ( | ||
| // FromStorage means this region's meta info might be stale. | ||
| FromStorage RegionSource = iota | ||
| // FromSync means this region's meta info might be stale. | ||
| FromSync | ||
| // FromHeartbeat means this region's meta info is relatively fresher. | ||
| FromHeartbeat | ||
| ) | ||
|
|
||
| // IsSourceStale means this region's meta info might be stale. | ||
| func (r *RegionInfo) IsSourceStale() bool { | ||
| return r.source == FromStorage || r.source == FromSync | ||
|
||
| } | ||
|
|
||
| // IsSourceFresh means this region's meta info is relatively fresher. | ||
| func (r *RegionInfo) IsSourceFresh() bool { | ||
| return r.source == FromHeartbeat | ||
| } | ||
|
|
||
| // GetRegionSource returns the region source. | ||
| func (r *RegionInfo) GetRegionSource() RegionSource { | ||
| return r.source | ||
| } | ||
|
|
||
| // NewRegionInfo creates RegionInfo with region's meta and leader peer. | ||
|
|
@@ -172,6 +200,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC | |
| interval: heartbeat.GetInterval(), | ||
| replicationStatus: heartbeat.GetReplicationStatus(), | ||
| queryStats: heartbeat.GetQueryStats(), | ||
| source: FromHeartbeat, | ||
| } | ||
|
|
||
| for _, opt := range opts { | ||
|
|
@@ -637,11 +666,6 @@ func (r *RegionInfo) IsFlashbackChanged(l *RegionInfo) bool { | |
| return r.meta.FlashbackStartTs != l.meta.FlashbackStartTs || r.meta.IsInFlashback != l.meta.IsInFlashback | ||
| } | ||
|
|
||
| // IsFromHeartbeat returns whether the region info is from the region heartbeat. | ||
| func (r *RegionInfo) IsFromHeartbeat() bool { | ||
| return r.fromHeartbeat | ||
| } | ||
|
|
||
| func (r *RegionInfo) isInvolved(startKey, endKey []byte) bool { | ||
| return bytes.Compare(r.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(r.GetEndKey()) > 0 && bytes.Compare(r.GetEndKey(), endKey) <= 0)) | ||
| } | ||
|
|
@@ -681,7 +705,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { | |
| } | ||
| saveKV, saveCache, isNew = true, true, true | ||
| } else { | ||
| if !origin.IsFromHeartbeat() { | ||
| if origin.IsSourceStale() { | ||
| isNew = true | ||
| } | ||
| r := region.GetRegionEpoch() | ||
|
|
@@ -838,6 +862,8 @@ func (r *RegionsInfo) CheckAndPutRegion(region *RegionInfo) []*RegionInfo { | |
| origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...) | ||
| r.t.Unlock() | ||
| r.UpdateSubTree(region, origin, overlaps, rangeChanged) | ||
| // FromStorage means this region's meta info might be stale. | ||
| r.AtomicAddStaleRegionCnt() | ||
|
||
| return overlaps | ||
| } | ||
|
|
||
|
|
@@ -855,6 +881,21 @@ func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*reg | |
| return origin, overlaps, err | ||
| } | ||
|
|
||
| // GetStaleRegionCnt returns the stale region count. | ||
| func (r *RegionsInfo) GetStaleRegionCnt() int64 { | ||
| r.t.RLock() | ||
| defer r.t.RUnlock() | ||
| if r.tree.length() == 0 { | ||
| return 0 | ||
| } | ||
| return r.tree.staleRegionCnt | ||
| } | ||
|
|
||
| // AtomicAddStaleRegionCnt atomically adds the stale region count. | ||
| func (r *RegionsInfo) AtomicAddStaleRegionCnt() { | ||
|
||
| r.tree.AtomicAddStaleRegionCnt() | ||
| } | ||
|
|
||
| // AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put. | ||
| func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) { | ||
| r.t.Lock() | ||
|
|
@@ -868,6 +909,10 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo | |
| r.t.Unlock() | ||
| return nil, err | ||
| } | ||
| // If origin is stale, need to sub the stale region count. | ||
| if origin != nil && origin.IsSourceStale() && region.IsSourceFresh() { | ||
| r.tree.AtomicSubStaleRegionCnt() | ||
| } | ||
| origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...) | ||
| r.t.Unlock() | ||
| r.UpdateSubTree(region, origin, overlaps, rangeChanged) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ package core | |
| import ( | ||
| "bytes" | ||
| "math/rand" | ||
| "sync/atomic" | ||
|
|
||
| "github.com/pingcap/kvproto/pkg/metapb" | ||
| "github.com/pingcap/log" | ||
|
|
@@ -61,6 +62,8 @@ type regionTree struct { | |
| totalSize int64 | ||
| totalWriteBytesRate float64 | ||
| totalWriteKeysRate float64 | ||
| // count the stale meta regions | ||
| staleRegionCnt int64 | ||
| } | ||
|
|
||
| func newRegionTree() *regionTree { | ||
|
|
@@ -69,6 +72,7 @@ func newRegionTree() *regionTree { | |
| totalSize: 0, | ||
| totalWriteBytesRate: 0, | ||
| totalWriteKeysRate: 0, | ||
| staleRegionCnt: 0, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -342,3 +346,20 @@ func (t *regionTree) TotalWriteRate() (bytesRate, keysRate float64) { | |
| } | ||
| return t.totalWriteBytesRate, t.totalWriteKeysRate | ||
| } | ||
|
|
||
| func (t *regionTree) AtomicAddStaleRegionCnt() { | ||
| if t.length() == 0 { | ||
|
||
| return | ||
| } | ||
| atomic.AddInt64(&t.staleRegionCnt, 1) | ||
| } | ||
|
|
||
| func (t *regionTree) AtomicSubStaleRegionCnt() { | ||
| if t.length() == 0 { | ||
| return | ||
| } | ||
| if t.staleRegionCnt == 0 { | ||
HuSharp marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return | ||
| } | ||
| atomic.AddInt64(&t.staleRegionCnt, -1) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,8 +17,10 @@ package schedule | |
| import ( | ||
| "time" | ||
|
|
||
| "github.com/pingcap/log" | ||
| "github.com/tikv/pd/pkg/core" | ||
| "github.com/tikv/pd/pkg/utils/syncutil" | ||
| "go.uber.org/zap" | ||
| ) | ||
|
|
||
| type prepareChecker struct { | ||
|
|
@@ -47,6 +49,11 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool { | |
| checker.prepared = true | ||
| return true | ||
| } | ||
| if float64(c.GetStaleRegionCnt()) < float64(c.GetTotalRegionCount())*(1-collectFactor) { | ||
| log.Info("stale meta region number is satisfied, skip prepare checker", zap.Int64("stale-region", c.GetStaleRegionCnt()), zap.Int("total-region", c.GetTotalRegionCount())) | ||
HuSharp marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| checker.prepared = true | ||
| return true | ||
| } | ||
| // The number of active regions should be more than total region of all stores * collectFactor | ||
| if float64(c.GetTotalRegionCount())*collectFactor > float64(checker.sum) { | ||
|
||
| return false | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1002,7 +1002,7 @@ func TestRegionSizeChanged(t *testing.T) { | |
| core.WithLeader(region.GetPeers()[2]), | ||
| core.SetApproximateSize(curMaxMergeSize-1), | ||
| core.SetApproximateKeys(curMaxMergeKeys-1), | ||
| core.SetFromHeartbeat(true), | ||
| core.SetSource(core.FromHeartbeat), | ||
|
||
| ) | ||
| cluster.processRegionHeartbeat(region) | ||
| regionID := region.GetID() | ||
|
|
@@ -1012,7 +1012,7 @@ func TestRegionSizeChanged(t *testing.T) { | |
| core.WithLeader(region.GetPeers()[2]), | ||
| core.SetApproximateSize(curMaxMergeSize+1), | ||
| core.SetApproximateKeys(curMaxMergeKeys+1), | ||
| core.SetFromHeartbeat(true), | ||
| core.SetSource(core.FromHeartbeat), | ||
| ) | ||
| cluster.processRegionHeartbeat(region) | ||
| re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) | ||
|
|
@@ -2375,6 +2375,7 @@ func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) er | |
| peer, _ := c.AllocPeer(id) | ||
| region.Peers = append(region.Peers, peer) | ||
| } | ||
| c.core.AtomicAddStaleRegionCnt() | ||
| return c.putRegion(core.NewRegionInfo(region, nil)) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about not using stale/fresh? Just use
`LoadedFromStorage` and `!LoadedFromStorage`. Or something like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed