Skip to content

Commit 229c7e6

Browse files
Support delayed registration for AWS KWOK
1 parent 79eeadc commit 229c7e6

File tree

2 files changed: +76 -27 lines changed

2 files changed: +76 -27 lines changed

kwok/ec2/ec2.go

Lines changed: 75 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -38,11 +38,11 @@ import (
3838
"k8s.io/client-go/rest"
3939
"k8s.io/client-go/util/workqueue"
4040
"k8s.io/utils/clock"
41+
"k8s.io/utils/set"
4142
"sigs.k8s.io/controller-runtime/pkg/client"
4243
"sigs.k8s.io/controller-runtime/pkg/log"
4344
"sigs.k8s.io/karpenter/kwok/apis/v1alpha1"
4445
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
45-
"sigs.k8s.io/karpenter/pkg/cloudprovider"
4646

4747
k8serrors "k8s.io/apimachinery/pkg/api/errors"
4848

@@ -64,13 +64,14 @@ type Client struct {
6464
subnets []ec2types.Subnet
6565
strategy strategy.Strategy
6666

67-
instances sync.Map
67+
instances sync.Map
68+
instanceLaunchCancel sync.Map
6869

6970
launchTemplates sync.Map
7071
launchTemplateNameToID sync.Map
7172
}
7273

73-
func NewClient(region, namespace string, ec2Client *ec2.Client, rateLimiterProvider RateLimiterProvider, strategy strategy.Strategy, kubeClient client.Client, clk clock.Clock, cfg *rest.Config) *Client {
74+
func NewClient(ctx context.Context, region, namespace string, ec2Client *ec2.Client, rateLimiterProvider RateLimiterProvider, strategy strategy.Strategy, kubeClient client.Client, clk clock.Clock, cfg *rest.Config) *Client {
7475
var instanceTypes []ec2types.InstanceTypeInfo
7576
instanceTypesPaginator := ec2.NewDescribeInstanceTypesPaginator(ec2Client, &ec2.DescribeInstanceTypesInput{
7677
MaxResults: aws.Int32(100),
@@ -105,13 +106,18 @@ func NewClient(region, namespace string, ec2Client *ec2.Client, rateLimiterProvi
105106
launchTemplates: sync.Map{},
106107
launchTemplateNameToID: sync.Map{},
107108
}
108-
c.readBackup(context.Background(), cfg)
109+
c.readBackup(ctx, cfg)
109110
return c
110111
}
111112

112113
func (c *Client) readBackup(ctx context.Context, cfg *rest.Config) {
113114
configMaps := &corev1.ConfigMapList{}
114-
lo.Must0(client.IgnoreNotFound(lo.Must(client.New(cfg, client.Options{})).List(ctx, configMaps, client.InNamespace(c.namespace))))
115+
lo.Must0(lo.Must(client.New(cfg, client.Options{})).List(ctx, configMaps, client.InNamespace(c.namespace)))
116+
117+
nodeList := &corev1.NodeList{}
118+
lo.Must0(lo.Must(client.New(cfg, client.Options{})).List(ctx, nodeList, client.MatchingLabels{v1alpha1.KwokLabelKey: v1alpha1.KwokLabelValue}))
119+
120+
instanceIDs := set.New[string](lo.Map(nodeList.Items, func(n corev1.Node, _ int) string { return lo.Must(utils.ParseInstanceID(n.Spec.ProviderID)) })...)
115121

116122
configMaps.Items = lo.Filter(configMaps.Items, func(c corev1.ConfigMap, _ int) bool {
117123
return strings.Contains(c.Name, "kwok-aws-instances-")
@@ -123,6 +129,10 @@ func (c *Client) readBackup(ctx context.Context, cfg *rest.Config) {
123129
lo.Must0(json.Unmarshal([]byte(cm.Data["instances"]), &instances))
124130
for _, instance := range instances {
125131
c.instances.Store(lo.FromPtr(instance.InstanceId), instance)
132+
// Register nodes immediately if we killed the KWOK controller before actually registering the node
133+
if !instanceIDs.Has(lo.FromPtr(instance.InstanceId)) {
134+
lo.Must0(c.kubeClient.Create(ctx, c.toNode(ctx, instance)))
135+
}
126136
}
127137
total += len(instances)
128138
}
@@ -175,7 +185,7 @@ func (c *Client) backupInstances(ctx context.Context) error {
175185
numConfigMaps := int(math.Ceil(float64(len(instances)) / float64(500)))
176186
if numConfigMaps < len(configMaps.Items) {
177187
errs := make([]error, numConfigMaps)
178-
workqueue.ParallelizeUntil(ctx, 10, len(configMaps.Items)-numConfigMaps, func(i int) {
188+
workqueue.ParallelizeUntil(ctx, len(configMaps.Items)-numConfigMaps, len(configMaps.Items)-numConfigMaps, func(i int) {
179189
if err := c.kubeClient.Delete(ctx, &configMaps.Items[len(configMaps.Items)-i-1]); client.IgnoreNotFound(err) != nil {
180190
errs[i] = fmt.Errorf("deleting configmap %q, %w", configMaps.Items[len(configMaps.Items)-i-1].Name, err)
181191
}
@@ -186,7 +196,7 @@ func (c *Client) backupInstances(ctx context.Context) error {
186196
}
187197

188198
errs := make([]error, numConfigMaps)
189-
workqueue.ParallelizeUntil(ctx, 10, numConfigMaps, func(i int) {
199+
workqueue.ParallelizeUntil(ctx, numConfigMaps, numConfigMaps, func(i int) {
190200
cm := &corev1.ConfigMap{
191201
ObjectMeta: metav1.ObjectMeta{
192202
Name: fmt.Sprintf("kwok-aws-instances-%d", i),
@@ -224,7 +234,7 @@ func (c *Client) StartBackupThread(ctx context.Context) {
224234
continue
225235
}
226236
select {
227-
case <-time.After(time.Second * 5):
237+
case <-time.After(time.Second):
228238
case <-ctx.Done():
229239
return
230240
}
@@ -586,15 +596,21 @@ func (c *Client) CreateFleet(ctx context.Context, input *ec2.CreateFleetInput, _
586596
VpcId: subnet.VpcId,
587597
}
588598
c.instances.Store(lo.FromPtr(instance.InstanceId), instance)
599+
launchCtx, cancel := context.WithCancel(ctx)
600+
c.instanceLaunchCancel.Store(lo.FromPtr(instance.InstanceId), cancel)
589601

590-
// Create the Node through the instance launch
591-
// TODO: Eventually support delayed registration
592-
nodePoolNameTag, _ := lo.Find(instance.Tags, func(t ec2types.Tag) bool {
593-
return lo.FromPtr(t.Key) == v1.NodePoolLabelKey
594-
})
595-
if err := c.kubeClient.Create(ctx, toNode(lo.FromPtr(instance.InstanceId), lo.FromPtr(nodePoolNameTag.Value), it, lo.FromPtr(subnet.AvailabilityZone), v1.CapacityTypeOnDemand)); err != nil {
596-
return nil, fmt.Errorf("creating node, %w", err)
597-
}
602+
go func() {
603+
select {
604+
case <-launchCtx.Done():
605+
return
606+
// This is meant to simulate instance startup time
607+
case <-c.clock.After(30 * time.Second):
608+
}
609+
if err := c.kubeClient.Create(launchCtx, c.toNode(ctx, instance)); err != nil {
610+
c.instances.Delete(lo.FromPtr(instance.InstanceId))
611+
c.instanceLaunchCancel.Delete(lo.FromPtr(instance.InstanceId))
612+
}
613+
}()
598614
fleetInstances = append(fleetInstances, ec2types.CreateFleetInstance{
599615
InstanceIds: []string{lo.FromPtr(instance.InstanceId)},
600616
InstanceType: instance.InstanceType,
@@ -644,6 +660,9 @@ func (c *Client) TerminateInstances(_ context.Context, input *ec2.TerminateInsta
644660

645661
for _, id := range input.InstanceIds {
646662
c.instances.Delete(id)
663+
if cancel, ok := c.instanceLaunchCancel.LoadAndDelete(id); ok {
664+
cancel.(context.CancelFunc)()
665+
}
647666
}
648667
return &ec2.TerminateInstancesOutput{
649668
TerminatingInstances: lo.Map(input.InstanceIds, func(id string, _ int) ec2types.InstanceStateChange {
@@ -862,7 +881,37 @@ func (c *Client) DeleteLaunchTemplate(_ context.Context, input *ec2.DeleteLaunch
862881
}, nil
863882
}
864883

865-
func toNode(instanceID, nodePoolName string, instanceType *cloudprovider.InstanceType, zone, capacityType string) *corev1.Node {
884+
func (c *Client) toNode(ctx context.Context, instance ec2types.Instance) *corev1.Node {
885+
nodePoolNameTag, _ := lo.Find(instance.Tags, func(t ec2types.Tag) bool {
886+
return lo.FromPtr(t.Key) == v1.NodePoolLabelKey
887+
})
888+
subnet := lo.Must(lo.Find(c.subnets, func(s ec2types.Subnet) bool {
889+
return lo.FromPtr(s.SubnetId) == lo.FromPtr(instance.SubnetId)
890+
}))
891+
instanceTypeInfo := lo.Must(lo.Find(c.instanceTypes, func(i ec2types.InstanceTypeInfo) bool {
892+
return i.InstanceType == instance.InstanceType
893+
}))
894+
fmt.Println(instance.InstanceType)
895+
fmt.Println(instanceTypeInfo.InstanceType)
896+
// TODO: We need to get the capacity and allocatable information from the userData
897+
it := instancetype.NewInstanceType(
898+
ctx,
899+
instanceTypeInfo,
900+
c.region,
901+
nil,
902+
nil,
903+
nil,
904+
nil,
905+
nil,
906+
nil,
907+
nil,
908+
nil,
909+
nil,
910+
nil,
911+
// TODO: Eventually support different AMIFamilies from userData
912+
"al2023",
913+
nil,
914+
)
866915
nodeName := fmt.Sprintf("%s-%d", strings.ReplaceAll(namesgenerator.GetRandomName(0), "_", "-"), rand.Uint32()) //nolint:gosec
867916
return &corev1.Node{
868917
ObjectMeta: metav1.ObjectMeta{
@@ -872,25 +921,25 @@ func toNode(instanceID, nodePoolName string, instanceType *cloudprovider.Instanc
872921
},
873922
// TODO: We can eventually add all the labels from the userData but for now we just add the NodePool labels
874923
Labels: map[string]string{
875-
corev1.LabelInstanceTypeStable: instanceType.Name,
924+
corev1.LabelInstanceTypeStable: it.Name,
876925
corev1.LabelHostname: nodeName,
877-
corev1.LabelTopologyRegion: instanceType.Requirements.Get(corev1.LabelTopologyRegion).Any(),
878-
corev1.LabelTopologyZone: zone,
879-
v1.CapacityTypeLabelKey: capacityType,
880-
corev1.LabelArchStable: instanceType.Requirements.Get(corev1.LabelArchStable).Any(),
926+
corev1.LabelTopologyRegion: it.Requirements.Get(corev1.LabelTopologyRegion).Any(),
927+
corev1.LabelTopologyZone: lo.FromPtr(subnet.AvailabilityZone),
928+
v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand,
929+
corev1.LabelArchStable: it.Requirements.Get(corev1.LabelArchStable).Any(),
881930
corev1.LabelOSStable: string(corev1.Linux),
882-
v1.NodePoolLabelKey: nodePoolName,
931+
v1.NodePoolLabelKey: lo.FromPtr(nodePoolNameTag.Value),
883932
v1alpha1.KwokLabelKey: v1alpha1.KwokLabelValue,
884933
v1alpha1.KwokPartitionLabelKey: "a",
885934
},
886935
},
887936
Spec: corev1.NodeSpec{
888-
ProviderID: fmt.Sprintf("kwok-aws:///%s/%s", zone, instanceID),
937+
ProviderID: fmt.Sprintf("kwok-aws:///%s/%s", lo.FromPtr(subnet.AvailabilityZone), lo.FromPtr(instance.InstanceId)),
889938
Taints: []corev1.Taint{v1.UnregisteredNoExecuteTaint},
890939
},
891940
Status: corev1.NodeStatus{
892-
Capacity: instanceType.Capacity,
893-
Allocatable: instanceType.Allocatable(),
941+
Capacity: it.Capacity,
942+
Allocatable: it.Allocatable(),
894943
Phase: corev1.NodePending,
895944
},
896945
}

kwok/operator/operator.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -103,7 +103,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
103103
region := lo.Must(imds.NewFromConfig(cfg).GetRegion(ctx, nil))
104104
cfg.Region = region.Region
105105
}
106-
ec2api := kwokec2.NewClient(cfg.Region, option.MustGetEnv("SYSTEM_NAMESPACE"), ec2.NewFromConfig(cfg), kwokec2.NewNopRateLimiterProvider(), strategy.NewLowestPrice(pricing.NewAPI(cfg), ec2.NewFromConfig(cfg), cfg.Region), operator.GetClient(), operator.Clock, operator.GetConfig())
106+
ec2api := kwokec2.NewClient(ctx, cfg.Region, option.MustGetEnv("SYSTEM_NAMESPACE"), ec2.NewFromConfig(cfg), kwokec2.NewNopRateLimiterProvider(), strategy.NewLowestPrice(pricing.NewAPI(cfg), ec2.NewFromConfig(cfg), cfg.Region), operator.GetClient(), operator.Clock, operator.GetConfig())
107107

108108
eksapi := eks.NewFromConfig(cfg)
109109
log.FromContext(ctx).WithValues("region", cfg.Region).V(1).Info("discovered region")

0 commit comments

Comments
 (0)