Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions kwok/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *karpv1.NodeClaim)
if err != nil {
return nil, err
}
out.Status.ProviderID = strings.ReplaceAll(out.Status.ProviderID, "aws", "kwok-aws")
out.Status.ProviderID = strings.Replace(out.Status.ProviderID, "aws", "kwok-aws", 1)
return out, nil
}

Expand All @@ -69,7 +69,7 @@ func (c *CloudProvider) List(ctx context.Context) ([]*karpv1.NodeClaim, error) {
return nil, err
}
for _, elem := range out {
elem.Status.ProviderID = strings.ReplaceAll(elem.Status.ProviderID, "aws", "kwok-aws")
elem.Status.ProviderID = strings.Replace(elem.Status.ProviderID, "aws", "kwok-aws", 1)
}
return out, nil
}
Expand All @@ -79,6 +79,6 @@ func (c *CloudProvider) Get(ctx context.Context, providerID string) (*karpv1.Nod
if err != nil {
return nil, err
}
elem.Status.ProviderID = strings.ReplaceAll(elem.Status.ProviderID, "aws", "kwok-aws")
elem.Status.ProviderID = strings.Replace(elem.Status.ProviderID, "aws", "kwok-aws", 1)
return elem, nil
}
141 changes: 92 additions & 49 deletions kwok/ec2/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
"k8s.io/utils/set"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/karpenter/kwok/apis/v1alpha1"
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"

k8serrors "k8s.io/apimachinery/pkg/api/errors"

Expand All @@ -64,13 +63,16 @@ type Client struct {
subnets []ec2types.Subnet
strategy strategy.Strategy

instances sync.Map
instances sync.Map
instanceLaunchCancels sync.Map

readBackupCompleted chan struct{}

launchTemplates sync.Map
launchTemplateNameToID sync.Map
}

func NewClient(region, namespace string, ec2Client *ec2.Client, rateLimiterProvider RateLimiterProvider, strategy strategy.Strategy, kubeClient client.Client, clk clock.Clock, cfg *rest.Config) *Client {
func NewClient(region, namespace string, ec2Client *ec2.Client, rateLimiterProvider RateLimiterProvider, strategy strategy.Strategy, kubeClient client.Client, clk clock.Clock) *Client {
var instanceTypes []ec2types.InstanceTypeInfo
instanceTypesPaginator := ec2.NewDescribeInstanceTypesPaginator(ec2Client, &ec2.DescribeInstanceTypesInput{
MaxResults: aws.Int32(100),
Expand Down Expand Up @@ -100,18 +102,25 @@ func NewClient(region, namespace string, ec2Client *ec2.Client, rateLimiterProvi
subnets: subnets,
strategy: strategy,

instances: sync.Map{},
instances: sync.Map{},
instanceLaunchCancels: sync.Map{},

readBackupCompleted: make(chan struct{}),

launchTemplates: sync.Map{},
launchTemplateNameToID: sync.Map{},
}
c.readBackup(context.Background(), cfg)
return c
}

func (c *Client) readBackup(ctx context.Context, cfg *rest.Config) {
func (c *Client) ReadBackup(ctx context.Context) {
configMaps := &corev1.ConfigMapList{}
lo.Must0(client.IgnoreNotFound(lo.Must(client.New(cfg, client.Options{})).List(ctx, configMaps, client.InNamespace(c.namespace))))
lo.Must0(c.kubeClient.List(ctx, configMaps, client.InNamespace(c.namespace)))

nodeList := &corev1.NodeList{}
lo.Must0(c.kubeClient.List(ctx, nodeList, client.MatchingLabels{v1alpha1.KwokLabelKey: v1alpha1.KwokLabelValue}))

instanceIDs := set.New[string](lo.Map(nodeList.Items, func(n corev1.Node, _ int) string { return lo.Must(utils.ParseInstanceID(n.Spec.ProviderID)) })...)

configMaps.Items = lo.Filter(configMaps.Items, func(c corev1.ConfigMap, _ int) bool {
return strings.Contains(c.Name, "kwok-aws-instances-")
Expand All @@ -123,11 +132,17 @@ func (c *Client) readBackup(ctx context.Context, cfg *rest.Config) {
lo.Must0(json.Unmarshal([]byte(cm.Data["instances"]), &instances))
for _, instance := range instances {
c.instances.Store(lo.FromPtr(instance.InstanceId), instance)
// Register nodes immediately if we killed the KWOK controller before actually registering the node
if !instanceIDs.Has(lo.FromPtr(instance.InstanceId)) {
log.FromContext(ctx).WithValues("instance-id", lo.FromPtr(instance.InstanceId)).Info("creating node for instance id")
lo.Must0(c.kubeClient.Create(ctx, c.toNode(ctx, instance)))
}
}
total += len(instances)
}
}
log.FromContext(ctx).WithValues("count", total).Info("loaded instances from backup")
close(c.readBackupCompleted)
}

//nolint:gocyclo
Expand Down Expand Up @@ -175,7 +190,7 @@ func (c *Client) backupInstances(ctx context.Context) error {
numConfigMaps := int(math.Ceil(float64(len(instances)) / float64(500)))
if numConfigMaps < len(configMaps.Items) {
errs := make([]error, numConfigMaps)
workqueue.ParallelizeUntil(ctx, 10, len(configMaps.Items)-numConfigMaps, func(i int) {
workqueue.ParallelizeUntil(ctx, len(configMaps.Items)-numConfigMaps, len(configMaps.Items)-numConfigMaps, func(i int) {
if err := c.kubeClient.Delete(ctx, &configMaps.Items[len(configMaps.Items)-i-1]); client.IgnoreNotFound(err) != nil {
errs[i] = fmt.Errorf("deleting configmap %q, %w", configMaps.Items[len(configMaps.Items)-i-1].Name, err)
}
Expand All @@ -186,7 +201,7 @@ func (c *Client) backupInstances(ctx context.Context) error {
}

errs := make([]error, numConfigMaps)
workqueue.ParallelizeUntil(ctx, 10, numConfigMaps, func(i int) {
workqueue.ParallelizeUntil(ctx, numConfigMaps, numConfigMaps, func(i int) {
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("kwok-aws-instances-%d", i),
Expand Down Expand Up @@ -218,13 +233,14 @@ func (c *Client) backupInstances(ctx context.Context) error {

// StartBackupThread initiates the thread that is responsible for storing instances in ConfigMaps on the cluster
func (c *Client) StartBackupThread(ctx context.Context) {
<-c.readBackupCompleted
for {
if err := c.backupInstances(ctx); err != nil {
log.FromContext(ctx).Error(err, "unable to backup instances")
continue
}
select {
case <-time.After(time.Second * 5):
case <-time.After(time.Second):
case <-ctx.Done():
return
}
Expand All @@ -233,6 +249,7 @@ func (c *Client) StartBackupThread(ctx context.Context) {

// StartKillNodeThread initiates the thread that is responsible for killing nodes on the cluster that no longer have an instance representation (similar to CCM)
func (c *Client) StartKillNodeThread(ctx context.Context) {
<-c.readBackupCompleted
for {
nodes := &corev1.NodeList{}
if err := c.kubeClient.List(ctx, nodes, client.MatchingLabels{v1alpha1.KwokLabelKey: v1alpha1.KwokLabelValue}); err != nil {
Expand Down Expand Up @@ -276,6 +293,7 @@ func removeNullFields(bytes []byte) []byte {

//nolint:gocyclo
func (c *Client) DescribeLaunchTemplates(_ context.Context, input *ec2.DescribeLaunchTemplatesInput, _ ...func(*ec2.Options)) (*ec2.DescribeLaunchTemplatesOutput, error) {
<-c.readBackupCompleted
if !c.rateLimiterProvider.DescribeLaunchTemplates().TryAccept() {
return nil, &smithy.GenericAPIError{
Code: errors.RateLimitingErrorCode,
Expand Down Expand Up @@ -372,6 +390,7 @@ func (c *Client) DescribeLaunchTemplates(_ context.Context, input *ec2.DescribeL

//nolint:gocyclo
func (c *Client) CreateFleet(ctx context.Context, input *ec2.CreateFleetInput, _ ...func(*ec2.Options)) (*ec2.CreateFleetOutput, error) {
<-c.readBackupCompleted
if !c.rateLimiterProvider.CreateFleet().TryAccept() {
return nil, &smithy.GenericAPIError{
Code: errors.RateLimitingErrorCode,
Expand Down Expand Up @@ -450,28 +469,9 @@ func (c *Client) CreateFleet(ctx context.Context, input *ec2.CreateFleetInput, _
instanceTypeInfo := lo.Must(lo.Find(c.instanceTypes, func(i ec2types.InstanceTypeInfo) bool {
return i.InstanceType == selectedOverride.InstanceType
}))
// TODO: We need to get the capacity and allocatable information from the userData
it := instancetype.NewInstanceType(
ctx,
instanceTypeInfo,
c.region,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
// TODO: Eventually support different AMIFamilies from userData
"al2023",
nil,
)
instance := ec2types.Instance{
AmiLaunchIndex: nil,
Architecture: lo.Ternary(it.Requirements.Get(corev1.LabelArchStable).Any() == v1.ArchitectureAmd64, ec2types.ArchitectureValuesX8664, ec2types.ArchitectureValuesArm64),
Architecture: lo.Ternary(lo.Contains(instanceTypeInfo.ProcessorInfo.SupportedArchitectures, ec2types.ArchitectureTypeX8664), ec2types.ArchitectureValuesX8664, ec2types.ArchitectureValuesArm64),
// TODO: The block device mappings here don't have any data on the ephemeral storage size
BlockDeviceMappings: lo.Map(lt.B.BlockDeviceMappings, func(b ec2types.LaunchTemplateBlockDeviceMappingRequest, _ int) ec2types.InstanceBlockDeviceMapping {
return ec2types.InstanceBlockDeviceMapping{
Expand Down Expand Up @@ -586,15 +586,21 @@ func (c *Client) CreateFleet(ctx context.Context, input *ec2.CreateFleetInput, _
VpcId: subnet.VpcId,
}
c.instances.Store(lo.FromPtr(instance.InstanceId), instance)
launchCtx, cancel := context.WithCancel(ctx)
c.instanceLaunchCancels.Store(lo.FromPtr(instance.InstanceId), cancel)

// Create the Node through the instance launch
// TODO: Eventually support delayed registration
nodePoolNameTag, _ := lo.Find(instance.Tags, func(t ec2types.Tag) bool {
return lo.FromPtr(t.Key) == v1.NodePoolLabelKey
})
if err := c.kubeClient.Create(ctx, toNode(lo.FromPtr(instance.InstanceId), lo.FromPtr(nodePoolNameTag.Value), it, lo.FromPtr(subnet.AvailabilityZone), v1.CapacityTypeOnDemand)); err != nil {
return nil, fmt.Errorf("creating node, %w", err)
}
go func() {
select {
case <-launchCtx.Done():
return
// This is meant to simulate instance startup time
case <-c.clock.After(30 * time.Second):
}
if err := c.kubeClient.Create(launchCtx, c.toNode(ctx, instance)); err != nil {
c.instances.Delete(lo.FromPtr(instance.InstanceId))
c.instanceLaunchCancels.Delete(lo.FromPtr(instance.InstanceId))
}
}()
fleetInstances = append(fleetInstances, ec2types.CreateFleetInstance{
InstanceIds: []string{lo.FromPtr(instance.InstanceId)},
InstanceType: instance.InstanceType,
Expand Down Expand Up @@ -628,6 +634,7 @@ func (c *Client) CreateFleet(ctx context.Context, input *ec2.CreateFleetInput, _
}

func (c *Client) TerminateInstances(_ context.Context, input *ec2.TerminateInstancesInput, _ ...func(*ec2.Options)) (*ec2.TerminateInstancesOutput, error) {
<-c.readBackupCompleted
if !c.rateLimiterProvider.TerminateInstances().TryAccept() {
return nil, &smithy.GenericAPIError{
Code: errors.RateLimitingErrorCode,
Expand All @@ -644,6 +651,9 @@ func (c *Client) TerminateInstances(_ context.Context, input *ec2.TerminateInsta

for _, id := range input.InstanceIds {
c.instances.Delete(id)
if cancel, ok := c.instanceLaunchCancels.LoadAndDelete(id); ok {
cancel.(context.CancelFunc)()
}
}
return &ec2.TerminateInstancesOutput{
TerminatingInstances: lo.Map(input.InstanceIds, func(id string, _ int) ec2types.InstanceStateChange {
Expand All @@ -663,6 +673,7 @@ func (c *Client) TerminateInstances(_ context.Context, input *ec2.TerminateInsta
}

func (c *Client) DescribeInstances(_ context.Context, input *ec2.DescribeInstancesInput, _ ...func(*ec2.Options)) (*ec2.DescribeInstancesOutput, error) {
<-c.readBackupCompleted
if !c.rateLimiterProvider.DescribeInstances().TryAccept() {
return nil, &smithy.GenericAPIError{
Code: errors.RateLimitingErrorCode,
Expand Down Expand Up @@ -713,6 +724,7 @@ func (c *Client) DescribeInstances(_ context.Context, input *ec2.DescribeInstanc
}

func (c *Client) RunInstances(_ context.Context, input *ec2.RunInstancesInput, _ ...func(*ec2.Options)) (*ec2.RunInstancesOutput, error) {
<-c.readBackupCompleted
if !c.rateLimiterProvider.RunInstances().TryAccept() {
return nil, &smithy.GenericAPIError{
Code: errors.RateLimitingErrorCode,
Expand All @@ -733,6 +745,7 @@ func (c *Client) RunInstances(_ context.Context, input *ec2.RunInstancesInput, _

//nolint:gocyclo
func (c *Client) CreateTags(_ context.Context, input *ec2.CreateTagsInput, _ ...func(*ec2.Options)) (*ec2.CreateTagsOutput, error) {
<-c.readBackupCompleted
if !c.rateLimiterProvider.CreateTags().TryAccept() {
return nil, &smithy.GenericAPIError{
Code: errors.RateLimitingErrorCode,
Expand Down Expand Up @@ -791,6 +804,7 @@ func (c *Client) CreateTags(_ context.Context, input *ec2.CreateTagsInput, _ ...
}

func (c *Client) CreateLaunchTemplate(_ context.Context, input *ec2.CreateLaunchTemplateInput, _ ...func(*ec2.Options)) (*ec2.CreateLaunchTemplateOutput, error) {
<-c.readBackupCompleted
if !c.rateLimiterProvider.CreateLaunchTemplate().TryAccept() {
return nil, &smithy.GenericAPIError{
Code: errors.RateLimitingErrorCode,
Expand Down Expand Up @@ -823,6 +837,7 @@ func (c *Client) CreateLaunchTemplate(_ context.Context, input *ec2.CreateLaunch
}

func (c *Client) DeleteLaunchTemplate(_ context.Context, input *ec2.DeleteLaunchTemplateInput, _ ...func(*ec2.Options)) (*ec2.DeleteLaunchTemplateOutput, error) {
<-c.readBackupCompleted
if !c.rateLimiterProvider.DeleteLaunchTemplate().TryAccept() {
return nil, &smithy.GenericAPIError{
Code: errors.RateLimitingErrorCode,
Expand Down Expand Up @@ -862,7 +877,35 @@ func (c *Client) DeleteLaunchTemplate(_ context.Context, input *ec2.DeleteLaunch
}, nil
}

func toNode(instanceID, nodePoolName string, instanceType *cloudprovider.InstanceType, zone, capacityType string) *corev1.Node {
func (c *Client) toNode(ctx context.Context, instance ec2types.Instance) *corev1.Node {
Comment thread
jonathan-innis marked this conversation as resolved.
nodePoolNameTag, _ := lo.Find(instance.Tags, func(t ec2types.Tag) bool {
return lo.FromPtr(t.Key) == v1.NodePoolLabelKey
})
subnet := lo.Must(lo.Find(c.subnets, func(s ec2types.Subnet) bool {
return lo.FromPtr(s.SubnetId) == lo.FromPtr(instance.SubnetId)
}))
instanceTypeInfo := lo.Must(lo.Find(c.instanceTypes, func(i ec2types.InstanceTypeInfo) bool {
return i.InstanceType == instance.InstanceType
}))
// TODO: We need to get the capacity and allocatable information from the userData
it := instancetype.NewInstanceType(
ctx,
instanceTypeInfo,
c.region,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
// TODO: Eventually support different AMIFamilies from userData
"al2023",
nil,
)
nodeName := fmt.Sprintf("%s-%d", strings.ReplaceAll(namesgenerator.GetRandomName(0), "_", "-"), rand.Uint32()) //nolint:gosec
return &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -872,25 +915,25 @@ func toNode(instanceID, nodePoolName string, instanceType *cloudprovider.Instanc
},
// TODO: We can eventually add all the labels from the userData but for now we just add the NodePool labels
Labels: map[string]string{
corev1.LabelInstanceTypeStable: instanceType.Name,
corev1.LabelInstanceTypeStable: it.Name,
corev1.LabelHostname: nodeName,
corev1.LabelTopologyRegion: instanceType.Requirements.Get(corev1.LabelTopologyRegion).Any(),
corev1.LabelTopologyZone: zone,
v1.CapacityTypeLabelKey: capacityType,
corev1.LabelArchStable: instanceType.Requirements.Get(corev1.LabelArchStable).Any(),
corev1.LabelTopologyRegion: it.Requirements.Get(corev1.LabelTopologyRegion).Any(),
corev1.LabelTopologyZone: lo.FromPtr(subnet.AvailabilityZone),
v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand,
corev1.LabelArchStable: it.Requirements.Get(corev1.LabelArchStable).Any(),
corev1.LabelOSStable: string(corev1.Linux),
v1.NodePoolLabelKey: nodePoolName,
v1.NodePoolLabelKey: lo.FromPtr(nodePoolNameTag.Value),
v1alpha1.KwokLabelKey: v1alpha1.KwokLabelValue,
v1alpha1.KwokPartitionLabelKey: "a",
},
},
Spec: corev1.NodeSpec{
ProviderID: fmt.Sprintf("kwok-aws:///%s/%s", zone, instanceID),
ProviderID: fmt.Sprintf("kwok-aws:///%s/%s", lo.FromPtr(subnet.AvailabilityZone), lo.FromPtr(instance.InstanceId)),
Taints: []corev1.Taint{v1.UnregisteredNoExecuteTaint},
},
Status: corev1.NodeStatus{
Capacity: instanceType.Capacity,
Allocatable: instanceType.Allocatable(),
Capacity: it.Capacity,
Allocatable: it.Allocatable(),
Phase: corev1.NodePending,
},
}
Expand Down
6 changes: 6 additions & 0 deletions kwok/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ func main() {
<-op.Elected()
op.EC2API.StartKillNodeThread(ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()
<-op.Elected()
op.EC2API.ReadBackup(ctx)
}()

op.
WithControllers(ctx, corecontrollers.NewControllers(
Expand Down
2 changes: 1 addition & 1 deletion kwok/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
region := lo.Must(imds.NewFromConfig(cfg).GetRegion(ctx, nil))
cfg.Region = region.Region
}
ec2api := kwokec2.NewClient(cfg.Region, option.MustGetEnv("SYSTEM_NAMESPACE"), ec2.NewFromConfig(cfg), kwokec2.NewNopRateLimiterProvider(), strategy.NewLowestPrice(pricing.NewAPI(cfg), ec2.NewFromConfig(cfg), cfg.Region), operator.GetClient(), operator.Clock, operator.GetConfig())
ec2api := kwokec2.NewClient(cfg.Region, option.MustGetEnv("SYSTEM_NAMESPACE"), ec2.NewFromConfig(cfg), kwokec2.NewNopRateLimiterProvider(), strategy.NewLowestPrice(pricing.NewAPI(cfg), ec2.NewFromConfig(cfg), cfg.Region), operator.GetClient(), operator.Clock)

eksapi := eks.NewFromConfig(cfg)
log.FromContext(ctx).WithValues("region", cfg.Region).V(1).Info("discovered region")
Expand Down
Loading