Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/backend/client/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package client

import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/listers/apps/v1beta1"
"k8s.io/client-go/listers/core/v1"
)

// CacheFactory bundles a client-go SharedInformerFactory with the stop
// channel that bounds the lifetime of every informer it starts.
// One CacheFactory is held per cluster (see ClusterManager.CacheFactory).
type CacheFactory struct {
// stopChan is closed (via ClusterManager.Close) to shut down all informers.
stopChan chan struct{}
// sharedInformerFactory provides the cached listers exposed below.
sharedInformerFactory informers.SharedInformerFactory
}

// Close shuts down all cache informers for this cluster by closing the
// shared stop channel.
// NOTE(review): closing an already-closed channel panics — this assumes
// Close is invoked at most once per manager; confirm callers guarantee that.
func (c ClusterManager) Close() {
close(c.CacheFactory.stopChan)
}

// buildCacheController creates the per-cluster CacheFactory: it registers
// informers for every resource type served from cache (events, pods,
// deployments, nodes, endpoints) and starts them, tied to a fresh stop
// channel that ClusterManager.Close later closes.
func buildCacheController(client *kubernetes.Clientset) *CacheFactory {
	stop := make(chan struct{})
	sharedInformerFactory := informers.NewSharedInformerFactory(client, defaultResyncPeriod)

	// Calling Informer() registers each informer with the factory;
	// Start below then runs every registered informer exactly once.
	// (Previously each informer was also launched manually with
	// `go ...Informer().Run(stop)` in addition to Start — the factory does
	// not know about informers started by hand, so each one ran twice.)
	sharedInformerFactory.Core().V1().Events().Informer()
	sharedInformerFactory.Core().V1().Pods().Informer()
	sharedInformerFactory.Apps().V1beta1().Deployments().Informer()
	sharedInformerFactory.Core().V1().Nodes().Informer()
	sharedInformerFactory.Core().V1().Endpoints().Informer()

	sharedInformerFactory.Start(stop)

	return &CacheFactory{
		stopChan:              stop,
		sharedInformerFactory: sharedInformerFactory,
	}
}

// PodLister returns a lister backed by the shared pod informer's cache.
func (c *CacheFactory) PodLister() v1.PodLister {
return c.sharedInformerFactory.Core().V1().Pods().Lister()
}

// EventLister returns a lister backed by the shared event informer's cache.
func (c *CacheFactory) EventLister() v1.EventLister {
return c.sharedInformerFactory.Core().V1().Events().Lister()
}

// DeploymentLister returns a lister backed by the shared deployment
// informer's cache (apps/v1beta1).
func (c *CacheFactory) DeploymentLister() v1beta1.DeploymentLister {
return c.sharedInformerFactory.Apps().V1beta1().Deployments().Lister()
}

// NodeLister returns a lister backed by the shared node informer's cache.
func (c *CacheFactory) NodeLister() v1.NodeLister {
return c.sharedInformerFactory.Core().V1().Nodes().Lister()
}

// EndpointLister returns a lister backed by the shared endpoints
// informer's cache.
func (c *CacheFactory) EndpointLister() v1.EndpointsLister {
return c.sharedInformerFactory.Core().V1().Endpoints().Lister()
}
161 changes: 50 additions & 111 deletions src/backend/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@ package client
import (
"encoding/json"
"errors"
"sync"
"time"

"k8s.io/api/apps/v1beta1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest"
clientcmdapiv1 "k8s.io/client-go/tools/clientcmd/api/v1"

"github.com/Qihoo360/wayne/src/backend/models"
"github.com/Qihoo360/wayne/src/backend/util/logs"
"github.com/Qihoo360/wayne/src/backend/util/maps"
)

const (
Expand All @@ -35,27 +33,14 @@ var (
)

var (
clusterManagerSets = make(map[string]*ClusterManager)
clusterManagerSets = &sync.Map{}
)

type ClusterManager struct {
Cluster *models.Cluster
Client *kubernetes.Clientset
Config *rest.Config
Indexer *CacheIndexer
}

type CacheIndexer struct {
stopChans chan struct{}
Pod kcache.Indexer
Event kcache.Indexer
Node kcache.Indexer
Deployment kcache.Indexer
Endpoints kcache.Indexer
}

func (c ClusterManager) Close() {
c.Indexer.stopChans <- struct{}{}
Cluster *models.Cluster
Client *kubernetes.Clientset
Config *rest.Config
CacheFactory *CacheFactory
}

func BuildApiserverClient() {
Expand All @@ -68,11 +53,12 @@ func BuildApiserverClient() {
changed := clusterChanged(newClusters)
if changed {
logs.Info("cluster changed, so resync info...")

shouldRemoveClusters(newClusters)
// build new clientManager
newClusterManagerSets := make(map[string]*ClusterManager)
for i := 0; i < len(newClusters); i++ {
cluster := newClusters[i]

// deal with deleted cluster
if cluster.Master == "" {
logs.Warning("cluster's master is null:%s", cluster.Name)
continue
Expand All @@ -83,44 +69,56 @@ func BuildApiserverClient() {
continue
}

cacheIndexer := buildCacheController(clientSet)
cacheFactory := buildCacheController(clientSet)
clusterManager := &ClusterManager{
Client: clientSet,
Config: config,
Cluster: &cluster,
Indexer: cacheIndexer,
Client: clientSet,
Config: config,
Cluster: &cluster,
CacheFactory: cacheFactory,
}
managerInterface, ok := clusterManagerSets.Load(cluster.Name)
if ok {
manager := managerInterface.(*ClusterManager)
manager.Close()
}
newClusterManagerSets[cluster.Name] = clusterManager

clusterManagerSets.Store(cluster.Name, clusterManager)
}
// stop all old cacheController
stopAllCacheController()
clusterManagerSets = newClusterManagerSets
}

}

func stopAllCacheController() {
// TODO 停止之后,controller 仍然会定期list维护中的集群资源,需要解决
for _, manager := range clusterManagerSets {
manager.Close()
// shouldRemoveClusters drops managers for clusters that no longer appear in
// the freshly loaded cluster list: each removed cluster's cache controller is
// stopped via Close and its entry is deleted from clusterManagerSets.
func shouldRemoveClusters(changedClusters []models.Cluster) {
	// Set of cluster names that still exist after the reload.
	survivors := make(map[string]struct{}, len(changedClusters))
	for _, cluster := range changedClusters {
		survivors[cluster.Name] = struct{}{}
	}

	clusterManagerSets.Range(func(key, value interface{}) bool {
		if _, ok := survivors[key.(string)]; !ok {
			// Range already hands us the stored value — no need for the
			// redundant clusterManagerSets.Load(key) the original performed.
			value.(*ClusterManager).Close()
			clusterManagerSets.Delete(key)
		}
		return true
	})
}

func clusterChanged(clusters []models.Cluster) bool {
if len(clusterManagerSets) == 0 {
return true
}
if len(clusterManagerSets) != len(clusters) {
if maps.SyncMapLen(clusterManagerSets) != len(clusters) {
logs.Info("cluster length (%d) changed to (%d).", maps.SyncMapLen(clusterManagerSets), len(clusters))
return true
}

for _, cluster := range clusters {
manager, ok := clusterManagerSets[cluster.Name]
managerInterface, ok := clusterManagerSets.Load(cluster.Name)
if !ok {
// maybe add new cluster
return true
}
manager := managerInterface.(*ClusterManager)
// master changed, the cluster is changed, ignore others
if manager.Cluster.Master != cluster.Master {
logs.Info("cluster master (%s) changed to (%s).", manager.Cluster.Master, cluster.Master)
Expand All @@ -140,80 +138,29 @@ func clusterChanged(clusters []models.Cluster) bool {
return false
}

func buildCacheController(client *kubernetes.Clientset) *CacheIndexer {
stopCh := make(chan struct{})
// create the pod watcher
podListWatcher := kcache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything())
podIndexer, podInformer := kcache.NewIndexerInformer(podListWatcher, &v1.Pod{}, defaultResyncPeriod, kcache.ResourceEventHandlerFuncs{}, kcache.Indexers{})
go podInformer.Run(stopCh)

// create the event watcher
eventListWatcher := kcache.NewListWatchFromClient(client.CoreV1().RESTClient(), "events", v1.NamespaceAll, fields.Everything())
eventIndexer, eventInformer := kcache.NewIndexerInformer(eventListWatcher, &v1.Event{}, defaultResyncPeriod, kcache.ResourceEventHandlerFuncs{}, kcache.Indexers{})
go eventInformer.Run(stopCh)

// create the deployment watcher
deploymentListWatcher := kcache.NewListWatchFromClient(client.AppsV1beta1().RESTClient(), "deployments", v1.NamespaceAll, fields.Everything())
deploymentIndexer, deploymentInformer := kcache.NewIndexerInformer(deploymentListWatcher, &v1beta1.Deployment{}, defaultResyncPeriod, kcache.ResourceEventHandlerFuncs{}, kcache.Indexers{})
go deploymentInformer.Run(stopCh)

// create the node watcher
nodeListWatcher := kcache.NewListWatchFromClient(client.CoreV1().RESTClient(), "nodes", v1.NamespaceAll, fields.Everything())
nodeIndexer, nodeInformer := kcache.NewIndexerInformer(nodeListWatcher, &v1.Node{}, defaultResyncPeriod, kcache.ResourceEventHandlerFuncs{}, kcache.Indexers{})
go nodeInformer.Run(stopCh)

// create the endpoint watcher
endpointsListWatcher := kcache.NewListWatchFromClient(client.CoreV1().RESTClient(), "endpoints", v1.NamespaceAll, fields.Everything())
endpointsIndexer, endpointsinformer := kcache.NewIndexerInformer(endpointsListWatcher, &v1.Endpoints{}, defaultResyncPeriod, kcache.ResourceEventHandlerFuncs{}, kcache.Indexers{})
go endpointsinformer.Run(stopCh)
return &CacheIndexer{
Pod: podIndexer,
Event: eventIndexer,
Deployment: deploymentIndexer,
Node: nodeIndexer,
Endpoints: endpointsIndexer,
stopChans: stopCh,
}
}

func Cluster(cluster string) (*models.Cluster, error) {
manager, exist := clusterManagerSets[cluster]
// 如果不存在,则重新获取一次集群信息
if !exist {
BuildApiserverClient()
manager, exist = clusterManagerSets[cluster]
if !exist {
return nil, ErrNotExist
}
}
if manager.Cluster.Status == models.ClusterStatusMaintaining {
return nil, ErrMaintaining
manager, err := Manager(cluster)
if err != nil {
return nil, err
}
return manager.Cluster, nil
}

func Client(cluster string) (*kubernetes.Clientset, error) {
manager, exist := clusterManagerSets[cluster]
// 如果不存在,则重新获取一次集群信息
if !exist {
BuildApiserverClient()
manager, exist = clusterManagerSets[cluster]
if !exist {
return nil, ErrNotExist
}
}
if manager.Cluster.Status == models.ClusterStatusMaintaining {
return nil, ErrMaintaining
manager, err := Manager(cluster)
if err != nil {
return nil, err
}
return manager.Client, nil
}

func Manager(cluster string) (*ClusterManager, error) {
manager, exist := clusterManagerSets[cluster]
managerInterface, exist := clusterManagerSets.Load(cluster)
manager := managerInterface.(*ClusterManager)
// 如果不存在,则重新获取一次集群信息
if !exist {
BuildApiserverClient()
manager, exist = clusterManagerSets[cluster]
_, exist = clusterManagerSets.Load(cluster)
if !exist {
return nil, ErrNotExist
}
Expand All @@ -224,15 +171,7 @@ func Manager(cluster string) (*ClusterManager, error) {
return manager, nil
}

func Clients() map[string]*kubernetes.Clientset {
clientSets := map[string]*kubernetes.Clientset{}
for cluster, cManager := range clusterManagerSets {
clientSets[cluster] = cManager.Client
}
return clientSets
}

func Managers() map[string]*ClusterManager {
func Managers() *sync.Map {
return clusterManagerSets
}

Expand Down
5 changes: 3 additions & 2 deletions src/backend/cmd/apiserver/apiserver.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package apiserver

import (
"github.com/Qihoo360/wayne/src/backend/initial"
_ "github.com/Qihoo360/wayne/src/backend/routers"
"github.com/astaxie/beego"
"github.com/spf13/cobra"

"github.com/Qihoo360/wayne/src/backend/initial"
_ "github.com/Qihoo360/wayne/src/backend/routers"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion src/backend/controllers/kubernetes/daemonset/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (c *KubeDaemonSetController) Get() {
name := c.Ctx.Input.Param(":daemonSet")
manager, err := client.Manager(cluster)
if err == nil {
result, err := daemonset.GetDaemonSetDetail(manager.Client, manager.Indexer, name, namespace)
result, err := daemonset.GetDaemonSetDetail(manager.Client, manager.CacheFactory, name, namespace)
if err != nil {
logs.Error("get kubernetes daemonSet detail error.", cluster, namespace, name, err)
c.HandleError(err)
Expand Down
4 changes: 2 additions & 2 deletions src/backend/controllers/kubernetes/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *KubeDeploymentController) List() {

manager, err := client.Manager(cluster)
if err == nil {
result, err := deployment.GetDeploymentPage(manager.Indexer, namespace, param)
result, err := deployment.GetDeploymentPage(manager.CacheFactory, namespace, param)
if err != nil {
logs.Error("list kubernetes deployments error.", cluster, namespace, err)
c.HandleError(err)
Expand Down Expand Up @@ -295,7 +295,7 @@ func (c *KubeDeploymentController) GetDetail() {
name := c.Ctx.Input.Param(":deployment")
manager, err := client.Manager(cluster)
if err == nil {
result, err := deployment.GetDeploymentDetail(manager.Client, manager.Indexer, name, namespace)
result, err := deployment.GetDeploymentDetail(manager.Client, manager.CacheFactory, name, namespace)
if err != nil {
logs.Error("get kubernetes deployment detail error.", cluster, namespace, name, err)
c.HandleError(err)
Expand Down
2 changes: 1 addition & 1 deletion src/backend/controllers/kubernetes/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (c *KubeJobController) GetPodsEvent() {
cronjobName := c.Ctx.Input.Param(":cronjob")
manager, err := client.Manager(cluster)
if err == nil {
result, err := job.GetPodsEvent(manager.Client, manager.Indexer, namespace, name, cronjobName)
result, err := job.GetPodsEvent(manager.Client, manager.CacheFactory, namespace, name, cronjobName)
if err != nil {
logs.Error("get kubernetes job pods event error.", cluster, namespace, name, err)
c.HandleError(err)
Expand Down
Loading