Skip to content

Commit a83ac74

Browse files
Support delayed registration for AWS KWOK
1 parent 79eeadc commit a83ac74

File tree

1 file changed

+70
-23
lines changed

1 file changed

+70
-23
lines changed

kwok/ec2/ec2.go

Lines changed: 70 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ import (
3838
"k8s.io/client-go/rest"
3939
"k8s.io/client-go/util/workqueue"
4040
"k8s.io/utils/clock"
41+
"k8s.io/utils/set"
4142
"sigs.k8s.io/controller-runtime/pkg/client"
4243
"sigs.k8s.io/controller-runtime/pkg/log"
4344
"sigs.k8s.io/karpenter/kwok/apis/v1alpha1"
4445
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
45-
"sigs.k8s.io/karpenter/pkg/cloudprovider"
4646

4747
k8serrors "k8s.io/apimachinery/pkg/api/errors"
4848

@@ -64,7 +64,8 @@ type Client struct {
6464
subnets []ec2types.Subnet
6565
strategy strategy.Strategy
6666

67-
instances sync.Map
67+
instances sync.Map
68+
instanceLaunchCancel sync.Map
6869

6970
launchTemplates sync.Map
7071
launchTemplateNameToID sync.Map
@@ -111,7 +112,12 @@ func NewClient(region, namespace string, ec2Client *ec2.Client, rateLimiterProvi
111112

112113
func (c *Client) readBackup(ctx context.Context, cfg *rest.Config) {
113114
configMaps := &corev1.ConfigMapList{}
114-
lo.Must0(client.IgnoreNotFound(lo.Must(client.New(cfg, client.Options{})).List(ctx, configMaps, client.InNamespace(c.namespace))))
115+
lo.Must0(lo.Must(client.New(cfg, client.Options{})).List(ctx, configMaps, client.InNamespace(c.namespace)))
116+
117+
nodeList := &corev1.NodeList{}
118+
lo.Must0(lo.Must(client.New(cfg, client.Options{})).List(ctx, nodeList, client.MatchingLabels{v1alpha1.KwokLabelKey: v1alpha1.KwokLabelValue}))
119+
120+
instanceIDs := set.New[string](lo.Map(nodeList.Items, func(n corev1.Node, _ int) string { return lo.Must(utils.ParseInstanceID(n.Spec.ProviderID)) })...)
115121

116122
configMaps.Items = lo.Filter(configMaps.Items, func(c corev1.ConfigMap, _ int) bool {
117123
return strings.Contains(c.Name, "kwok-aws-instances-")
@@ -123,6 +129,10 @@ func (c *Client) readBackup(ctx context.Context, cfg *rest.Config) {
123129
lo.Must0(json.Unmarshal([]byte(cm.Data["instances"]), &instances))
124130
for _, instance := range instances {
125131
c.instances.Store(lo.FromPtr(instance.InstanceId), instance)
132+
// Register the node immediately when the KWOK controller was stopped before the node was registered, i.e. the instance exists in the backup but no KWOK Node carries its instance ID
133+
if !instanceIDs.Has(lo.FromPtr(instance.InstanceId)) {
134+
lo.Must0(c.kubeClient.Create(ctx, c.toNode(ctx, instance)))
135+
}
126136
}
127137
total += len(instances)
128138
}
@@ -175,7 +185,7 @@ func (c *Client) backupInstances(ctx context.Context) error {
175185
numConfigMaps := int(math.Ceil(float64(len(instances)) / float64(500)))
176186
if numConfigMaps < len(configMaps.Items) {
177187
errs := make([]error, numConfigMaps)
178-
workqueue.ParallelizeUntil(ctx, 10, len(configMaps.Items)-numConfigMaps, func(i int) {
188+
workqueue.ParallelizeUntil(ctx, len(configMaps.Items)-numConfigMaps, len(configMaps.Items)-numConfigMaps, func(i int) {
179189
if err := c.kubeClient.Delete(ctx, &configMaps.Items[len(configMaps.Items)-i-1]); client.IgnoreNotFound(err) != nil {
180190
errs[i] = fmt.Errorf("deleting configmap %q, %w", configMaps.Items[len(configMaps.Items)-i-1].Name, err)
181191
}
@@ -186,7 +196,7 @@ func (c *Client) backupInstances(ctx context.Context) error {
186196
}
187197

188198
errs := make([]error, numConfigMaps)
189-
workqueue.ParallelizeUntil(ctx, 10, numConfigMaps, func(i int) {
199+
workqueue.ParallelizeUntil(ctx, numConfigMaps, numConfigMaps, func(i int) {
190200
cm := &corev1.ConfigMap{
191201
ObjectMeta: metav1.ObjectMeta{
192202
Name: fmt.Sprintf("kwok-aws-instances-%d", i),
@@ -586,15 +596,21 @@ func (c *Client) CreateFleet(ctx context.Context, input *ec2.CreateFleetInput, _
586596
VpcId: subnet.VpcId,
587597
}
588598
c.instances.Store(lo.FromPtr(instance.InstanceId), instance)
599+
launchCtx, cancel := context.WithCancel(ctx)
600+
c.instanceLaunchCancel.Store(lo.FromPtr(instance.InstanceId), cancel)
589601

590-
// Create the Node through the instance launch
591-
// TODO: Eventually support delayed registration
592-
nodePoolNameTag, _ := lo.Find(instance.Tags, func(t ec2types.Tag) bool {
593-
return lo.FromPtr(t.Key) == v1.NodePoolLabelKey
594-
})
595-
if err := c.kubeClient.Create(ctx, toNode(lo.FromPtr(instance.InstanceId), lo.FromPtr(nodePoolNameTag.Value), it, lo.FromPtr(subnet.AvailabilityZone), v1.CapacityTypeOnDemand)); err != nil {
596-
return nil, fmt.Errorf("creating node, %w", err)
597-
}
602+
go func() {
603+
select {
604+
case <-launchCtx.Done():
605+
return
606+
// This is meant to simulate instance startup time
607+
case <-c.clock.After(30 * time.Second):
608+
}
609+
if err := c.kubeClient.Create(launchCtx, c.toNode(ctx, instance)); err != nil {
610+
c.instances.Delete(lo.FromPtr(instance.InstanceId))
611+
c.instanceLaunchCancel.Delete(lo.FromPtr(instance.InstanceId))
612+
}
613+
}()
598614
fleetInstances = append(fleetInstances, ec2types.CreateFleetInstance{
599615
InstanceIds: []string{lo.FromPtr(instance.InstanceId)},
600616
InstanceType: instance.InstanceType,
@@ -644,6 +660,9 @@ func (c *Client) TerminateInstances(_ context.Context, input *ec2.TerminateInsta
644660

645661
for _, id := range input.InstanceIds {
646662
c.instances.Delete(id)
663+
if cancel, ok := c.instanceLaunchCancel.LoadAndDelete(id); ok {
664+
cancel.(context.CancelFunc)()
665+
}
647666
}
648667
return &ec2.TerminateInstancesOutput{
649668
TerminatingInstances: lo.Map(input.InstanceIds, func(id string, _ int) ec2types.InstanceStateChange {
@@ -862,7 +881,35 @@ func (c *Client) DeleteLaunchTemplate(_ context.Context, input *ec2.DeleteLaunch
862881
}, nil
863882
}
864883

865-
func toNode(instanceID, nodePoolName string, instanceType *cloudprovider.InstanceType, zone, capacityType string) *corev1.Node {
884+
func (c *Client) toNode(ctx context.Context, instance ec2types.Instance) *corev1.Node {
885+
nodePoolNameTag, _ := lo.Find(instance.Tags, func(t ec2types.Tag) bool {
886+
return lo.FromPtr(t.Key) == v1.NodePoolLabelKey
887+
})
888+
subnet := lo.Must(lo.Find(c.subnets, func(s ec2types.Subnet) bool {
889+
return lo.FromPtr(s.SubnetId) == lo.FromPtr(instance.SubnetId)
890+
}))
891+
instanceTypeInfo := lo.Must(lo.Find(c.instanceTypes, func(i ec2types.InstanceTypeInfo) bool {
892+
return i.InstanceType == instance.InstanceType
893+
}))
894+
// TODO: We need to get the capacity and allocatable information from the userData
895+
it := instancetype.NewInstanceType(
896+
ctx,
897+
instanceTypeInfo,
898+
c.region,
899+
nil,
900+
nil,
901+
nil,
902+
nil,
903+
nil,
904+
nil,
905+
nil,
906+
nil,
907+
nil,
908+
nil,
909+
// TODO: Eventually support different AMIFamilies from userData
910+
"al2023",
911+
nil,
912+
)
866913
nodeName := fmt.Sprintf("%s-%d", strings.ReplaceAll(namesgenerator.GetRandomName(0), "_", "-"), rand.Uint32()) //nolint:gosec
867914
return &corev1.Node{
868915
ObjectMeta: metav1.ObjectMeta{
@@ -872,25 +919,25 @@ func toNode(instanceID, nodePoolName string, instanceType *cloudprovider.Instanc
872919
},
873920
// TODO: We can eventually add all the labels from the userData but for now we just add the NodePool labels
874921
Labels: map[string]string{
875-
corev1.LabelInstanceTypeStable: instanceType.Name,
922+
corev1.LabelInstanceTypeStable: it.Name,
876923
corev1.LabelHostname: nodeName,
877-
corev1.LabelTopologyRegion: instanceType.Requirements.Get(corev1.LabelTopologyRegion).Any(),
878-
corev1.LabelTopologyZone: zone,
879-
v1.CapacityTypeLabelKey: capacityType,
880-
corev1.LabelArchStable: instanceType.Requirements.Get(corev1.LabelArchStable).Any(),
924+
corev1.LabelTopologyRegion: it.Requirements.Get(corev1.LabelTopologyRegion).Any(),
925+
corev1.LabelTopologyZone: lo.FromPtr(subnet.AvailabilityZone),
926+
v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand,
927+
corev1.LabelArchStable: it.Requirements.Get(corev1.LabelArchStable).Any(),
881928
corev1.LabelOSStable: string(corev1.Linux),
882-
v1.NodePoolLabelKey: nodePoolName,
929+
v1.NodePoolLabelKey: lo.FromPtr(nodePoolNameTag.Value),
883930
v1alpha1.KwokLabelKey: v1alpha1.KwokLabelValue,
884931
v1alpha1.KwokPartitionLabelKey: "a",
885932
},
886933
},
887934
Spec: corev1.NodeSpec{
888-
ProviderID: fmt.Sprintf("kwok-aws:///%s/%s", zone, instanceID),
935+
ProviderID: fmt.Sprintf("kwok-aws:///%s/%s", lo.FromPtr(subnet.AvailabilityZone), lo.FromPtr(instance.InstanceId)),
889936
Taints: []corev1.Taint{v1.UnregisteredNoExecuteTaint},
890937
},
891938
Status: corev1.NodeStatus{
892-
Capacity: instanceType.Capacity,
893-
Allocatable: instanceType.Allocatable(),
939+
Capacity: it.Capacity,
940+
Allocatable: it.Allocatable(),
894941
Phase: corev1.NodePending,
895942
},
896943
}

0 commit comments

Comments
 (0)