@@ -60,20 +60,21 @@ type Agent struct {
6060
6161 agentServer motan.Server
6262
63- clusterMap * motan.CopyOnWriteMap
64- httpClusterMap * motan.CopyOnWriteMap
65- status int64
66- agentURL * motan.URL
67- logdir string
68- port int
69- mport int
70- eport int
71- hport int
72- pidfile string
73- runtimedir string
63+ clusterGroupServiceMap * motan.CopyOnWriteMap
64+ clusterGroupMap * motan.CopyOnWriteMap
65+ httpClusterMap * motan.CopyOnWriteMap
66+
67+ status int64
68+ agentURL * motan.URL
69+ logdir string
70+ port int
71+ mport int
72+ eport int
73+ hport int
74+ pidfile string
75+ runtimedir string
7476
7577 serviceExporters * motan.CopyOnWriteMap
76- serviceMap * motan.CopyOnWriteMap
7778 agentPortServer map [int ]motan.Server
7879 serviceRegistries * motan.CopyOnWriteMap
7980 httpProxyServer * mserver.HTTPProxyServer
@@ -95,9 +96,9 @@ type CommandHandler interface {
9596 Serve () (currentCommandInfo string )
9697}
9798
98- type serviceMapItem struct {
99- url * motan.URL
100- cluster * cluster. MotanCluster
99+ type clusterGroupServiceMapItem struct {
100+ url * motan.URL
101+ clusterGroup motan. ClusterGroup
101102}
102103
103104func NewAgent (extfactory motan.ExtensionFactory ) * Agent {
@@ -108,14 +109,14 @@ func NewAgent(extfactory motan.ExtensionFactory) *Agent {
108109 } else {
109110 agent = & Agent {extFactory : extfactory }
110111 }
111- agent .clusterMap = motan .NewCopyOnWriteMap ()
112+ agent .clusterGroupMap = motan .NewCopyOnWriteMap ()
112113 agent .httpClusterMap = motan .NewCopyOnWriteMap ()
113114 agent .serviceExporters = motan .NewCopyOnWriteMap ()
114115 agent .agentPortServer = make (map [int ]motan.Server )
115116 agent .serviceRegistries = motan .NewCopyOnWriteMap ()
116117 agent .manageHandlers = make (map [string ]http.Handler )
117118 agent .envHandlers = make (map [string ]map [string ]http.Handler )
118- agent .serviceMap = motan .NewCopyOnWriteMap ()
119+ agent .clusterGroupServiceMap = motan .NewCopyOnWriteMap ()
119120 return agent
120121}
121122
@@ -213,7 +214,7 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) {
213214 metrics .StartReporter (a .Context )
214215 a .registerStatusSampler ()
215216 a .initStatus ()
216- a .initClusters ()
217+ a .initClusterGroups ()
217218 a .startServerAgent ()
218219 a .initHTTPClusters ()
219220 a .startHTTPAgent ()
@@ -497,11 +498,10 @@ func (h *httpClusterGetter) GetHTTPCluster(host string) *cluster.HTTPCluster {
497498func (a * Agent ) reloadClusters (ctx * motan.Context ) {
498499 a .clsLock .Lock ()
499500 defer a .clsLock .Unlock ()
500-
501501 a .Context = ctx
502502
503503 serviceItemKeep := make (map [string ]bool )
504- clusterMap := make (map [interface {}]interface {})
504+ clusterGroupMap := make (map [interface {}]interface {})
505505 serviceMap := make (map [interface {}]interface {})
506506 var allRefersURLs []* motan.URL
507507 if a .configurer != nil {
@@ -521,10 +521,10 @@ func (a *Agent) reloadClusters(ctx *motan.Context) {
521521 service := url .Path
522522 mapKey := getClusterKey (url .Group , url .GetStringParamsWithDefault (motan .VersionKey , motan .DefaultReferVersion ), url .Protocol , url .Path )
523523
524- // find exists old serviceMap
525- var serviceMapValue serviceMapItem
526- if v , exists := a .serviceMap .Load (service ); exists {
527- vItems := v .([]serviceMapItem )
524+ // find exists old clusterGroupMap
525+ var serviceMapValue clusterGroupServiceMapItem
526+ if v , exists := a .clusterGroupServiceMap .Load (service ); exists {
527+ vItems := v .([]clusterGroupServiceMapItem )
528528
529529 for _ , vItem := range vItems {
530530 urlExtInfo := url .ToExtInfo ()
@@ -536,51 +536,51 @@ func (a *Agent) reloadClusters(ctx *motan.Context) {
536536 }
537537 }
538538
539- // new serviceMap & cluster
539+ // new clusterGroupMap & clusterGroup
540540 if serviceMapValue .url == nil {
541541 vlog .Infoln ("hot create service:" + url .ToExtInfo ())
542- c := cluster .NewCluster (a .Context , a .extFactory , url , true )
543- serviceMapValue = serviceMapItem {
544- url : url ,
545- cluster : c ,
542+ cg := cluster .NewClusterGroup (a .Context , a .extFactory , url , true )
543+ serviceMapValue = clusterGroupServiceMapItem {
544+ url : url ,
545+ clusterGroup : cg ,
546546 }
547547 }
548- clusterMap [mapKey ] = serviceMapValue .cluster
548+ clusterGroupMap [mapKey ] = serviceMapValue .clusterGroup
549549
550- var serviceMapItemArr []serviceMapItem
550+ var serviceMapItemArr []clusterGroupServiceMapItem
551551 if v , exists := serviceMap [service ]; exists {
552- serviceMapItemArr = v .([]serviceMapItem )
552+ serviceMapItemArr = v .([]clusterGroupServiceMapItem )
553553 }
554554 serviceMapItemArr = append (serviceMapItemArr , serviceMapValue )
555555 serviceMap [url .Path ] = serviceMapItemArr
556556 }
557557
558- oldServiceMap := a .serviceMap .Swap (serviceMap )
559- a .clusterMap .Swap (clusterMap )
558+ oldServiceMap := a .clusterGroupServiceMap .Swap (serviceMap )
559+ a .clusterGroupMap .Swap (clusterGroupMap )
560560
561561 // diff and destroy service
562562 for _ , v := range oldServiceMap {
563- vItems := v .([]serviceMapItem )
563+ vItems := v .([]clusterGroupServiceMapItem )
564564 for _ , item := range vItems {
565565 if _ , ok := serviceItemKeep [item .url .ToExtInfo ()]; ! ok {
566566 vlog .Infoln ("hot destroy service:" + item .url .ToExtInfo ())
567- item .cluster .Destroy ()
567+ item .clusterGroup .Destroy ()
568568 }
569569 }
570570 }
571571}
572572
573- func (a * Agent ) initClusters () {
573+ func (a * Agent ) initClusterGroups () {
574574 initTimeout := a .Context .AgentURL .GetIntValue (motan .InitClusterTimeoutKey , defaultInitClusterTimeout )
575575 timer := time .NewTimer (time .Millisecond * time .Duration (initTimeout ))
576576 wg := sync.WaitGroup {}
577577 wg .Add (len (a .Context .RefersURLs ))
578578 for _ , url := range a .Context .RefersURLs {
579- // concurrently initialize cluster
579+ // concurrently initialize clusterGroup
580580 go func (u * motan.URL ) {
581581 defer wg .Done ()
582582 defer motan .HandlePanic (nil )
583- a .initCluster (u )
583+ a .initClusterGroup (u )
584584 }(url )
585585 }
586586 finishChan := make (chan struct {})
@@ -590,38 +590,37 @@ func (a *Agent) initClusters() {
590590 }()
591591 select {
592592 case <- timer .C :
593- vlog .Infof ("agent init cluster timeout(%dms), do not wait(rest cluster keep doing initialization backend)" , initTimeout )
593+ vlog .Infof ("agent init clusterGroup timeout(%dms), do not wait(rest clusterGroup keep doing initialization backend)" , initTimeout )
594594 case <- finishChan :
595595 defer timer .Stop ()
596- vlog .Infoln ("agent cluster init complete" )
596+ vlog .Infoln ("agent clusterGroup init complete" )
597597 }
598598}
599599
600- func (a * Agent ) initCluster (url * motan.URL ) {
600+ func (a * Agent ) initClusterGroup (url * motan.URL ) {
601601 if url .Parameters [motan .ApplicationKey ] == "" {
602602 url .Parameters [motan .ApplicationKey ] = a .agentURL .Parameters [motan .ApplicationKey ]
603603 }
604-
605- c := cluster .NewCluster (a .Context , a .extFactory , url , true )
606- item := serviceMapItem {
607- url : url ,
608- cluster : c ,
604+ cg := cluster .NewClusterGroup (a .Context , a .extFactory , url , true )
605+ item := clusterGroupServiceMapItem {
606+ url : url ,
607+ clusterGroup : cg ,
609608 }
610609 service := url .Path
611- a .serviceMap .SafeDoFunc (func () {
612- var serviceMapItemArr []serviceMapItem
613- if v , exists := a .serviceMap .Load (service ); exists {
614- serviceMapItemArr = v .([]serviceMapItem )
610+ a .clusterGroupServiceMap .SafeDoFunc (func () {
611+ var serviceMapItemArr []clusterGroupServiceMapItem
612+ if v , exists := a .clusterGroupServiceMap .Load (service ); exists {
613+ serviceMapItemArr = v .([]clusterGroupServiceMapItem )
615614 serviceMapItemArr = append (serviceMapItemArr , item )
616615 } else {
617- serviceMapItemArr = []serviceMapItem {item }
616+ serviceMapItemArr = []clusterGroupServiceMapItem {item }
618617 }
619- a .serviceMap .UnsafeStore (url .Path , serviceMapItemArr )
618+ a .clusterGroupServiceMap .UnsafeStore (url .Path , serviceMapItemArr )
620619 })
621620 mapKey := getClusterKey (url .Group , url .GetStringParamsWithDefault (motan .VersionKey , motan .DefaultReferVersion ), url .Protocol , url .Path )
622621 a .clsLock .Lock () // Mutually exclusive with the reloadClusters method
623622 defer a .clsLock .Unlock ()
624- a .clusterMap .Store (mapKey , c )
623+ a .clusterGroupMap .Store (mapKey , cg )
625624}
626625
627626func (a * Agent ) SetSanpshotConf () {
@@ -726,13 +725,13 @@ func (a *agentMessageHandler) GetRuntimeInfo() map[string]interface{} {
726725 return info
727726}
728727
729- func (a * agentMessageHandler ) clusterCall (request motan.Request , ck string , motanCluster * cluster. MotanCluster ) (res motan.Response ) {
728+ func (a * agentMessageHandler ) clusterCall (request motan.Request , ck string , motanClusterGroup motan. ClusterGroup ) (res motan.Response ) {
730729 // fill default request info
731- fillDefaultReqInfo (request , motanCluster .GetURL ())
732- res = motanCluster .Call (request )
730+ fillDefaultReqInfo (request , motanClusterGroup .GetURL ())
731+ res = motanClusterGroup .Call (request )
733732 if res == nil {
734- vlog .Warningf ("motanCluster Call return nil. cluster :%s" , ck )
735- res = getDefaultResponse (request .GetRequestID (), "motanCluster Call return nil. cluster :" + ck )
733+ vlog .Warningf ("motanCluster Call return nil. clusterGroup :%s" , ck )
734+ res = getDefaultResponse (request .GetRequestID (), "motanCluster Call return nil. clusterGroup :" + ck )
736735 }
737736 return res
738737}
@@ -759,8 +758,8 @@ func (a *agentMessageHandler) httpCall(request motan.Request, ck string, httpClu
759758 }
760759 res = httpCluster .Call (request )
761760 if res == nil {
762- vlog .Warningf ("httpCluster Call return nil. cluster :%s" , ck )
763- return getDefaultResponse (request .GetRequestID (), "httpCluster Call return nil. cluster :" + ck )
761+ vlog .Warningf ("httpCluster Call return nil. clusterGroup :%s" , ck )
762+ return getDefaultResponse (request .GetRequestID (), "httpCluster Call return nil. clusterGroup :" + ck )
764763 }
765764 }
766765 // has response and response not a no endpoint exception
@@ -826,7 +825,7 @@ func fillDefaultReqInfo(r motan.Request, url *motan.URL) {
826825}
827826
828827func (a * agentMessageHandler ) Call (request motan.Request ) (res motan.Response ) {
829- c , ck , err := a .findCluster (request )
828+ c , ck , err := a .findClusterGroup (request )
830829 if err == nil {
831830 res = a .clusterCall (request , ck , c )
832831 } else if httpCluster := a .agent .httpClusterMap .LoadOrNil (request .GetServiceName ()); httpCluster != nil {
@@ -842,37 +841,37 @@ func (a *agentMessageHandler) Call(request motan.Request) (res motan.Response) {
842841 return res
843842}
844843
845- func (a * agentMessageHandler ) findCluster (request motan.Request ) (c * cluster. MotanCluster , key string , err error ) {
844+ func (a * agentMessageHandler ) findClusterGroup (request motan.Request ) (c motan. ClusterGroup , key string , err error ) {
846845 service := request .GetServiceName ()
847846 if service == "" {
848847 err = fmt .Errorf ("empty service is not supported. service: %s" , service )
849848 return
850849 }
851- serviceItemArrI , exists := a .agent .serviceMap .Load (service )
850+ serviceItemArrI , exists := a .agent .clusterGroupServiceMap .Load (service )
852851 if ! exists {
853- err = fmt .Errorf ("cluster not found. service: %s" , service )
852+ err = fmt .Errorf ("clusterGroup not found. service: %s" , service )
854853 return
855854 }
856- clusters := serviceItemArrI .([]serviceMapItem )
857- if len (clusters ) == 1 {
855+ clusterGroups := serviceItemArrI .([]clusterGroupServiceMapItem )
856+ if len (clusterGroups ) == 1 {
858857 //TODO: add strict mode to avoid incorrect group call
859- c = clusters [0 ].cluster
858+ c = clusterGroups [0 ].clusterGroup
860859 return
861860 }
862861 group := request .GetAttachment (mpro .MGroup )
863862 if group == "" {
864- err = fmt .Errorf ("multiple clusters are matched with service: %s, but the group is empty" , service )
863+ err = fmt .Errorf ("multiple clusterGroups are matched with service: %s, but the group is empty" , service )
865864 return
866865 }
867866 version := request .GetAttachment (mpro .MVersion )
868867 protocol := request .GetAttachment (mpro .MProxyProtocol )
869- for _ , j := range clusters {
868+ for _ , j := range clusterGroups {
870869 if j .url .IsMatch (service , group , protocol , version ) {
871- c = j .cluster
870+ c = j .clusterGroup
872871 return
873872 }
874873 }
875- err = fmt .Errorf ("no cluster matches the request; info: {service: %s, group: %s, protocol: %s, version: %s}" , service , group , protocol , version )
874+ err = fmt .Errorf ("no clusterGroup matches the request; info: {service: %s, group: %s, protocol: %s, version: %s}" , service , group , protocol , version )
876875 return
877876}
878877
@@ -1143,10 +1142,11 @@ func (a *AgentListener) NotifyCommand(registryURL *motan.URL, commandType int, c
11431142 }
11441143 }
11451144
1146- a .agent .clusterMap .Range (func (k , v interface {}) bool {
1147- cls := v .(* cluster.MotanCluster )
1148- for _ , registry := range cls .Registries {
1149- if cr , ok := registry .(motan.CommandNotifyListener ); ok {
1145+ a .agent .clusterGroupMap .Range (func (k , v interface {}) bool {
1146+ clusterGroup := v .(motan.ClusterGroup )
1147+ masterCls := clusterGroup .GetMasterCluster ().(* cluster.MotanCluster )
1148+ for _ , reg := range masterCls .Registries {
1149+ if cr , ok := reg .(motan.CommandNotifyListener ); ok {
11501150 cr .NotifyCommand (registryURL , cluster .AgentCmd , commandInfo )
11511151 }
11521152 }
@@ -1306,7 +1306,7 @@ func (a *Agent) SubscribeService(url *motan.URL) error {
13061306 if urlExist (url , a .Context .RefersURLs ) {
13071307 return fmt .Errorf ("url exist, ignore subscribe, url: %s" , url .GetIdentity ())
13081308 }
1309- a .initCluster (url )
1309+ a .initClusterGroup (url )
13101310 return nil
13111311}
13121312
0 commit comments