@@ -18,6 +18,11 @@ import (
1818 "context"
1919 "fmt"
2020 "math"
21+ <<<<<<< HEAD:server/schedule/region_scatterer.go
22+ =======
23+ "strconv"
24+ "sync"
25+ >>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer.go
2126 "time"
2227
2328 "github.com/pingcap/errors"
@@ -86,26 +91,6 @@ func (s *selectedStores) GetGroupDistribution(group string) (map[uint64]uint64,
8691 return s .getDistributionByGroupLocked (group )
8792}
8893
89- // TotalCountByStore counts the total count by store
90- func (s * selectedStores ) TotalCountByStore (storeID uint64 ) uint64 {
91- s .mu .RLock ()
92- defer s .mu .RUnlock ()
93- groups := s .groupDistribution .GetAllID ()
94- totalCount := uint64 (0 )
95- for _ , group := range groups {
96- storeDistribution , ok := s .getDistributionByGroupLocked (group )
97- if ! ok {
98- continue
99- }
100- count , ok := storeDistribution [storeID ]
101- if ! ok {
102- continue
103- }
104- totalCount += count
105- }
106- return totalCount
107- }
108-
10994// getDistributionByGroupLocked should be called with lock
11095func (s * selectedStores ) getDistributionByGroupLocked (group string ) (map [uint64 ]uint64 , bool ) {
11196 if result , ok := s .groupDistribution .Get (group ); ok {
@@ -309,6 +294,12 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
309294 selectedStores := make (map [uint64 ]struct {}, len (region .GetPeers ())) // selected StoreID set
310295 leaderCandidateStores := make ([]uint64 , 0 , len (region .GetPeers ())) // StoreID allowed to become Leader
311296 scatterWithSameEngine := func (peers map [uint64 ]* metapb.Peer , context engineContext ) { // peers: StoreID -> Peer
297+ filterLen := len (context .filterFuncs ) + 2
298+ filters := make ([]filter.Filter , filterLen )
299+ for i , filterFunc := range context .filterFuncs {
300+ filters [i ] = filterFunc ()
301+ }
302+ filters [filterLen - 2 ] = filter .NewExcludedFilter (r .name , nil , selectedStores )
312303 for _ , peer := range peers {
313304 if _ , ok := selectedStores [peer .GetStoreId ()]; ok {
314305 if allowLeader (oldFit , peer ) {
@@ -317,9 +308,19 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
317308 // It is both sourcePeer and targetPeer itself, no need to select.
318309 continue
319310 }
311+ sourceStore := r .cluster .GetStore (peer .GetStoreId ())
312+ if sourceStore == nil {
313+ log .Error ("failed to get the store" , zap .Uint64 ("store-id" , peer .GetStoreId ()), errs .ZapError (errs .ErrGetSourceStore ))
314+ continue
315+ }
316+ filters [filterLen - 1 ] = filter .NewPlacementSafeguard (r .name , r .cluster .GetSharedConfig (), r .cluster .GetBasicCluster (), r .cluster .GetRuleManager (), region , sourceStore , oldFit )
320317 for {
318+ <<<<<<< HEAD:server/schedule/region_scatterer.go
321319 candidates := r .selectCandidates (region , peer .GetStoreId (), selectedStores , context )
322320 newPeer := r .selectStore (group , peer , peer .GetStoreId (), candidates , context )
321+ =======
322+ newPeer := r .selectNewPeer (context , group , peer , filters )
323+ >>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer.go
323324 targetPeers [newPeer .GetStoreId ()] = newPeer
324325 selectedStores [newPeer .GetStoreId ()] = struct {}{}
325326 // If the selected peer is a peer other than origin peer in this region,
@@ -340,7 +341,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
340341 // FIXME: target leader only considers the ordinary stores, maybe we need to consider the
341342 // special engine stores if the engine supports to become a leader. But now there is only
342343 // one engine, tiflash, which does not support the leader, so don't consider it for now.
343- targetLeader := r .selectAvailableLeaderStore (group , region , leaderCandidateStores , r .ordinaryEngine )
344+ targetLeader , leaderStorePickedCount := r .selectAvailableLeaderStore (group , region , leaderCandidateStores , r .ordinaryEngine )
344345 if targetLeader == 0 {
345346 scatterCounter .WithLabelValues ("no-leader" , "" ).Inc ()
346347 return nil
@@ -373,7 +374,13 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
373374 if op != nil {
374375 scatterCounter .WithLabelValues ("success" , "" ).Inc ()
375376 r .Put (targetPeers , targetLeader , group )
377+ <<<<<<< HEAD:server/schedule/region_scatterer.go
376378 op .SetPriorityLevel (core .HighPriority )
379+ =======
380+ op .AdditionalInfos ["group" ] = group
381+ op .AdditionalInfos ["leader-picked-count" ] = strconv .FormatUint (leaderStorePickedCount , 10 )
382+ op .SetPriorityLevel (constant .High )
383+ >>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer.go
377384 }
378385 return op
379386}
@@ -404,6 +411,7 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb.
404411 return region .GetLeader ().GetStoreId () == targetLeader
405412}
406413
414+ <<<<<<< HEAD:server/schedule/region_scatterer.go
407415func (r * RegionScatterer ) selectCandidates (region * core.RegionInfo , sourceStoreID uint64 , selectedStores map [uint64 ]struct {}, context engineContext ) []uint64 {
408416 sourceStore := r .cluster .GetStore (sourceStoreID )
409417 if sourceStore == nil {
@@ -416,55 +424,64 @@ func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, sourceStoreI
416424 scoreGuard := filter .NewPlacementSafeguard (r .name , r .cluster .GetOpts (), r .cluster .GetBasicCluster (), r .cluster .GetRuleManager (), region , sourceStore )
417425 filters = append (filters , context .filters ... )
418426 filters = append (filters , scoreGuard )
427+ =======
428+ // selectNewPeer returns the new peer with the fewest picked count.
429+ // It keeps the origin peer if the origin store's picked count is equal to the fewest picked count.
430+ // It can be divided into three steps:
431+ // 1. find the max pick count and the min pick count.
432+ // 2. if max pick count equals min pick count, it means all stores' picked counts are the same, return the origin peer.
433+ // 3. otherwise, select the store whose picked count is the min pick count and which passes all filters.
434+ func (r * RegionScatterer ) selectNewPeer (context engineContext , group string , peer * metapb .Peer , filters []filter.Filter ) * metapb.Peer {
435+ >>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer.go
419436 stores := r .cluster .GetStores ()
420- candidates := make ([]uint64 , 0 )
421437 maxStoreTotalCount := uint64 (0 )
422438 minStoreTotalCount := uint64 (math .MaxUint64 )
439+ <<<<<<< HEAD:server/schedule/region_scatterer.go
423440 for _ , store := range r .cluster .GetStores () {
424441 count := context .selectedPeer .TotalCountByStore (store .GetID ())
442+ =======
443+ for _ , store := range stores {
444+ count := context .selectedPeer .Get (store .GetID (), group )
445+ >>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer.go
425446 if count > maxStoreTotalCount {
426447 maxStoreTotalCount = count
427448 }
428449 if count < minStoreTotalCount {
429450 minStoreTotalCount = count
430451 }
431452 }
453+
454+ var newPeer * metapb.Peer
455+ minCount := uint64 (math .MaxUint64 )
456+ originStorePickedCount := uint64 (math .MaxUint64 )
432457 for _ , store := range stores {
433- storeCount := context .selectedPeer .TotalCountByStore (store .GetID ())
458+ storeCount := context .selectedPeer .Get (store .GetID (), group )
459+ if store .GetID () == peer .GetId () {
460+ originStorePickedCount = storeCount
461+ }
434462 // If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate.
435463 // If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store
436464 // could be selected as candidate.
437465 if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount {
466+ <<<<<<< HEAD:server/schedule/region_scatterer.go
438467 if filter .Target (r .cluster .GetOpts (), store , filters ) {
439468 candidates = append (candidates , store .GetID ())
469+ =======
470+ if filter .Target (r .cluster .GetSharedConfig (), store , filters ) {
471+ if storeCount < minCount {
472+ minCount = storeCount
473+ newPeer = & metapb.Peer {
474+ StoreId : store .GetID (),
475+ Role : peer .GetRole (),
476+ }
477+ }
478+ >>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer.go
440479 }
441480 }
442481 }
443- return candidates
444- }
445-
446- func (r * RegionScatterer ) selectStore (group string , peer * metapb.Peer , sourceStoreID uint64 , candidates []uint64 , context engineContext ) * metapb.Peer {
447- if len (candidates ) < 1 {
482+ if originStorePickedCount <= minCount {
448483 return peer
449484 }
450- var newPeer * metapb.Peer
451- minCount := uint64 (math .MaxUint64 )
452- for _ , storeID := range candidates {
453- count := context .selectedPeer .Get (storeID , group )
454- if count < minCount {
455- minCount = count
456- newPeer = & metapb.Peer {
457- StoreId : storeID ,
458- Role : peer .GetRole (),
459- }
460- }
461- }
462- // if the source store have the least count, we don't need to scatter this peer
463- for _ , storeID := range candidates {
464- if storeID == sourceStoreID && context .selectedPeer .Get (sourceStoreID , group ) <= minCount {
465- return peer
466- }
467- }
468485 if newPeer == nil {
469486 return peer
470487 }
@@ -473,11 +490,12 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto
473490
474491// selectAvailableLeaderStore select the target leader store from the candidates. The candidates would be collected by
475492// the existed peers store depended on the leader counts in the group level. Please use this func before scatter spacial engines.
476- func (r * RegionScatterer ) selectAvailableLeaderStore (group string , region * core.RegionInfo , leaderCandidateStores []uint64 , context engineContext ) uint64 {
493+ func (r * RegionScatterer ) selectAvailableLeaderStore (group string , region * core .RegionInfo ,
494+ leaderCandidateStores []uint64 , context engineContext ) (leaderID uint64 , leaderStorePickedCount uint64 ) {
477495 sourceStore := r.cluster .GetStore (region.GetLeader ().GetStoreId ())
478496 if sourceStore == nil {
479497 log.Error ("failed to get the store" , zap .Uint64 ("store-id" , region .GetLeader ().GetStoreId ()), errs .ZapError (errs .ErrGetSourceStore ))
480- return 0
498+ return 0 , 0
481499 }
482500 minStoreGroupLeader := uint64 (math .MaxUint64 )
483501 id := uint64 (0 )
@@ -492,7 +510,7 @@ func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.
492510 id = storeID
493511 }
494512 }
495- return id
513+ return id , minStoreGroupLeader
496514}
497515
498516// Put put the final distribution in the context no matter the operator was created
0 commit comments