Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions src/backend/controllers/kubernetes/event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package event

import (
"fmt"
"net/http"

"github.com/Qihoo360/wayne/src/backend/client/api"
"github.com/Qihoo360/wayne/src/backend/common"
"github.com/Qihoo360/wayne/src/backend/controllers/base"
"github.com/Qihoo360/wayne/src/backend/models"
erroresult "github.com/Qihoo360/wayne/src/backend/models/response/errors"
"github.com/Qihoo360/wayne/src/backend/resources/event"
"github.com/Qihoo360/wayne/src/backend/util/logs"
)

type KubeEventController struct {
base.APIController
}

func (c *KubeEventController) URLMapping() {
c.Mapping("List", c.List)
}

func (c *KubeEventController) Prepare() {
// Check administration
c.APIController.Prepare()

methodActionMap := map[string]string{
"List": models.PermissionRead,
}
_, method := c.GetControllerAndAction()
c.PreparePermission(methodActionMap, method, models.PermissionTypeKubePod)
}

// @Title GetPodEvent
// @Description Get Pod Event by resource type and name
// @Param pageNo query int false "the page current no"
// @Param pageSize query int false "the page size"
// @Param type query string true "the query type. deployments, statefulsets, daemonsets,cronjobs"
// @Param name query string true "the query resource name."
// @Success 200 {object} models.Deployment success
// @router /namespaces/:namespace/clusters/:cluster [get]
func (c *KubeEventController) List() {
cluster := c.Ctx.Input.Param(":cluster")
namespace := c.Ctx.Input.Param(":namespace")
resourceType := c.Input().Get("type")
resourceName := c.Input().Get("name")
param := c.BuildKubernetesQueryParam()
manager := c.Manager(cluster)
var result *common.Page
var err error
switch resourceType {
case api.ResourceNameCronJob:
result, err = event.GetPodsEventByCronJobPage(manager.KubeClient, namespace, resourceName, param)
default:
err = &erroresult.ErrorResult{
Code: http.StatusBadRequest,
Msg: fmt.Sprintf("Unsupported resource type (%s). ", resourceType),
}
}
if err != nil {
logs.Error("Get kubernetes events by type error.", cluster, namespace, resourceType, resourceName, err)
c.HandleError(err)
return
}
c.Success(result)
}
68 changes: 16 additions & 52 deletions src/backend/controllers/kubernetes/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package job

import (
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"

"github.com/Qihoo360/wayne/src/backend/controllers/base"
"github.com/Qihoo360/wayne/src/backend/models"
Expand All @@ -20,76 +19,41 @@ type ClusterJob struct {
}

func (c *KubeJobController) URLMapping() {
c.Mapping("ListAllClusterByCronjob", c.ListAllClusterByCronjob)
c.Mapping("GetPodsEvent", c.GetPodsEvent)
c.Mapping("ListJobByCronJob", c.ListJobByCronJob)
}

func (c *KubeJobController) Prepare() {
// Check administration
c.APIController.Prepare()

methodActionMap := map[string]string{
"ListAllClusterByCronjob": models.PermissionRead,
"GetPodsEvent": models.PermissionRead,
"ListJobByCronJob": models.PermissionRead,
"GetEvent": models.PermissionRead,
}
_, method := c.GetControllerAndAction()
c.PreparePermission(methodActionMap, method, models.PermissionTypeKubeJob)
}

// @Title Get
// @Description find Job by cluster
// @Success 200 {object} models.Job success
// @router /listAllClusterByCronjob/:cronjob/namespaces/:namespace [get]
func (c *KubeJobController) ListAllClusterByCronjob() {
namespace := c.Ctx.Input.Param(":namespace")
cronjobName := c.Ctx.Input.Param(":cronjob")
clusters, err := models.ClusterModel.GetNames(false)
if err != nil {
logs.Error("get cluster error. %v", err)
c.HandleError(err)
return
}
var allJobs []ClusterJob
for _, cluster := range clusters {
cli := c.Client(cluster.Name)

jobs, err := job.GetJobsByCronjobName(cli, namespace, cronjobName)
if err != nil {
if errors.IsNotFound(err) {
continue
}
logs.Error("get job by cronjob (%s) and cluster (%s) error.%v", cronjobName, cluster, err)
c.HandleError(err)
return
}
for _, job := range jobs {
oneJob := ClusterJob{
Job: job,
Cluster: cluster.Name,
}
allJobs = append(allJobs, oneJob)
}
}
c.Success(allJobs)
}

// @Title Get
// @Description find job by cluster
// @Success 200 {object} models.Job success
// @router /getPodsEvent/:job/:cronjob/namespaces/:namespace/clusters/:cluster [get]
func (c *KubeJobController) GetPodsEvent() {
// @Title ListJobByCronJob
// @Description find jobs by cronjob
// @Param pageNo query int false "the page current no"
// @Param pageSize query int false "the page size"
// @Param name query string true "the cronjob name."
// @Param cluster query string true "the cluster name."
// @Success 200 {object} models.Deployment success
// @router /namespaces/:namespace/clusters/:cluster [get]
func (c *KubeJobController) ListJobByCronJob() {
cluster := c.Ctx.Input.Param(":cluster")
namespace := c.Ctx.Input.Param(":namespace")
name := c.Ctx.Input.Param(":job")
cronjobName := c.Ctx.Input.Param(":cronjob")
cronJob := c.Input().Get("name")
param := c.BuildKubernetesQueryParam()
manager := c.Manager(cluster)
result, err := job.GetRelatedJobByCronJob(manager.KubeClient, namespace, cronJob, param)

result, err := job.GetPodsEvent(manager.Client, manager.CacheFactory, namespace, name, cronjobName)
if err != nil {
logs.Error("get kubernetes job pods event error.", cluster, namespace, name, err)
logs.Error("Get kubernetes Job by CronJob error.", cluster, namespace, cronJob, err)
c.HandleError(err)
return
}
c.Success(result)

}
41 changes: 41 additions & 0 deletions src/backend/controllers/kubernetes/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func (c *KubePodController) URLMapping() {
c.Mapping("PodStatistics", c.PodStatistics)
c.Mapping("List", c.List)
c.Mapping("Terminal", c.Terminal)
c.Mapping("GetEvent", c.GetEvent)
}

func (c *KubePodController) Prepare() {
Expand Down Expand Up @@ -133,3 +134,43 @@ func (c *KubePodController) List() {
}
c.Success(result)
}

// @Title GetPodEvent
// @Description Get Pod Event by resource type and name
// @Param pageNo query int false "the page current no"
// @Param pageSize query int false "the page size"
// @Param type query string true "the query type. deployments, statefulsets, daemonsets,cronjobs"
// @Param name query string true "the query resource name."
// @Success 200 {object} models.Deployment success
// @router /events/namespaces/:namespace/clusters/:cluster [get]
func (c *KubePodController) GetEvent() {
cluster := c.Ctx.Input.Param(":cluster")
namespace := c.Ctx.Input.Param(":namespace")
resourceType := c.Input().Get("type")
resourceName := c.Input().Get("name")
param := c.BuildKubernetesQueryParam()
manager := c.Manager(cluster)
var result *common.Page
var err error
switch resourceType {
case api.ResourceNameDeployment:
result, err = pod.GetPodsByDeploymentPage(manager.KubeClient, namespace, resourceName, param)
case api.ResourceNameStatefulSet:
result, err = pod.GetRelatedPodByType(manager.KubeClient, namespace, resourceName, api.ResourceNameStatefulSet, param)
case api.ResourceNameDaemonSet:
result, err = pod.GetRelatedPodByType(manager.KubeClient, namespace, resourceName, api.ResourceNameDaemonSet, param)
case api.ResourceNameCronJob:
result, err = pod.GetRelatedPodByType(manager.KubeClient, namespace, resourceName, api.ResourceNameJob, param)
default:
err = &erroresult.ErrorResult{
Code: http.StatusBadRequest,
Msg: fmt.Sprintf("Unsupported resource type (%s). ", resourceType),
}
}
if err != nil {
logs.Error("Get kubernetes pod by type error.", cluster, namespace, resourceType, resourceName, err)
c.HandleError(err)
return
}
c.Success(result)
}
24 changes: 24 additions & 0 deletions src/backend/resources/event/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package event

import (
"github.com/Qihoo360/wayne/src/backend/resources/common"
"github.com/Qihoo360/wayne/src/backend/resources/dataselector"
)

// implements dataselector.DataCell
type ObjectCell common.Event

// implements dataselector.DataCell
func (cell ObjectCell) GetProperty(name dataselector.PropertyName) dataselector.ComparableValue {
switch name {
case dataselector.NameProperty:
return dataselector.StdComparableString(cell.ObjectMeta.Name)
case dataselector.CreationTimestampProperty:
return dataselector.StdComparableTime(cell.ObjectMeta.CreationTimestamp.Time)
case dataselector.NamespaceProperty:
return dataselector.StdComparableString(cell.ObjectMeta.Namespace)
default:
// if name is not supported then just return a constant dummy value, sort will have no effect.
return nil
}
}
109 changes: 109 additions & 0 deletions src/backend/resources/event/event.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package event

import (
"fmt"
"sort"
"strings"

batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"

"github.com/Qihoo360/wayne/src/backend/client"
"github.com/Qihoo360/wayne/src/backend/client/api"
basecommon "github.com/Qihoo360/wayne/src/backend/common"
"github.com/Qihoo360/wayne/src/backend/resources/common"
"github.com/Qihoo360/wayne/src/backend/resources/dataselector"
"github.com/Qihoo360/wayne/src/backend/util/slice"
)

// FailedReasonPartials is an array of partial strings to correctly filter warning events.
Expand Down Expand Up @@ -59,6 +66,108 @@ func GetPodsWarningEvents(indexer *client.CacheFactory, pods []*apiv1.Pod) ([]co
return result, nil
}

func getPodsWarningEventsPage(kubeClient client.ResourceHandler, pods []*apiv1.Pod) ([]common.Event, error) {
eventObjs, err := kubeClient.List(api.ResourceNameEvent, "", labels.Everything().String())
if err != nil {
return nil, err
}

events := make([]*apiv1.Event, 0)
for _, obj := range eventObjs {
events = append(events, obj.(*apiv1.Event))
}

result := make([]common.Event, 0)

// Filter out only warning events
events = getWarningEvents(events)
failedPods := make([]*apiv1.Pod, 0)

// Filter out ready and successful pods
for _, pod := range pods {
if !isReadyOrSucceeded(pod) {
failedPods = append(failedPods, pod)
}
}

// Filter events by failed pods UID
events = filterEventsByPodsUID(events, failedPods)
events = removeDuplicates(events)

for _, event := range events {
result = append(result, common.Event{
Message: event.Message,
Reason: event.Reason,
Type: event.Type,
FirstSeen: event.FirstTimestamp,
LastSeen: event.LastTimestamp,
Count: event.Count,
SourceComponent: event.Source.Component,
Name: event.InvolvedObject.Name,
})
}

return result, nil
}

func GetPodsEventByCronJobPage(kubeClient client.ResourceHandler, namespace, name string, q *basecommon.QueryParam) (*basecommon.Page, error) {
jobs, err := kubeClient.List(api.ResourceNameJob, namespace, labels.Everything().String())
if err != nil {
return nil, err
}
relateJobs := make([]string, 0)
for _, obj := range jobs {
job, ok := obj.(*batchv1.Job)
if !ok {
return nil, fmt.Errorf("Convert rs obj (%v) error. ", obj)
}
for _, ref := range job.OwnerReferences {
if ref.Kind == api.KindNameCronJob && ref.Name == name {
relateJobs = append(relateJobs, job.Name)
}
}

}
pods, err := kubeClient.List(api.ResourceNamePod, namespace, labels.Everything().String())
if err != nil {
return nil, err
}
relatePod := make([]*apiv1.Pod, 0)
for _, obj := range pods {
pod, ok := obj.(*apiv1.Pod)
if !ok {
return nil, fmt.Errorf("Convert pod obj (%v) error. ", obj)
}
for _, ref := range pod.OwnerReferences {
if ref.Kind == api.KindNameJob && slice.StrSliceContains(relateJobs, ref.Name) {
relatePod = append(relatePod, pod)
}
}

}

events, err := getPodsWarningEventsPage(kubeClient, relatePod)
if err != nil {
return nil, err
}

return pageResult(events, q), nil
}

func pageResult(events []common.Event, q *basecommon.QueryParam) *basecommon.Page {
commonObjs := make([]dataselector.DataCell, 0)
for _, event := range events {
commonObjs = append(commonObjs, ObjectCell(event))
}

sort.Slice(commonObjs, func(i, j int) bool {
return commonObjs[j].GetProperty(dataselector.CreationTimestampProperty).
Compare(commonObjs[i].GetProperty(dataselector.CreationTimestampProperty)) == -1
})

return dataselector.DataSelectPage(commonObjs, q)
}

// Returns filtered list of event objects.
// Event list object is filtered to get only warning events.
func getWarningEvents(events []*apiv1.Event) []*apiv1.Event {
Expand Down
Loading