Skip to content

Commit b129eab

Browse files
Support delayed registration for AWS KWOK
1 parent 79eeadc commit b129eab

File tree

1 file changed

+71
-23
lines changed

1 file changed

+71
-23
lines changed

kwok/ec2/ec2.go

Lines changed: 71 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ import (
3838
"k8s.io/client-go/rest"
3939
"k8s.io/client-go/util/workqueue"
4040
"k8s.io/utils/clock"
41+
"k8s.io/utils/set"
4142
"sigs.k8s.io/controller-runtime/pkg/client"
4243
"sigs.k8s.io/controller-runtime/pkg/log"
4344
"sigs.k8s.io/karpenter/kwok/apis/v1alpha1"
4445
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
45-
"sigs.k8s.io/karpenter/pkg/cloudprovider"
4646

4747
k8serrors "k8s.io/apimachinery/pkg/api/errors"
4848

@@ -64,7 +64,8 @@ type Client struct {
6464
subnets []ec2types.Subnet
6565
strategy strategy.Strategy
6666

67-
instances sync.Map
67+
instances sync.Map
68+
instanceLaunchCancel sync.Map
6869

6970
launchTemplates sync.Map
7071
launchTemplateNameToID sync.Map
@@ -111,7 +112,12 @@ func NewClient(region, namespace string, ec2Client *ec2.Client, rateLimiterProvi
111112

112113
func (c *Client) readBackup(ctx context.Context, cfg *rest.Config) {
113114
configMaps := &corev1.ConfigMapList{}
114-
lo.Must0(client.IgnoreNotFound(lo.Must(client.New(cfg, client.Options{})).List(ctx, configMaps, client.InNamespace(c.namespace))))
115+
lo.Must0(lo.Must(client.New(cfg, client.Options{})).List(ctx, configMaps, client.InNamespace(c.namespace)))
116+
117+
nodeList := &corev1.NodeList{}
118+
lo.Must0(lo.Must(client.New(cfg, client.Options{})).List(ctx, nodeList, client.MatchingLabels{v1alpha1.KwokLabelKey: v1alpha1.KwokLabelValue}))
119+
120+
instanceIDs := set.New[string](lo.Map(nodeList.Items, func(n corev1.Node, _ int) string { return lo.Must(utils.ParseInstanceID(n.Spec.ProviderID)) })...)
115121

116122
configMaps.Items = lo.Filter(configMaps.Items, func(c corev1.ConfigMap, _ int) bool {
117123
return strings.Contains(c.Name, "kwok-aws-instances-")
@@ -123,6 +129,11 @@ func (c *Client) readBackup(ctx context.Context, cfg *rest.Config) {
123129
lo.Must0(json.Unmarshal([]byte(cm.Data["instances"]), &instances))
124130
for _, instance := range instances {
125131
c.instances.Store(lo.FromPtr(instance.InstanceId), instance)
132+
// Register nodes immediately if we killed the KWOK controller before actually registering the node
133+
if !instanceIDs.Has(lo.FromPtr(instance.InstanceId)) {
134+
135+
lo.Must0(c.kubeClient.Create(ctx, c.toNode(ctx, instance)))
136+
}
126137
}
127138
total += len(instances)
128139
}
@@ -175,7 +186,7 @@ func (c *Client) backupInstances(ctx context.Context) error {
175186
numConfigMaps := int(math.Ceil(float64(len(instances)) / float64(500)))
176187
if numConfigMaps < len(configMaps.Items) {
177188
errs := make([]error, numConfigMaps)
178-
workqueue.ParallelizeUntil(ctx, 10, len(configMaps.Items)-numConfigMaps, func(i int) {
189+
workqueue.ParallelizeUntil(ctx, len(configMaps.Items)-numConfigMaps, len(configMaps.Items)-numConfigMaps, func(i int) {
179190
if err := c.kubeClient.Delete(ctx, &configMaps.Items[len(configMaps.Items)-i-1]); client.IgnoreNotFound(err) != nil {
180191
errs[i] = fmt.Errorf("deleting configmap %q, %w", configMaps.Items[len(configMaps.Items)-i-1].Name, err)
181192
}
@@ -186,7 +197,7 @@ func (c *Client) backupInstances(ctx context.Context) error {
186197
}
187198

188199
errs := make([]error, numConfigMaps)
189-
workqueue.ParallelizeUntil(ctx, 10, numConfigMaps, func(i int) {
200+
workqueue.ParallelizeUntil(ctx, numConfigMaps, numConfigMaps, func(i int) {
190201
cm := &corev1.ConfigMap{
191202
ObjectMeta: metav1.ObjectMeta{
192203
Name: fmt.Sprintf("kwok-aws-instances-%d", i),
@@ -586,15 +597,21 @@ func (c *Client) CreateFleet(ctx context.Context, input *ec2.CreateFleetInput, _
586597
VpcId: subnet.VpcId,
587598
}
588599
c.instances.Store(lo.FromPtr(instance.InstanceId), instance)
600+
launchCtx, cancel := context.WithCancel(ctx)
601+
c.instanceLaunchCancel.Store(lo.FromPtr(instance.InstanceId), cancel)
589602

590-
// Create the Node through the instance launch
591-
// TODO: Eventually support delayed registration
592-
nodePoolNameTag, _ := lo.Find(instance.Tags, func(t ec2types.Tag) bool {
593-
return lo.FromPtr(t.Key) == v1.NodePoolLabelKey
594-
})
595-
if err := c.kubeClient.Create(ctx, toNode(lo.FromPtr(instance.InstanceId), lo.FromPtr(nodePoolNameTag.Value), it, lo.FromPtr(subnet.AvailabilityZone), v1.CapacityTypeOnDemand)); err != nil {
596-
return nil, fmt.Errorf("creating node, %w", err)
597-
}
603+
go func() {
604+
select {
605+
case <-launchCtx.Done():
606+
return
607+
// This is meant to simulate instance startup time
608+
case <-c.clock.After(30 * time.Second):
609+
}
610+
if err := c.kubeClient.Create(launchCtx, c.toNode(ctx, instance)); err != nil {
611+
c.instances.Delete(lo.FromPtr(instance.InstanceId))
612+
c.instanceLaunchCancel.Delete(lo.FromPtr(instance.InstanceId))
613+
}
614+
}()
598615
fleetInstances = append(fleetInstances, ec2types.CreateFleetInstance{
599616
InstanceIds: []string{lo.FromPtr(instance.InstanceId)},
600617
InstanceType: instance.InstanceType,
@@ -644,6 +661,9 @@ func (c *Client) TerminateInstances(_ context.Context, input *ec2.TerminateInsta
644661

645662
for _, id := range input.InstanceIds {
646663
c.instances.Delete(id)
664+
if cancel, ok := c.instanceLaunchCancel.LoadAndDelete(id); ok {
665+
cancel.(context.CancelFunc)()
666+
}
647667
}
648668
return &ec2.TerminateInstancesOutput{
649669
TerminatingInstances: lo.Map(input.InstanceIds, func(id string, _ int) ec2types.InstanceStateChange {
@@ -862,7 +882,35 @@ func (c *Client) DeleteLaunchTemplate(_ context.Context, input *ec2.DeleteLaunch
862882
}, nil
863883
}
864884

865-
func toNode(instanceID, nodePoolName string, instanceType *cloudprovider.InstanceType, zone, capacityType string) *corev1.Node {
885+
func (c *Client) toNode(ctx context.Context, instance ec2types.Instance) *corev1.Node {
886+
nodePoolNameTag, _ := lo.Find(instance.Tags, func(t ec2types.Tag) bool {
887+
return lo.FromPtr(t.Key) == v1.NodePoolLabelKey
888+
})
889+
subnet := lo.Must(lo.Find(c.subnets, func(s ec2types.Subnet) bool {
890+
return lo.FromPtr(s.SubnetId) == lo.FromPtr(instance.SubnetId)
891+
}))
892+
instanceTypeInfo := lo.Must(lo.Find(c.instanceTypes, func(i ec2types.InstanceTypeInfo) bool {
893+
return i.InstanceType == instance.InstanceType
894+
}))
895+
// TODO: We need to get the capacity and allocatable information from the userData
896+
it := instancetype.NewInstanceType(
897+
ctx,
898+
instanceTypeInfo,
899+
c.region,
900+
nil,
901+
nil,
902+
nil,
903+
nil,
904+
nil,
905+
nil,
906+
nil,
907+
nil,
908+
nil,
909+
nil,
910+
// TODO: Eventually support different AMIFamilies from userData
911+
"al2023",
912+
nil,
913+
)
866914
nodeName := fmt.Sprintf("%s-%d", strings.ReplaceAll(namesgenerator.GetRandomName(0), "_", "-"), rand.Uint32()) //nolint:gosec
867915
return &corev1.Node{
868916
ObjectMeta: metav1.ObjectMeta{
@@ -872,25 +920,25 @@ func toNode(instanceID, nodePoolName string, instanceType *cloudprovider.Instanc
872920
},
873921
// TODO: We can eventually add all the labels from the userData but for now we just add the NodePool labels
874922
Labels: map[string]string{
875-
corev1.LabelInstanceTypeStable: instanceType.Name,
923+
corev1.LabelInstanceTypeStable: it.Name,
876924
corev1.LabelHostname: nodeName,
877-
corev1.LabelTopologyRegion: instanceType.Requirements.Get(corev1.LabelTopologyRegion).Any(),
878-
corev1.LabelTopologyZone: zone,
879-
v1.CapacityTypeLabelKey: capacityType,
880-
corev1.LabelArchStable: instanceType.Requirements.Get(corev1.LabelArchStable).Any(),
925+
corev1.LabelTopologyRegion: it.Requirements.Get(corev1.LabelTopologyRegion).Any(),
926+
corev1.LabelTopologyZone: lo.FromPtr(subnet.AvailabilityZone),
927+
v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand,
928+
corev1.LabelArchStable: it.Requirements.Get(corev1.LabelArchStable).Any(),
881929
corev1.LabelOSStable: string(corev1.Linux),
882-
v1.NodePoolLabelKey: nodePoolName,
930+
v1.NodePoolLabelKey: lo.FromPtr(nodePoolNameTag.Value),
883931
v1alpha1.KwokLabelKey: v1alpha1.KwokLabelValue,
884932
v1alpha1.KwokPartitionLabelKey: "a",
885933
},
886934
},
887935
Spec: corev1.NodeSpec{
888-
ProviderID: fmt.Sprintf("kwok-aws:///%s/%s", zone, instanceID),
936+
ProviderID: fmt.Sprintf("kwok-aws:///%s/%s", lo.FromPtr(subnet.AvailabilityZone), lo.FromPtr(instance.InstanceId)),
889937
Taints: []corev1.Taint{v1.UnregisteredNoExecuteTaint},
890938
},
891939
Status: corev1.NodeStatus{
892-
Capacity: instanceType.Capacity,
893-
Allocatable: instanceType.Allocatable(),
940+
Capacity: it.Capacity,
941+
Allocatable: it.Allocatable(),
894942
Phase: corev1.NodePending,
895943
},
896944
}

0 commit comments

Comments
 (0)