Skip to content

Commit 3153ee8

Browse files
author
wuhua3
committed
rename RefersFilter
cluster group cluster group cluster group cluster group code review code review bugfix bugfix bugfix unit test bugfix
1 parent 2379eb2 commit 3153ee8

27 files changed

+970
-235
lines changed

agent.go

Lines changed: 77 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,21 @@ type Agent struct {
6060

6161
agentServer motan.Server
6262

63-
clusterMap *motan.CopyOnWriteMap
64-
httpClusterMap *motan.CopyOnWriteMap
65-
status int64
66-
agentURL *motan.URL
67-
logdir string
68-
port int
69-
mport int
70-
eport int
71-
hport int
72-
pidfile string
73-
runtimedir string
63+
clusterGroupServiceMap *motan.CopyOnWriteMap
64+
clusterGroupMap *motan.CopyOnWriteMap
65+
httpClusterMap *motan.CopyOnWriteMap
66+
67+
status int64
68+
agentURL *motan.URL
69+
logdir string
70+
port int
71+
mport int
72+
eport int
73+
hport int
74+
pidfile string
75+
runtimedir string
7476

7577
serviceExporters *motan.CopyOnWriteMap
76-
serviceMap *motan.CopyOnWriteMap
7778
agentPortServer map[int]motan.Server
7879
serviceRegistries *motan.CopyOnWriteMap
7980
httpProxyServer *mserver.HTTPProxyServer
@@ -95,9 +96,9 @@ type CommandHandler interface {
9596
Serve() (currentCommandInfo string)
9697
}
9798

98-
type serviceMapItem struct {
99-
url *motan.URL
100-
cluster *cluster.MotanCluster
99+
type clusterGroupServiceMapItem struct {
100+
url *motan.URL
101+
clusterGroup motan.ClusterGroup
101102
}
102103

103104
func NewAgent(extfactory motan.ExtensionFactory) *Agent {
@@ -108,14 +109,14 @@ func NewAgent(extfactory motan.ExtensionFactory) *Agent {
108109
} else {
109110
agent = &Agent{extFactory: extfactory}
110111
}
111-
agent.clusterMap = motan.NewCopyOnWriteMap()
112+
agent.clusterGroupMap = motan.NewCopyOnWriteMap()
112113
agent.httpClusterMap = motan.NewCopyOnWriteMap()
113114
agent.serviceExporters = motan.NewCopyOnWriteMap()
114115
agent.agentPortServer = make(map[int]motan.Server)
115116
agent.serviceRegistries = motan.NewCopyOnWriteMap()
116117
agent.manageHandlers = make(map[string]http.Handler)
117118
agent.envHandlers = make(map[string]map[string]http.Handler)
118-
agent.serviceMap = motan.NewCopyOnWriteMap()
119+
agent.clusterGroupServiceMap = motan.NewCopyOnWriteMap()
119120
return agent
120121
}
121122

@@ -213,7 +214,7 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) {
213214
metrics.StartReporter(a.Context)
214215
a.registerStatusSampler()
215216
a.initStatus()
216-
a.initClusters()
217+
a.initClusterGroups()
217218
a.startServerAgent()
218219
a.initHTTPClusters()
219220
a.startHTTPAgent()
@@ -497,11 +498,10 @@ func (h *httpClusterGetter) GetHTTPCluster(host string) *cluster.HTTPCluster {
497498
func (a *Agent) reloadClusters(ctx *motan.Context) {
498499
a.clsLock.Lock()
499500
defer a.clsLock.Unlock()
500-
501501
a.Context = ctx
502502

503503
serviceItemKeep := make(map[string]bool)
504-
clusterMap := make(map[interface{}]interface{})
504+
clusterGroupMap := make(map[interface{}]interface{})
505505
serviceMap := make(map[interface{}]interface{})
506506
var allRefersURLs []*motan.URL
507507
if a.configurer != nil {
@@ -521,10 +521,10 @@ func (a *Agent) reloadClusters(ctx *motan.Context) {
521521
service := url.Path
522522
mapKey := getClusterKey(url.Group, url.GetStringParamsWithDefault(motan.VersionKey, motan.DefaultReferVersion), url.Protocol, url.Path)
523523

524-
// find exists old serviceMap
525-
var serviceMapValue serviceMapItem
526-
if v, exists := a.serviceMap.Load(service); exists {
527-
vItems := v.([]serviceMapItem)
524+
// find exists old clusterGroupMap
525+
var serviceMapValue clusterGroupServiceMapItem
526+
if v, exists := a.clusterGroupServiceMap.Load(service); exists {
527+
vItems := v.([]clusterGroupServiceMapItem)
528528

529529
for _, vItem := range vItems {
530530
urlExtInfo := url.ToExtInfo()
@@ -536,51 +536,51 @@ func (a *Agent) reloadClusters(ctx *motan.Context) {
536536
}
537537
}
538538

539-
// new serviceMap & cluster
539+
// new clusterGroupMap & clusterGroup
540540
if serviceMapValue.url == nil {
541541
vlog.Infoln("hot create service:" + url.ToExtInfo())
542-
c := cluster.NewCluster(a.Context, a.extFactory, url, true)
543-
serviceMapValue = serviceMapItem{
544-
url: url,
545-
cluster: c,
542+
cg := cluster.NewClusterGroup(a.Context, a.extFactory, url, true)
543+
serviceMapValue = clusterGroupServiceMapItem{
544+
url: url,
545+
clusterGroup: cg,
546546
}
547547
}
548-
clusterMap[mapKey] = serviceMapValue.cluster
548+
clusterGroupMap[mapKey] = serviceMapValue.clusterGroup
549549

550-
var serviceMapItemArr []serviceMapItem
550+
var serviceMapItemArr []clusterGroupServiceMapItem
551551
if v, exists := serviceMap[service]; exists {
552-
serviceMapItemArr = v.([]serviceMapItem)
552+
serviceMapItemArr = v.([]clusterGroupServiceMapItem)
553553
}
554554
serviceMapItemArr = append(serviceMapItemArr, serviceMapValue)
555555
serviceMap[url.Path] = serviceMapItemArr
556556
}
557557

558-
oldServiceMap := a.serviceMap.Swap(serviceMap)
559-
a.clusterMap.Swap(clusterMap)
558+
oldServiceMap := a.clusterGroupServiceMap.Swap(serviceMap)
559+
a.clusterGroupMap.Swap(clusterGroupMap)
560560

561561
// diff and destroy service
562562
for _, v := range oldServiceMap {
563-
vItems := v.([]serviceMapItem)
563+
vItems := v.([]clusterGroupServiceMapItem)
564564
for _, item := range vItems {
565565
if _, ok := serviceItemKeep[item.url.ToExtInfo()]; !ok {
566566
vlog.Infoln("hot destroy service:" + item.url.ToExtInfo())
567-
item.cluster.Destroy()
567+
item.clusterGroup.Destroy()
568568
}
569569
}
570570
}
571571
}
572572

573-
func (a *Agent) initClusters() {
573+
func (a *Agent) initClusterGroups() {
574574
initTimeout := a.Context.AgentURL.GetIntValue(motan.InitClusterTimeoutKey, defaultInitClusterTimeout)
575575
timer := time.NewTimer(time.Millisecond * time.Duration(initTimeout))
576576
wg := sync.WaitGroup{}
577577
wg.Add(len(a.Context.RefersURLs))
578578
for _, url := range a.Context.RefersURLs {
579-
// concurrently initialize cluster
579+
// concurrently initialize clusterGroup
580580
go func(u *motan.URL) {
581581
defer wg.Done()
582582
defer motan.HandlePanic(nil)
583-
a.initCluster(u)
583+
a.initClusterGroup(u)
584584
}(url)
585585
}
586586
finishChan := make(chan struct{})
@@ -590,38 +590,37 @@ func (a *Agent) initClusters() {
590590
}()
591591
select {
592592
case <-timer.C:
593-
vlog.Infof("agent init cluster timeout(%dms), do not wait(rest cluster keep doing initialization backend)", initTimeout)
593+
vlog.Infof("agent init clusterGroup timeout(%dms), do not wait(rest clusterGroup keep doing initialization backend)", initTimeout)
594594
case <-finishChan:
595595
defer timer.Stop()
596-
vlog.Infoln("agent cluster init complete")
596+
vlog.Infoln("agent clusterGroup init complete")
597597
}
598598
}
599599

600-
func (a *Agent) initCluster(url *motan.URL) {
600+
func (a *Agent) initClusterGroup(url *motan.URL) {
601601
if url.Parameters[motan.ApplicationKey] == "" {
602602
url.Parameters[motan.ApplicationKey] = a.agentURL.Parameters[motan.ApplicationKey]
603603
}
604-
605-
c := cluster.NewCluster(a.Context, a.extFactory, url, true)
606-
item := serviceMapItem{
607-
url: url,
608-
cluster: c,
604+
cg := cluster.NewClusterGroup(a.Context, a.extFactory, url, true)
605+
item := clusterGroupServiceMapItem{
606+
url: url,
607+
clusterGroup: cg,
609608
}
610609
service := url.Path
611-
a.serviceMap.SafeDoFunc(func() {
612-
var serviceMapItemArr []serviceMapItem
613-
if v, exists := a.serviceMap.Load(service); exists {
614-
serviceMapItemArr = v.([]serviceMapItem)
610+
a.clusterGroupServiceMap.SafeDoFunc(func() {
611+
var serviceMapItemArr []clusterGroupServiceMapItem
612+
if v, exists := a.clusterGroupServiceMap.Load(service); exists {
613+
serviceMapItemArr = v.([]clusterGroupServiceMapItem)
615614
serviceMapItemArr = append(serviceMapItemArr, item)
616615
} else {
617-
serviceMapItemArr = []serviceMapItem{item}
616+
serviceMapItemArr = []clusterGroupServiceMapItem{item}
618617
}
619-
a.serviceMap.UnsafeStore(url.Path, serviceMapItemArr)
618+
a.clusterGroupServiceMap.UnsafeStore(url.Path, serviceMapItemArr)
620619
})
621620
mapKey := getClusterKey(url.Group, url.GetStringParamsWithDefault(motan.VersionKey, motan.DefaultReferVersion), url.Protocol, url.Path)
622621
a.clsLock.Lock() // Mutually exclusive with the reloadClusters method
623622
defer a.clsLock.Unlock()
624-
a.clusterMap.Store(mapKey, c)
623+
a.clusterGroupMap.Store(mapKey, cg)
625624
}
626625

627626
func (a *Agent) SetSanpshotConf() {
@@ -726,13 +725,13 @@ func (a *agentMessageHandler) GetRuntimeInfo() map[string]interface{} {
726725
return info
727726
}
728727

729-
func (a *agentMessageHandler) clusterCall(request motan.Request, ck string, motanCluster *cluster.MotanCluster) (res motan.Response) {
728+
func (a *agentMessageHandler) clusterCall(request motan.Request, ck string, motanClusterGroup motan.ClusterGroup) (res motan.Response) {
730729
// fill default request info
731-
fillDefaultReqInfo(request, motanCluster.GetURL())
732-
res = motanCluster.Call(request)
730+
fillDefaultReqInfo(request, motanClusterGroup.GetURL())
731+
res = motanClusterGroup.Call(request)
733732
if res == nil {
734-
vlog.Warningf("motanCluster Call return nil. cluster:%s", ck)
735-
res = getDefaultResponse(request.GetRequestID(), "motanCluster Call return nil. cluster:"+ck)
733+
vlog.Warningf("motanCluster Call return nil. clusterGroup:%s", ck)
734+
res = getDefaultResponse(request.GetRequestID(), "motanCluster Call return nil. clusterGroup:"+ck)
736735
}
737736
return res
738737
}
@@ -759,8 +758,8 @@ func (a *agentMessageHandler) httpCall(request motan.Request, ck string, httpClu
759758
}
760759
res = httpCluster.Call(request)
761760
if res == nil {
762-
vlog.Warningf("httpCluster Call return nil. cluster:%s", ck)
763-
return getDefaultResponse(request.GetRequestID(), "httpCluster Call return nil. cluster:"+ck)
761+
vlog.Warningf("httpCluster Call return nil. clusterGroup:%s", ck)
762+
return getDefaultResponse(request.GetRequestID(), "httpCluster Call return nil. clusterGroup:"+ck)
764763
}
765764
}
766765
// has response and response not a no endpoint exception
@@ -826,7 +825,7 @@ func fillDefaultReqInfo(r motan.Request, url *motan.URL) {
826825
}
827826

828827
func (a *agentMessageHandler) Call(request motan.Request) (res motan.Response) {
829-
c, ck, err := a.findCluster(request)
828+
c, ck, err := a.findClusterGroup(request)
830829
if err == nil {
831830
res = a.clusterCall(request, ck, c)
832831
} else if httpCluster := a.agent.httpClusterMap.LoadOrNil(request.GetServiceName()); httpCluster != nil {
@@ -842,37 +841,37 @@ func (a *agentMessageHandler) Call(request motan.Request) (res motan.Response) {
842841
return res
843842
}
844843

845-
func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.MotanCluster, key string, err error) {
844+
func (a *agentMessageHandler) findClusterGroup(request motan.Request) (c motan.ClusterGroup, key string, err error) {
846845
service := request.GetServiceName()
847846
if service == "" {
848847
err = fmt.Errorf("empty service is not supported. service: %s", service)
849848
return
850849
}
851-
serviceItemArrI, exists := a.agent.serviceMap.Load(service)
850+
serviceItemArrI, exists := a.agent.clusterGroupServiceMap.Load(service)
852851
if !exists {
853-
err = fmt.Errorf("cluster not found. service: %s", service)
852+
err = fmt.Errorf("clusterGroup not found. service: %s", service)
854853
return
855854
}
856-
clusters := serviceItemArrI.([]serviceMapItem)
857-
if len(clusters) == 1 {
855+
clusterGroups := serviceItemArrI.([]clusterGroupServiceMapItem)
856+
if len(clusterGroups) == 1 {
858857
//TODO: add strict mode to avoid incorrect group call
859-
c = clusters[0].cluster
858+
c = clusterGroups[0].clusterGroup
860859
return
861860
}
862861
group := request.GetAttachment(mpro.MGroup)
863862
if group == "" {
864-
err = fmt.Errorf("multiple clusters are matched with service: %s, but the group is empty", service)
863+
err = fmt.Errorf("multiple clusterGroups are matched with service: %s, but the group is empty", service)
865864
return
866865
}
867866
version := request.GetAttachment(mpro.MVersion)
868867
protocol := request.GetAttachment(mpro.MProxyProtocol)
869-
for _, j := range clusters {
868+
for _, j := range clusterGroups {
870869
if j.url.IsMatch(service, group, protocol, version) {
871-
c = j.cluster
870+
c = j.clusterGroup
872871
return
873872
}
874873
}
875-
err = fmt.Errorf("no cluster matches the request; info: {service: %s, group: %s, protocol: %s, version: %s}", service, group, protocol, version)
874+
err = fmt.Errorf("no clusterGroup matches the request; info: {service: %s, group: %s, protocol: %s, version: %s}", service, group, protocol, version)
876875
return
877876
}
878877

@@ -1143,10 +1142,11 @@ func (a *AgentListener) NotifyCommand(registryURL *motan.URL, commandType int, c
11431142
}
11441143
}
11451144

1146-
a.agent.clusterMap.Range(func(k, v interface{}) bool {
1147-
cls := v.(*cluster.MotanCluster)
1148-
for _, registry := range cls.Registries {
1149-
if cr, ok := registry.(motan.CommandNotifyListener); ok {
1145+
a.agent.clusterGroupMap.Range(func(k, v interface{}) bool {
1146+
clusterGroup := v.(motan.ClusterGroup)
1147+
masterCls := clusterGroup.GetMasterCluster().(*cluster.MotanCluster)
1148+
for _, reg := range masterCls.Registries {
1149+
if cr, ok := reg.(motan.CommandNotifyListener); ok {
11501150
cr.NotifyCommand(registryURL, cluster.AgentCmd, commandInfo)
11511151
}
11521152
}
@@ -1306,7 +1306,7 @@ func (a *Agent) SubscribeService(url *motan.URL) error {
13061306
if urlExist(url, a.Context.RefersURLs) {
13071307
return fmt.Errorf("url exist, ignore subscribe, url: %s", url.GetIdentity())
13081308
}
1309-
a.initCluster(url)
1309+
a.initClusterGroup(url)
13101310
return nil
13111311
}
13121312

0 commit comments

Comments
 (0)