-
Notifications
You must be signed in to change notification settings - Fork 759
Scatter: make peer scatter logic same with the leader #6965
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
dea649a
scatter
bufferflies f8f38fd
simply scatter
bufferflies e77d70a
remove total count
bufferflies 3540f71
rename
bufferflies 0654696
pass ut
bufferflies 78b52bd
addition
bufferflies b8e2f61
Merge branch 'master' into scatter_bug
bufferflies 8412f8c
address comment
bufferflies 9783c53
Merge branch 'master' into scatter_bug
bufferflies a067ab5
Merge branch 'master' into scatter_bug
ti-chi-bot[bot] File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ import ( | |
| "context" | ||
| "fmt" | ||
| "math" | ||
| "strconv" | ||
| "sync" | ||
| "time" | ||
|
|
||
|
|
@@ -106,26 +107,6 @@ func (s *selectedStores) GetGroupDistribution(group string) (map[uint64]uint64, | |
| return s.getDistributionByGroupLocked(group) | ||
| } | ||
|
|
||
| // TotalCountByStore counts the total count by store | ||
| func (s *selectedStores) TotalCountByStore(storeID uint64) uint64 { | ||
| s.mu.RLock() | ||
| defer s.mu.RUnlock() | ||
| groups := s.groupDistribution.GetAllID() | ||
| totalCount := uint64(0) | ||
| for _, group := range groups { | ||
| storeDistribution, ok := s.getDistributionByGroupLocked(group) | ||
| if !ok { | ||
| continue | ||
| } | ||
| count, ok := storeDistribution[storeID] | ||
| if !ok { | ||
| continue | ||
| } | ||
| totalCount += count | ||
| } | ||
| return totalCount | ||
| } | ||
|
|
||
| // getDistributionByGroupLocked should be called with lock | ||
| func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64]uint64, bool) { | ||
| if result, ok := s.groupDistribution.Get(group); ok { | ||
|
|
@@ -332,6 +313,12 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s | |
| selectedStores := make(map[uint64]struct{}, len(region.GetPeers())) // selected StoreID set | ||
| leaderCandidateStores := make([]uint64, 0, len(region.GetPeers())) // StoreID allowed to become Leader | ||
| scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer | ||
| filterLen := len(context.filterFuncs) + 2 | ||
| filters := make([]filter.Filter, filterLen) | ||
| for i, filterFunc := range context.filterFuncs { | ||
| filters[i] = filterFunc() | ||
| } | ||
| filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores) | ||
| for _, peer := range peers { | ||
| if _, ok := selectedStores[peer.GetStoreId()]; ok { | ||
| if allowLeader(oldFit, peer) { | ||
|
|
@@ -340,9 +327,14 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s | |
| // It is both sourcePeer and targetPeer itself, no need to select. | ||
| continue | ||
| } | ||
| sourceStore := r.cluster.GetStore(peer.GetStoreId()) | ||
| if sourceStore == nil { | ||
| log.Error("failed to get the store", zap.Uint64("store-id", peer.GetStoreId()), errs.ZapError(errs.ErrGetSourceStore)) | ||
| continue | ||
| } | ||
| filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetSharedConfig(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit) | ||
bufferflies marked this conversation as resolved.
Show resolved
Hide resolved
bufferflies marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| for { | ||
| candidates := r.selectCandidates(region, oldFit, peer.GetStoreId(), selectedStores, context) | ||
| newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context) | ||
| newPeer := r.selectNewPeer(context, group, peer, filters) | ||
| targetPeers[newPeer.GetStoreId()] = newPeer | ||
| selectedStores[newPeer.GetStoreId()] = struct{}{} | ||
| // If the selected peer is a peer other than origin peer in this region, | ||
|
|
@@ -363,7 +355,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s | |
| // FIXME: target leader only considers the ordinary stores, maybe we need to consider the | ||
| // special engine stores if the engine supports to become a leader. But now there is only | ||
| // one engine, tiflash, which does not support the leader, so don't consider it for now. | ||
| targetLeader := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine) | ||
| targetLeader, leaderStorePickedCount := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine) | ||
| if targetLeader == 0 { | ||
| scatterSkipNoLeaderCounter.Inc() | ||
| return nil | ||
|
|
@@ -398,6 +390,8 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s | |
| if op != nil { | ||
| scatterSuccessCounter.Inc() | ||
| r.Put(targetPeers, targetLeader, group) | ||
| op.AdditionalInfos["group"] = group | ||
| op.AdditionalInfos["leader-picked-count"] = strconv.FormatUint(leaderStorePickedCount, 10) | ||
| op.SetPriorityLevel(constant.High) | ||
| } | ||
| return op | ||
|
|
@@ -432,69 +426,52 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb. | |
| return region.GetLeader().GetStoreId() == targetLeader | ||
| } | ||
|
|
||
| func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, oldFit *placement.RegionFit, sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 { | ||
| sourceStore := r.cluster.GetStore(sourceStoreID) | ||
| if sourceStore == nil { | ||
| log.Error("failed to get the store", zap.Uint64("store-id", sourceStoreID), errs.ZapError(errs.ErrGetSourceStore)) | ||
| return nil | ||
| } | ||
| filters := []filter.Filter{ | ||
| filter.NewExcludedFilter(r.name, nil, selectedStores), | ||
| } | ||
| scoreGuard := filter.NewPlacementSafeguard(r.name, r.cluster.GetSharedConfig(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit) | ||
| for _, filterFunc := range context.filterFuncs { | ||
| filters = append(filters, filterFunc()) | ||
| } | ||
| filters = append(filters, scoreGuard) | ||
| // selectNewPeer return the new peer which pick the fewest picked count. | ||
| // it keeps the origin peer if the origin store's pick count is equal the fewest pick. | ||
| // it can be diveded into three steps: | ||
| // 1. found the max pick count and the min pick count. | ||
| // 2. if max pick count equals min pick count, it means all store picked count are some, return the origin peer. | ||
| // 3. otherwise, select the store which pick count is the min pick count and pass all filter. | ||
| func (r *RegionScatterer) selectNewPeer(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer { | ||
| stores := r.cluster.GetStores() | ||
| candidates := make([]uint64, 0) | ||
| maxStoreTotalCount := uint64(0) | ||
| minStoreTotalCount := uint64(math.MaxUint64) | ||
| for _, store := range stores { | ||
| count := context.selectedPeer.TotalCountByStore(store.GetID()) | ||
| count := context.selectedPeer.Get(store.GetID(), group) | ||
| if count > maxStoreTotalCount { | ||
| maxStoreTotalCount = count | ||
| } | ||
| if count < minStoreTotalCount { | ||
| minStoreTotalCount = count | ||
| } | ||
| } | ||
|
|
||
| var newPeer *metapb.Peer | ||
| minCount := uint64(math.MaxUint64) | ||
| originStorePickedCount := uint64(math.MaxUint64) | ||
| for _, store := range stores { | ||
| storeCount := context.selectedPeer.TotalCountByStore(store.GetID()) | ||
| storeCount := context.selectedPeer.Get(store.GetID(), group) | ||
| if store.GetID() == peer.GetId() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if store.GetID() == peer.GetStoreId() { |
||
| originStorePickedCount = storeCount | ||
| } | ||
| // If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate. | ||
| // If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store | ||
| // could be selected as candidate. | ||
| if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount { | ||
| if filter.Target(r.cluster.GetSharedConfig(), store, filters) { | ||
| candidates = append(candidates, store.GetID()) | ||
| if storeCount < minCount { | ||
| minCount = storeCount | ||
| newPeer = &metapb.Peer{ | ||
| StoreId: store.GetID(), | ||
| Role: peer.GetRole(), | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| return candidates | ||
| } | ||
|
|
||
| func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceStoreID uint64, candidates []uint64, context engineContext) *metapb.Peer { | ||
| if len(candidates) < 1 { | ||
| if originStorePickedCount <= minCount { | ||
| return peer | ||
| } | ||
| var newPeer *metapb.Peer | ||
| minCount := uint64(math.MaxUint64) | ||
| for _, storeID := range candidates { | ||
| count := context.selectedPeer.Get(storeID, group) | ||
| if count < minCount { | ||
| minCount = count | ||
| newPeer = &metapb.Peer{ | ||
| StoreId: storeID, | ||
| Role: peer.GetRole(), | ||
| } | ||
| } | ||
| } | ||
| // if the source store have the least count, we don't need to scatter this peer | ||
| for _, storeID := range candidates { | ||
| if storeID == sourceStoreID && context.selectedPeer.Get(sourceStoreID, group) <= minCount { | ||
| return peer | ||
| } | ||
| } | ||
| if newPeer == nil { | ||
| return peer | ||
| } | ||
|
|
@@ -503,11 +480,12 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto | |
|
|
||
| // selectAvailableLeaderStore select the target leader store from the candidates. The candidates would be collected by | ||
| // the existed peers store depended on the leader counts in the group level. Please use this func before scatter spacial engines. | ||
| func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, leaderCandidateStores []uint64, context engineContext) uint64 { | ||
| func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, | ||
| leaderCandidateStores []uint64, context engineContext) (leaderID uint64, leaderStorePickedCount uint64) { | ||
| sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId()) | ||
| if sourceStore == nil { | ||
| log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore)) | ||
| return 0 | ||
| return 0, 0 | ||
| } | ||
| minStoreGroupLeader := uint64(math.MaxUint64) | ||
| id := uint64(0) | ||
|
|
@@ -522,7 +500,7 @@ func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core. | |
| id = storeID | ||
| } | ||
| } | ||
| return id | ||
| return id, minStoreGroupLeader | ||
| } | ||
|
|
||
| // Put put the final distribution in the context no matter the operator was created | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.