Skip to content

Commit accb06b

Browse files
committed
fix
Signed-off-by: husharp <jinhao.hu@pingcap.com>
1 parent 745c942 commit accb06b

File tree

8 files changed

+85
-17
lines changed

8 files changed

+85
-17
lines changed

pkg/core/region.go

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,14 @@ type RegionInfo struct {
7171
queryStats *pdpb.QueryStats
7272
flowRoundDivisor uint64
7373
// buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version.
74-
buckets unsafe.Pointer
75-
fromHeartbeat bool
74+
buckets unsafe.Pointer
75+
// source is used to indicate region's source, such as FromHeartbeat/FromSync/InDisk.
76+
source RegionSource
77+
}
78+
79+
// GetRegionSource returns the region source.
80+
func (r *RegionInfo) GetRegionSource() RegionSource {
81+
return r.source
7682
}
7783

7884
// NewRegionInfo creates RegionInfo with region's meta and leader peer.
@@ -171,6 +177,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
171177
interval: heartbeat.GetInterval(),
172178
replicationStatus: heartbeat.GetReplicationStatus(),
173179
queryStats: heartbeat.GetQueryStats(),
180+
source: FromHeartbeat,
174181
}
175182

176183
for _, opt := range opts {
@@ -639,11 +646,6 @@ func (r *RegionInfo) IsFlashbackChanged(l *RegionInfo) bool {
639646
return r.meta.FlashbackStartTs != l.meta.FlashbackStartTs || r.meta.IsInFlashback != l.meta.IsInFlashback
640647
}
641648

642-
// IsFromHeartbeat returns whether the region info is from the region heartbeat.
643-
func (r *RegionInfo) IsFromHeartbeat() bool {
644-
return r.fromHeartbeat
645-
}
646-
647649
func (r *RegionInfo) isInvolved(startKey, endKey []byte) bool {
648650
return bytes.Compare(r.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(r.GetEndKey()) > 0 && bytes.Compare(r.GetEndKey(), endKey) <= 0))
649651
}
@@ -683,7 +685,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
683685
}
684686
saveKV, saveCache, isNew = true, true, true
685687
} else {
686-
if !origin.IsFromHeartbeat() {
688+
if origin.source == FromSync || origin.source == InDisk {
687689
isNew = true
688690
}
689691
r := region.GetRegionEpoch()
@@ -793,6 +795,18 @@ type RegionsInfo struct {
793795
pendingPeers map[uint64]*regionTree // storeID -> sub regionTree
794796
}
795797

798+
// RegionSource is the source of region.
799+
type RegionSource uint32
800+
801+
const (
802+
// InDisk means region is stale.
803+
InDisk RegionSource = iota
804+
// FromSync means region is stale.
805+
FromSync
806+
// FromHeartbeat means region is fresh.
807+
FromHeartbeat
808+
)
809+
796810
// NewRegionsInfo creates RegionsInfo with tree, regions, leaders and followers
797811
func NewRegionsInfo() *RegionsInfo {
798812
return &RegionsInfo{
@@ -840,6 +854,8 @@ func (r *RegionsInfo) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
840854
origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
841855
r.t.Unlock()
842856
r.UpdateSubTree(region, origin, overlaps, rangeChanged)
857+
// InDisk means region is stale.
858+
r.AtomicAddStaleRegionCnt()
843859
return overlaps
844860
}
845861

@@ -857,6 +873,21 @@ func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*reg
857873
return origin, overlaps, err
858874
}
859875

876+
// GetStaleRegionCnt returns the stale region count.
877+
func (r *RegionsInfo) GetStaleRegionCnt() int64 {
878+
r.t.RLock()
879+
defer r.t.RUnlock()
880+
if r.tree.length() == 0 {
881+
return 0
882+
}
883+
return r.tree.staleRegionCnt
884+
}
885+
886+
// AtomicAddStaleRegionCnt atomically adds the stale region count.
887+
func (r *RegionsInfo) AtomicAddStaleRegionCnt() {
888+
r.tree.AtomicAddStaleRegionCnt()
889+
}
890+
860891
// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
861892
func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) {
862893
r.t.Lock()
@@ -870,6 +901,10 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo
870901
r.t.Unlock()
871902
return nil, err
872903
}
904+
// If origin is stale, need to sub the stale region count.
905+
if origin != nil && origin.source != FromHeartbeat && region.source == FromHeartbeat {
906+
r.tree.AtomicSubStaleRegionCnt()
907+
}
873908
origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
874909
r.t.Unlock()
875910
r.UpdateSubTree(region, origin, overlaps, rangeChanged)

pkg/core/region_option.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -381,10 +381,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
381381
}
382382
}
383383

384-
// SetFromHeartbeat sets if the region info comes from the region heartbeat.
385-
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
384+
// SetSource sets the region info's come from.
385+
func SetSource(source RegionSource) RegionCreateOption {
386386
return func(region *RegionInfo) {
387-
region.fromHeartbeat = fromHeartbeat
387+
region.source = source
388388
}
389389
}
390390

pkg/core/region_tree.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package core
1616
import (
1717
"bytes"
1818
"math/rand"
19+
"sync/atomic"
1920

2021
"github.com/pingcap/kvproto/pkg/metapb"
2122
"github.com/pingcap/log"
@@ -61,6 +62,8 @@ type regionTree struct {
6162
totalSize int64
6263
totalWriteBytesRate float64
6364
totalWriteKeysRate float64
65+
66+
staleRegionCnt int64
6467
}
6568

6669
func newRegionTree() *regionTree {
@@ -69,6 +72,7 @@ func newRegionTree() *regionTree {
6972
totalSize: 0,
7073
totalWriteBytesRate: 0,
7174
totalWriteKeysRate: 0,
75+
staleRegionCnt: 0,
7276
}
7377
}
7478

@@ -342,3 +346,20 @@ func (t *regionTree) TotalWriteRate() (bytesRate, keysRate float64) {
342346
}
343347
return t.totalWriteBytesRate, t.totalWriteKeysRate
344348
}
349+
350+
func (t *regionTree) AtomicAddStaleRegionCnt() {
351+
if t.length() == 0 {
352+
return
353+
}
354+
atomic.AddInt64(&t.staleRegionCnt, 1)
355+
}
356+
357+
func (t *regionTree) AtomicSubStaleRegionCnt() {
358+
if t.length() == 0 {
359+
return
360+
}
361+
if t.staleRegionCnt == 0 {
362+
return
363+
}
364+
atomic.AddInt64(&t.staleRegionCnt, -1)
365+
}

pkg/schedule/prepare_checker.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ package schedule
1717
import (
1818
"time"
1919

20+
"github.com/pingcap/log"
2021
"github.com/tikv/pd/pkg/core"
2122
"github.com/tikv/pd/pkg/utils/syncutil"
23+
"go.uber.org/zap"
2224
)
2325

2426
type prepareChecker struct {
@@ -47,6 +49,11 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
4749
checker.prepared = true
4850
return true
4951
}
52+
if float64(c.GetStaleRegionCnt()) < float64(c.GetTotalRegionCount())*(1-collectFactor) {
53+
log.Info("stale region num is satisfied, skip prepare checker", zap.Int64("stale-region", c.GetStaleRegionCnt()), zap.Int("total-region", c.GetTotalRegionCount()))
54+
checker.prepared = true
55+
return true
56+
}
5057
// The number of active regions should be more than total region of all stores * collectFactor
5158
if float64(c.GetTotalRegionCount())*collectFactor > float64(checker.sum) {
5259
return false

pkg/syncer/client.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,17 +183,21 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
183183
core.SetWrittenKeys(stats[i].KeysWritten),
184184
core.SetReadBytes(stats[i].BytesRead),
185185
core.SetReadKeys(stats[i].KeysRead),
186-
core.SetFromHeartbeat(false),
186+
core.SetSource(core.FromSync),
187187
)
188188
} else {
189-
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
189+
region = core.NewRegionInfo(r, regionLeader, core.SetSource(core.FromSync))
190190
}
191191

192192
origin, _, err := bc.PreCheckPutRegion(region)
193193
if err != nil {
194194
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
195195
continue
196196
}
197+
// FromSync means region is stale.
198+
if origin == nil || (origin != nil && origin.GetRegionSource() == core.FromHeartbeat) {
199+
bc.RegionsInfo.AtomicAddStaleRegionCnt()
200+
}
197201
_, saveKV, _, _ := regionGuide(region, origin)
198202
overlaps := bc.PutRegion(region)
199203

server/cluster/cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1128,7 +1128,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
11281128
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
11291129
// check its validation again here.
11301130
//
1131-
// However it can't solve the race condition of concurrent heartbeats from the same region.
1131+
// However, it can't solve the race condition of concurrent heartbeats from the same region.
11321132
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
11331133
return err
11341134
}

server/cluster/cluster_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,7 +1002,7 @@ func TestRegionSizeChanged(t *testing.T) {
10021002
core.WithLeader(region.GetPeers()[2]),
10031003
core.SetApproximateSize(curMaxMergeSize-1),
10041004
core.SetApproximateKeys(curMaxMergeKeys-1),
1005-
core.SetFromHeartbeat(true),
1005+
core.SetSource(core.FromHeartbeat),
10061006
)
10071007
cluster.processRegionHeartbeat(region)
10081008
regionID := region.GetID()
@@ -1012,7 +1012,7 @@ func TestRegionSizeChanged(t *testing.T) {
10121012
core.WithLeader(region.GetPeers()[2]),
10131013
core.SetApproximateSize(curMaxMergeSize+1),
10141014
core.SetApproximateKeys(curMaxMergeKeys+1),
1015-
core.SetFromHeartbeat(true),
1015+
core.SetSource(core.FromHeartbeat),
10161016
)
10171017
cluster.processRegionHeartbeat(region)
10181018
re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
@@ -2375,6 +2375,7 @@ func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) er
23752375
peer, _ := c.AllocPeer(id)
23762376
region.Peers = append(region.Peers, peer)
23772377
}
2378+
c.core.AtomicAddStaleRegionCnt()
23782379
return c.putRegion(core.NewRegionInfo(region, nil))
23792380
}
23802381

server/grpc_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1261,7 +1261,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
12611261
lastBind = time.Now()
12621262
}
12631263

1264-
region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true))
1264+
region := core.RegionFromHeartbeat(request, flowRoundOption)
12651265
if region.GetLeader() == nil {
12661266
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
12671267
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()

0 commit comments

Comments
 (0)