Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
accb06b
fix
HuSharp Sep 6, 2023
46406e7
Merge branch 'master' into fix_coordinator
HuSharp Sep 6, 2023
d161acb
address comment
HuSharp Sep 6, 2023
6be6f17
address comment
HuSharp Sep 7, 2023
1f16cda
Merge remote-tracking branch 'upstream/master' into fix_coordinator
HuSharp Sep 11, 2023
68a1107
address comment
HuSharp Sep 11, 2023
c67ed32
Merge remote-tracking branch 'upstream/master' into fix_coordinator
HuSharp Sep 13, 2023
67b4081
address comment
HuSharp Sep 13, 2023
7bae37c
change to healthy region and move to region tree
HuSharp Sep 18, 2023
382816d
Merge remote-tracking branch 'upstream/master' into fix_coordinator
HuSharp Sep 19, 2023
f89619a
add test
HuSharp Sep 19, 2023
4eebfe3
merge master
HuSharp Sep 20, 2023
e90fa56
remove redundant line
HuSharp Sep 20, 2023
cc2d832
merge master
HuSharp Sep 21, 2023
664427a
merge master
HuSharp Sep 26, 2023
8a0c520
follower to leader can start coordinator immediately
HuSharp Sep 28, 2023
c88260d
Merge branch 'master' into fix_coordinator
HuSharp Sep 28, 2023
32f856f
Merge branch 'master' into fix_coordinator
HuSharp Sep 28, 2023
be7296f
Merge branch 'master' into fix_coordinator
HuSharp Sep 28, 2023
99dc596
Merge remote-tracking branch 'upstream/master' into fix_coordinator
HuSharp Oct 9, 2023
0663294
change to from
HuSharp Oct 9, 2023
183649a
change proper name
HuSharp Oct 9, 2023
45d2cc5
Merge branch 'master' into fix_coordinator
HuSharp Oct 9, 2023
6281684
address comment
HuSharp Oct 10, 2023
a9d6e3e
Merge branch 'master' into fix_coordinator
ti-chi-bot[bot] Oct 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 34 additions & 8 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,31 @@ type RegionInfo struct {
queryStats *pdpb.QueryStats
flowRoundDivisor uint64
// buckets is not thread safe, it should be accessed by the request `report buckets` with greater version.
buckets unsafe.Pointer
fromHeartbeat bool
buckets unsafe.Pointer
// source is used to indicate region's source, such as Storage/Sync/Heartbeat.
source RegionSource
}

// RegionSource indicates where a region's meta info was obtained from.
type RegionSource uint32

const (
	// Storage means the region was loaded from local storage, so this
	// region's meta info might be stale.
	Storage RegionSource = iota
	// Sync means the region was received through the region syncer, so this
	// region's meta info is relatively fresher.
	Sync
	// Heartbeat means the region was reported by a region heartbeat, so this
	// region's meta info is relatively fresher.
	Heartbeat
)

// IsSourceStale means this region's meta info might be stale.
func (r *RegionInfo) IsSourceStale() bool {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about not using stale/fresh? Just use LoadedFromStorage and !LoadedFromStorage. Or something like that.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

return r.source == Storage
}

// IsSourceFresh means this region's meta info is relatively fresher, i.e. it
// was obtained from the region syncer or a region heartbeat rather than
// loaded from storage.
func (r *RegionInfo) IsSourceFresh() bool {
	switch r.source {
	case Sync, Heartbeat:
		return true
	default:
		return false
	}
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -192,6 +215,7 @@ func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateO
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
queryStats: heartbeat.GetQueryStats(),
source: Heartbeat,
}

// scheduling service doesn't need the following fields.
Expand Down Expand Up @@ -667,11 +691,6 @@ func (r *RegionInfo) IsFlashbackChanged(l *RegionInfo) bool {
return r.meta.FlashbackStartTs != l.meta.FlashbackStartTs || r.meta.IsInFlashback != l.meta.IsInFlashback
}

// IsFromHeartbeat returns whether the region info is from the region heartbeat.
func (r *RegionInfo) IsFromHeartbeat() bool {
return r.fromHeartbeat
}

// isInvolved reports whether the region's key range is fully contained in
// [startKey, endKey). An empty endKey means the range is unbounded on the
// right.
func (r *RegionInfo) isInvolved(startKey, endKey []byte) bool {
	if bytes.Compare(r.GetStartKey(), startKey) < 0 {
		return false
	}
	if len(endKey) == 0 {
		return true
	}
	return len(r.GetEndKey()) > 0 && bytes.Compare(r.GetEndKey(), endKey) <= 0
}
Expand Down Expand Up @@ -711,7 +730,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
saveKV, saveCache, isNew = true, true, true
} else {
if !origin.IsFromHeartbeat() {
if origin.IsSourceStale() {
isNew = true
}
r := region.GetRegionEpoch()
Expand Down Expand Up @@ -1323,6 +1342,13 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
return
}

// GetClusterHealthyRegionsCnt gets the healthy region count of the cluster,
// i.e. the number of regions whose meta info came from Sync or Heartbeat
// rather than from Storage.
func (r *RegionsInfo) GetClusterHealthyRegionsCnt() int64 {
	// healthyRegionsCnt lives on the main region tree; take r.t — the lock
	// the other r.tree/regionMap readers (e.g. GetMetaRegions) use — not
	// r.st, which guards the per-store sub-trees.
	r.t.RLock()
	defer r.t.RUnlock()
	return r.tree.healthyRegionsCnt
}

// GetMetaRegions gets a set of metapb.Region from regionMap
func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
r.t.RLock()
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
}
}

// SetFromHeartbeat sets if the region info comes from the region heartbeat.
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
// SetSource sets the source the region info was obtained from.
func SetSource(source RegionSource) RegionCreateOption {
	return func(r *RegionInfo) {
		r.source = source
	}
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type regionTree struct {
totalSize int64
totalWriteBytesRate float64
totalWriteKeysRate float64
// count the healthy meta regions
healthyRegionsCnt int64
}

func newRegionTree() *regionTree {
Expand All @@ -69,6 +71,7 @@ func newRegionTree() *regionTree {
totalSize: 0,
totalWriteBytesRate: 0,
totalWriteKeysRate: 0,
healthyRegionsCnt: 0,
}
}

Expand Down Expand Up @@ -112,6 +115,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
t.totalWriteBytesRate += regionWriteBytesRate
t.totalWriteKeysRate += regionWriteKeysRate
if region.IsSourceFresh() {
t.healthyRegionsCnt++
}

if !withOverlaps {
overlaps = t.overlaps(item)
Expand All @@ -133,6 +139,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
if old.IsSourceFresh() {
t.healthyRegionsCnt--
}
}

return result
Expand All @@ -149,6 +158,15 @@ func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
regionWriteBytesRate, regionWriteKeysRate = origin.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate

// If the region meta goes from `stale` to `fresh`, increase the healthy region count.
if origin.IsSourceStale() && region.IsSourceFresh() {
t.healthyRegionsCnt++
}
// If the region meta goes from `fresh` to `stale`, decrease the healthy region count.
if origin.IsSourceFresh() && region.IsSourceStale() {
t.healthyRegionsCnt--
}
}

// remove removes a region if the region is in the tree.
Expand All @@ -168,6 +186,9 @@ func (t *regionTree) remove(region *RegionInfo) {
regionWriteBytesRate, regionWriteKeysRate := result.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
if region.IsSourceFresh() {
t.healthyRegionsCnt--
}
t.tree.Delete(item)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
// However, it can't solve the race condition of concurrent heartbeats from the same region.
if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
s.hbStreams.BindStream(storeID, server)
lastBind = time.Now()
}
region := core.RegionFromHeartbeat(request, core.SetFromHeartbeat(true))
region := core.RegionFromHeartbeat(request, core.SetSource(core.Heartbeat))
err = c.HandleRegionHeartbeat(region)
if err != nil {
// TODO: if we need to send the error back to API server.
Expand Down
7 changes: 7 additions & 0 deletions pkg/schedule/prepare_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package schedule
import (
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

type prepareChecker struct {
Expand Down Expand Up @@ -47,6 +49,11 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
checker.prepared = true
return true
}
if float64(c.GetClusterHealthyRegionsCnt()) > float64(c.GetTotalRegionCount())*collectFactor {
log.Info("healthy meta region number is satisfied, finish prepare checker", zap.Int64("healthy-region", c.GetClusterHealthyRegionsCnt()), zap.Int("total-region", c.GetTotalRegionCount()))
checker.prepared = true
return true
}
// The number of active regions should be more than total region of all stores * collectFactor
if float64(c.GetTotalRegionCount())*collectFactor > float64(checker.sum) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need it?

Copy link
Copy Markdown
Member Author

@HuSharp HuSharp Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it is better to remove them in another pr? This pr just focuses on accelerating coordinator.
What do u think :)

return false
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/endpoint/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.
}

nextID = region.GetId() + 1
overlaps := f(core.NewRegionInfo(region, nil))
overlaps := f(core.NewRegionInfo(region, nil, core.SetSource(core.Storage)))
for _, item := range overlaps {
if err := se.DeleteRegion(item.GetMeta()); err != nil {
return err
Expand Down
7 changes: 3 additions & 4 deletions pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ func saveRegions(lb *levelDBBackend, n int, ratio int) error {
}

func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
re := require.New(b)
ctx := context.Background()
dir := b.TempDir()
lb, err := newLevelDBBackend(ctx, dir, nil)
Expand All @@ -426,10 +427,8 @@ func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
}()

b.ResetTimer()
err = lb.LoadRegions(ctx, cluster.CheckAndPutRegion)
if err != nil {
b.Fatal(err)
}
err = lb.LoadRegions(context.Background(), cluster.CheckAndPutRegion)
re.NoError(err)
}

var volumes = []struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Info("region syncer start load region")
start := time.Now()
err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion)
log.Info("region syncer finished load region", zap.Duration("time-cost", time.Since(start)))
log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start)))
if err != nil {
log.Warn("failed to load regions", errs.ZapError(err))
}
Expand Down Expand Up @@ -183,10 +183,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
core.SetWrittenKeys(stats[i].KeysWritten),
core.SetReadBytes(stats[i].BytesRead),
core.SetReadKeys(stats[i].KeysRead),
core.SetFromHeartbeat(false),
core.SetSource(core.Sync),
)
} else {
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
region = core.NewRegionInfo(r, regionLeader, core.SetSource(core.Sync))
}

origin, _, err := bc.PreCheckPutRegion(region)
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
start = time.Now()

// used to load region from kv storage to cache storage.
if err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
if err = storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
return nil, err
}
log.Info("load regions",
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@ func TestRegionSizeChanged(t *testing.T) {
core.WithLeader(region.GetPeers()[2]),
core.SetApproximateSize(curMaxMergeSize-1),
core.SetApproximateKeys(curMaxMergeKeys-1),
core.SetFromHeartbeat(true),
core.SetSource(core.Heartbeat),
)
cluster.processRegionHeartbeat(region)
regionID := region.GetID()
Expand All @@ -1012,7 +1012,7 @@ func TestRegionSizeChanged(t *testing.T) {
core.WithLeader(region.GetPeers()[2]),
core.SetApproximateSize(curMaxMergeSize+1),
core.SetApproximateKeys(curMaxMergeKeys+1),
core.SetFromHeartbeat(true),
core.SetSource(core.Heartbeat),
)
cluster.processRegionHeartbeat(region)
re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true))
region := core.RegionFromHeartbeat(request, flowRoundOption)
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
Expand Down
48 changes: 47 additions & 1 deletion tests/server/region_syncer/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,52 @@ func TestPrepareChecker(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
}

// TestPrepareCheckerWithTransferLeader verifies that after leadership moves
// back to a PD that already holds synced (healthy) region meta, the prepare
// checker reports prepared immediately instead of waiting for fresh
// heartbeats.
// ref: https://github.com/tikv/pd/issues/6988
func TestPrepareCheckerWithTransferLeader(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
	cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true })
	defer cluster.Destroy()
	re.NoError(err)

	err = cluster.RunInitialServers()
	re.NoError(err)
	cluster.WaitLeader()
	leaderServer := cluster.GetServer(cluster.GetLeader())
	re.NoError(leaderServer.BootstrapCluster())
	rc := leaderServer.GetServer().GetRaftCluster()
	re.NotNil(rc)
	regionLen := 100
	regions := initRegions(regionLen)
	for _, region := range regions {
		err = rc.HandleRegionHeartbeat(region)
		re.NoError(err)
	}
	// ensure flush to region storage
	// NOTE(review): fixed sleeps make this timing-sensitive; consider a
	// polling helper (e.g. testutil.Eventually) instead — confirm one exists.
	time.Sleep(3 * time.Second)
	re.True(leaderServer.GetRaftCluster().IsPrepared())

	// join new PD
	pd2, err := cluster.Join(ctx)
	re.NoError(err)
	err = pd2.Run()
	re.NoError(err)
	// waiting for synchronization to complete
	time.Sleep(3 * time.Second)
	err = cluster.ResignLeader()
	re.NoError(err)
	re.Equal("pd2", cluster.WaitLeader())

	// transfer leader to pd1, can start coordinator immediately.
	// rc still refers to pd1's raft cluster, so IsPrepared checks the
	// re-elected leader's prepare state.
	err = cluster.ResignLeader()
	re.NoError(err)
	re.Equal("pd1", cluster.WaitLeader())
	re.True(rc.IsPrepared())
	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
}

func initRegions(regionLen int) []*core.RegionInfo {
allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
regions := make([]*core.RegionInfo, 0, regionLen)
Expand All @@ -264,7 +310,7 @@ func initRegions(regionLen int) []*core.RegionInfo {
{Id: allocator.alloc(), StoreId: uint64(3)},
},
}
region := core.NewRegionInfo(r, r.Peers[0])
region := core.NewRegionInfo(r, r.Peers[0], core.SetSource(core.Heartbeat))
// Here is used to simulate the upgrade process.
if i < regionLen/2 {
buckets := &metapb.Buckets{
Expand Down