Skip to content

Commit 4fea0f0

Browse files
authored
*: fix the issue that loadcluster does not remove overlap regions(#2022) (#2039) (#2040)
Signed-off-by: nolouch <nolouch@gmail.com>
1 parent d9c97ed commit 4fea0f0

File tree

10 files changed

+80
-26
lines changed

10 files changed

+80
-26
lines changed

server/cluster_info.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,19 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl
8282
)
8383

8484
start = time.Now()
85-
if err := kv.LoadRegions(c.core.Regions); err != nil {
85+
// used to load region from kv storage to cache storage.
86+
putRegion := func(region *core.RegionInfo) []*metapb.Region {
87+
c.Lock()
88+
defer c.Unlock()
89+
origin, err := c.core.PreCheckPutRegion(region)
90+
if err != nil {
91+
log.Warn("region is stale", zap.Error(err), zap.Stringer("origin", origin.GetMeta()))
92+
// return the state region to delete.
93+
return []*metapb.Region{region.GetMeta()}
94+
}
95+
return c.core.Regions.SetRegion(region)
96+
}
97+
if err := kv.LoadRegions(putRegion); err != nil {
8698
return nil, err
8799
}
88100
log.Info("load regions",
@@ -512,14 +524,10 @@ func (c *clusterInfo) updateStoreStatusLocked(id uint64) {
512524
// handleRegionHeartbeat updates the region information.
513525
func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
514526
c.RLock()
515-
origin := c.core.Regions.GetRegion(region.GetID())
516-
if origin == nil {
517-
for _, item := range c.core.Regions.GetOverlaps(region) {
518-
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() {
519-
c.RUnlock()
520-
return ErrRegionIsStale(region.GetMeta(), item)
521-
}
522-
}
527+
origin, err := c.core.PreCheckPutRegion(region)
528+
if err != nil {
529+
c.RUnlock()
530+
return err
523531
}
524532
isWriteUpdate, writeItem := c.CheckWriteStatus(region)
525533
isReadUpdate, readItem := c.CheckReadStatus(region)
@@ -538,10 +546,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
538546
} else {
539547
r := region.GetRegionEpoch()
540548
o := origin.GetRegionEpoch()
541-
// Region meta is stale, return an error.
542-
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
543-
return ErrRegionIsStale(region.GetMeta(), origin.GetMeta())
544-
}
549+
545550
if r.GetVersion() > o.GetVersion() {
546551
log.Info("region Version changed",
547552
zap.Uint64("region-id", region.GetID()),

server/cluster_info_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error {
3232
e := region.GetRegionEpoch()
3333

3434
if e.GetVersion() < o.GetVersion() || e.GetConfVer() < o.GetConfVer() {
35-
return ErrRegionIsStale(region, origin)
35+
return core.ErrRegionIsStale(region, origin)
3636
}
3737

3838
return nil

server/core/basic_cluster.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,23 @@ func (bc *BasicCluster) DeleteStore(store *StoreInfo) {
123123
func (bc *BasicCluster) PutRegion(region *RegionInfo) {
124124
bc.Regions.SetRegion(region)
125125
}
126+
127+
// PreCheckPutRegion checks if the region is valid to put.
128+
func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) {
129+
for _, item := range bc.Regions.GetOverlaps(region) {
130+
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() {
131+
return nil, ErrRegionIsStale(region.GetMeta(), item)
132+
}
133+
}
134+
origin := bc.Regions.GetRegion(region.GetID())
135+
if origin == nil {
136+
return nil, nil
137+
}
138+
r := region.GetRegionEpoch()
139+
o := origin.GetRegionEpoch()
140+
// Region meta is stale, return an error.
141+
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
142+
return origin, ErrRegionIsStale(region.GetMeta(), origin.GetMeta())
143+
}
144+
return origin, nil
145+
}

server/core/errors.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"net/http"
2222

2323
"github.com/pingcap/errcode"
24+
"github.com/pingcap/kvproto/pkg/metapb"
25+
"github.com/pkg/errors"
2426
)
2527

2628
var (
@@ -61,3 +63,8 @@ func (e StoreBlockedErr) Error() string {
6163

6264
// Code returns StoreBlockedCode
6365
func (e StoreBlockedErr) Code() errcode.Code { return StoreBlockedCode }
66+
67+
// ErrRegionIsStale is error info for region is stale
68+
var ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error {
69+
return errors.Errorf("region is stale: region %v origin %v", region, origin)
70+
}

server/core/kv.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,11 @@ func (kv *KV) LoadRegion(regionID uint64, region *metapb.Region) (bool, error) {
128128
}
129129

130130
// LoadRegions loads all regions from KV to RegionsInfo.
131-
func (kv *KV) LoadRegions(regions *RegionsInfo) error {
131+
func (kv *KV) LoadRegions(f func(region *RegionInfo) []*metapb.Region) error {
132132
if atomic.LoadInt32(&kv.useRegionKV) > 0 {
133-
return loadRegions(kv.regionKV, regions)
133+
return loadRegions(kv.regionKV, f)
134134
}
135-
return loadRegions(kv.KVBase, regions)
135+
return loadRegions(kv.KVBase, f)
136136
}
137137

138138
// SaveRegion saves one region to KV.

server/core/kv_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (s *testKVSuite) TestLoadRegions(c *C) {
137137

138138
n := 10
139139
regions := mustSaveRegions(c, kv, n)
140-
c.Assert(kv.LoadRegions(cache), IsNil)
140+
c.Assert(kv.LoadRegions(cache.SetRegion), IsNil)
141141

142142
c.Assert(cache.GetRegionCount(), Equals, n)
143143
for _, region := range cache.GetMetaRegions() {
@@ -151,7 +151,7 @@ func (s *testKVSuite) TestLoadRegionsExceedRangeLimit(c *C) {
151151

152152
n := 1000
153153
regions := mustSaveRegions(c, kv, n)
154-
c.Assert(kv.LoadRegions(cache), IsNil)
154+
c.Assert(kv.LoadRegions(cache.SetRegion), IsNil)
155155
c.Assert(cache.GetRegionCount(), Equals, n)
156156
for _, region := range cache.GetMetaRegions() {
157157
c.Assert(region, DeepEquals, regions[region.GetId()])

server/core/region.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,9 @@ func (r *RegionInfo) GetID() uint64 {
296296

297297
// GetMeta returns the meta information of the region.
298298
func (r *RegionInfo) GetMeta() *metapb.Region {
299+
if r == nil {
300+
return nil
301+
}
299302
return r.meta
300303
}
301304

server/core/region_kv.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func deleteRegion(kv KVBase, region *metapb.Region) error {
118118
return kv.Delete(regionPath(region.GetId()))
119119
}
120120

121-
func loadRegions(kv KVBase, regions *RegionsInfo) error {
121+
func loadRegions(kv KVBase, f func(region *RegionInfo) []*metapb.Region) error {
122122
nextID := uint64(0)
123123
endKey := regionPath(math.MaxUint64)
124124

@@ -143,7 +143,7 @@ func loadRegions(kv KVBase, regions *RegionsInfo) error {
143143
}
144144

145145
nextID = region.GetId() + 1
146-
overlaps := regions.SetRegion(NewRegionInfo(region, nil))
146+
overlaps := f(NewRegionInfo(region, nil))
147147
for _, item := range overlaps {
148148
if err := deleteRegion(kv, item); err != nil {
149149
return err

server/handler.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"time"
2020

2121
"github.com/pingcap/errcode"
22-
"github.com/pingcap/kvproto/pkg/metapb"
2322
log "github.com/pingcap/log"
2423
"github.com/pingcap/pd/server/core"
2524
"github.com/pingcap/pd/server/schedule"
@@ -45,10 +44,6 @@ var (
4544
ErrRegionAbnormalPeer = func(regionID uint64) error {
4645
return errors.Errorf("region %v has abnormal peer", regionID)
4746
}
48-
// ErrRegionIsStale is error info for region is stale
49-
ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error {
50-
return errors.Errorf("region is stale: region %v origin %v", region, origin)
51-
}
5247
)
5348

5449
// Handler is a helper to export methods to handle API/RPC requests.

tests/server/region_syncer/region_syncer_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,30 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
8080
err = rc.HandleRegionHeartbeat(region)
8181
c.Assert(err, IsNil)
8282
}
83+
84+
// merge case
85+
// region2 -> region1 -> region0
86+
// merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver
87+
regions[0] = regions[0].Clone(core.WithEndKey(regions[2].GetEndKey()), core.WithIncVersion(), core.WithIncVersion())
88+
err = rc.HandleRegionHeartbeat(regions[2])
89+
c.Assert(err, IsNil)
90+
91+
// merge case
92+
// region3 -> region4
93+
// merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver
94+
regions[4] = regions[3].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion())
95+
err = rc.HandleRegionHeartbeat(regions[4])
96+
c.Assert(err, IsNil)
97+
98+
// merge case
99+
// region0 -> region4
100+
// merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver
101+
regions[4] = regions[0].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion(), core.WithIncVersion())
102+
err = rc.HandleRegionHeartbeat(regions[4])
103+
c.Assert(err, IsNil)
104+
regions = regions[4:]
105+
regionLen = len(regions)
106+
83107
// ensure flush to region kv
84108
time.Sleep(3 * time.Second)
85109
err = leaderServer.Stop()

0 commit comments

Comments
 (0)