Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions src/backend/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -45,10 +46,12 @@ type ClusterManager struct {
}

type CacheIndexer struct {
stopChans chan struct{}
Pod kcache.Indexer
Event kcache.Indexer
Endpoints kcache.Indexer
stopChans chan struct{}
Pod kcache.Indexer
Event kcache.Indexer
Node kcache.Indexer
Deployment kcache.Indexer
Endpoints kcache.Indexer
}

func (c ClusterManager) Close() {
Expand Down Expand Up @@ -149,15 +152,27 @@ func buildCacheController(client *kubernetes.Clientset) *CacheIndexer {
eventIndexer, eventInformer := kcache.NewIndexerInformer(eventListWatcher, &v1.Event{}, defaultResyncPeriod, kcache.ResourceEventHandlerFuncs{}, kcache.Indexers{})
go eventInformer.Run(stopCh)

// create the deployment watcher
deploymentListWatcher := kcache.NewListWatchFromClient(client.AppsV1beta1().RESTClient(), "deployments", v1.NamespaceAll, fields.Everything())
deploymentIndexer, deploymentInformer := kcache.NewIndexerInformer(deploymentListWatcher, &v1beta1.Deployment{}, defaultResyncPeriod, kcache.ResourceEventHandlerFuncs{}, kcache.Indexers{})
go deploymentInformer.Run(stopCh)

// create the node watcher
nodeListWatcher := kcache.NewListWatchFromClient(client.CoreV1().RESTClient(), "nodes", v1.NamespaceAll, fields.Everything())
nodeIndexer, nodeInformer := kcache.NewIndexerInformer(nodeListWatcher, &v1.Node{}, defaultResyncPeriod, kcache.ResourceEventHandlerFuncs{}, kcache.Indexers{})
go nodeInformer.Run(stopCh)

// create the endpoint watcher
endpointsListWatcher := kcache.NewListWatchFromClient(client.CoreV1().RESTClient(), "endpoints", v1.NamespaceAll, fields.Everything())
endpointsIndexer, endpointsinformer := kcache.NewIndexerInformer(endpointsListWatcher, &v1.Endpoints{}, defaultResyncPeriod, kcache.ResourceEventHandlerFuncs{}, kcache.Indexers{})
go endpointsinformer.Run(stopCh)
return &CacheIndexer{
Pod: podIndexer,
Event: eventIndexer,
Endpoints: endpointsIndexer,
stopChans: stopCh,
Pod: podIndexer,
Event: eventIndexer,
Deployment: deploymentIndexer,
Node: nodeIndexer,
Endpoints: endpointsIndexer,
stopChans: stopCh,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *KubeDeploymentController) List() {

manager, err := client.Manager(cluster)
if err == nil {
result, err := deployment.GetDeploymentPage(manager.Client, manager.Indexer, namespace, param)
result, err := deployment.GetDeploymentPage(manager.Indexer, namespace, param)
if err != nil {
logs.Error("list kubernetes deployments error.", cluster, namespace, err)
c.HandleError(err)
Expand Down
25 changes: 12 additions & 13 deletions src/backend/controllers/kubernetes/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ import (
"encoding/json"
"sync"

"k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"

"github.com/Qihoo360/wayne/src/backend/client"
"github.com/Qihoo360/wayne/src/backend/controllers/base"
"github.com/Qihoo360/wayne/src/backend/resources/node"
"github.com/Qihoo360/wayne/src/backend/util/logs"
"k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/kubernetes"
)

type KubeNodeController struct {
Expand Down Expand Up @@ -44,21 +43,21 @@ func (c *KubeNodeController) NodeStatistics() {
countSyncMap := sync.Map{}
countMap := make(map[string]int)
if cluster == "" {
clients := client.Clients()
managers := client.Managers()
var errs []error
wg := sync.WaitGroup{}
for clu, cli := range clients {
for clu, manager := range managers {
wg.Add(1)
go func(clu string, cli *kubernetes.Clientset) {
go func(clu string, mang *client.ClusterManager) {
defer wg.Done()
count, err := node.GetNodeCounts(cli)
count, err := node.GetNodeCounts(mang.Indexer)
if err != nil {
logs.Error("get k8s nodes count error. %v", err.Error())
errs = append(errs, err)
}
total += count
countSyncMap.Store(clu, count)
}(clu, cli)
}(clu, manager)

}
wg.Wait()
Expand All @@ -71,9 +70,9 @@ func (c *KubeNodeController) NodeStatistics() {
return true
})
} else {
cli, err := client.Client(cluster)
manager, err := client.Manager(cluster)
if err == nil {
count, err := node.GetNodeCounts(cli)
count, err := node.GetNodeCounts(manager.Indexer)
if err != nil {
logs.Error("get k8s nodes count error. %v", err.Error())
c.HandleError(err)
Expand All @@ -94,9 +93,9 @@ func (c *KubeNodeController) NodeStatistics() {
// @router /clusters/:cluster [get]
func (c *KubeNodeController) List() {
cluster := c.Ctx.Input.Param(":cluster")
cli, err := client.Client(cluster)
manager, err := client.Manager(cluster)
if err == nil {
result, err := node.ListNode(cli, metaV1.ListOptions{})
result, err := node.ListNode(manager.Indexer)
if err != nil {
logs.Error("list node by cluster (%s) error.%v", cluster, err)
c.HandleError(err)
Expand Down
22 changes: 17 additions & 5 deletions src/backend/resources/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package deployment
import (
"fmt"
"net/http"
"sort"

"k8s.io/api/apps/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -23,13 +24,24 @@ type Deployment struct {
Containers []string `json:"containers"`
}

func GetDeploymentList(cli *kubernetes.Clientset, namespace string, opts metaV1.ListOptions) ([]v1beta1.Deployment, error) {
deployments, err := cli.AppsV1beta1().Deployments(namespace).List(opts)
if err != nil {
return nil, err
func GetDeploymentList(indexer *client.CacheIndexer, namespace string) ([]v1beta1.Deployment, error) {
cacheDeployments := indexer.Deployment.List()
var deployments []v1beta1.Deployment
for _, e := range cacheDeployments {
cacheDeployment, ok := e.(*v1beta1.Deployment)
if !ok {
continue
}
if cacheDeployment.Namespace != namespace {
continue
}
deployments = append(deployments, *cacheDeployment)
}

return deployments.Items, nil
sort.Slice(deployments, func(i, j int) bool {
return deployments[i].Name > deployments[j].Name
})
return deployments, nil
}

func GetDeploymentResource(cli *kubernetes.Clientset, deployment *v1beta1.Deployment) (*common.ResourceList, error) {
Expand Down
6 changes: 2 additions & 4 deletions src/backend/resources/deployment/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"github.com/Qihoo360/wayne/src/backend/client"
"github.com/Qihoo360/wayne/src/backend/common"
"github.com/Qihoo360/wayne/src/backend/resources/dataselector"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

func GetDeploymentPage(cli *kubernetes.Clientset, indexer *client.CacheIndexer, namespace string, q *common.QueryParam) (*common.Page, error) {
kubeDeployments, err := GetDeploymentList(cli, namespace, metaV1.ListOptions{})
func GetDeploymentPage(indexer *client.CacheIndexer, namespace string, q *common.QueryParam) (*common.Page, error) {
kubeDeployments, err := GetDeploymentList(indexer, namespace)
if err != nil {
return nil, err
}
Expand Down
32 changes: 19 additions & 13 deletions src/backend/resources/node/node.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package node

import (
"sort"
"strconv"

"k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/Qihoo360/wayne/src/backend/client"
)

type NodeStatistics struct {
Expand Down Expand Up @@ -36,23 +39,26 @@ type NodeStatus struct {
NodeInfo v1.NodeSystemInfo `json:"nodeInfo,omitempty"`
}

func GetNodeCounts(cli *kubernetes.Clientset) (int, error) {
nodes, err := cli.CoreV1().Nodes().List(metaV1.ListOptions{})
if err != nil {
return 0, err
}
return len(nodes.Items), nil
func GetNodeCounts(indexer *client.CacheIndexer) (int, error) {
nodeList := indexer.Node.List()
return len(nodeList), nil
}

func ListNode(cli *kubernetes.Clientset, listOptions metaV1.ListOptions) ([]Node, error) {
nodeList, err := cli.CoreV1().Nodes().List(listOptions)
if err != nil {
return nil, err
}
func ListNode(indexer *client.CacheIndexer) ([]Node, error) {
nodeList := indexer.Node.List()
nodes := make([]Node, 0)
for _, node := range nodeList.Items {
nodes = append(nodes, toNode(node))
for _, node := range nodeList {
cacheNode, ok := node.(*v1.Node)
if !ok {
continue
}
nodes = append(nodes, toNode(*cacheNode))
}

sort.Slice(nodes, func(i, j int) bool {
return nodes[i].Name < nodes[j].Name
})

return nodes, nil
}

Expand Down