@@ -35,14 +35,13 @@ import (
3535 corev1 "k8s.io/api/core/v1"
3636 "k8s.io/apimachinery/pkg/api/equality"
3737 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38- "k8s.io/client-go/rest"
3938 "k8s.io/client-go/util/workqueue"
4039 "k8s.io/utils/clock"
40+ "k8s.io/utils/set"
4141 "sigs.k8s.io/controller-runtime/pkg/client"
4242 "sigs.k8s.io/controller-runtime/pkg/log"
4343 "sigs.k8s.io/karpenter/kwok/apis/v1alpha1"
4444 v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
45- "sigs.k8s.io/karpenter/pkg/cloudprovider"
4645
4746 k8serrors "k8s.io/apimachinery/pkg/api/errors"
4847
@@ -64,13 +63,16 @@ type Client struct {
6463 subnets []ec2types.Subnet
6564 strategy strategy.Strategy
6665
67- instances sync.Map
66+ instances sync.Map
67+ instanceLaunchCancels sync.Map
68+
69+ readBackupCompleted chan struct {}
6870
6971 launchTemplates sync.Map
7072 launchTemplateNameToID sync.Map
7173}
7274
73- func NewClient (region , namespace string , ec2Client * ec2.Client , rateLimiterProvider RateLimiterProvider , strategy strategy.Strategy , kubeClient client.Client , clk clock.Clock , cfg * rest. Config ) * Client {
75+ func NewClient (region , namespace string , ec2Client * ec2.Client , rateLimiterProvider RateLimiterProvider , strategy strategy.Strategy , kubeClient client.Client , clk clock.Clock ) * Client {
7476 var instanceTypes []ec2types.InstanceTypeInfo
7577 instanceTypesPaginator := ec2 .NewDescribeInstanceTypesPaginator (ec2Client , & ec2.DescribeInstanceTypesInput {
7678 MaxResults : aws .Int32 (100 ),
@@ -100,18 +102,25 @@ func NewClient(region, namespace string, ec2Client *ec2.Client, rateLimiterProvi
100102 subnets : subnets ,
101103 strategy : strategy ,
102104
103- instances : sync.Map {},
105+ instances : sync.Map {},
106+ instanceLaunchCancels : sync.Map {},
107+
108+ readBackupCompleted : make (chan struct {}),
104109
105110 launchTemplates : sync.Map {},
106111 launchTemplateNameToID : sync.Map {},
107112 }
108- c .readBackup (context .Background (), cfg )
109113 return c
110114}
111115
112- func (c * Client ) readBackup (ctx context.Context , cfg * rest. Config ) {
116+ func (c * Client ) ReadBackup (ctx context.Context ) {
113117 configMaps := & corev1.ConfigMapList {}
114- lo .Must0 (client .IgnoreNotFound (lo .Must (client .New (cfg , client.Options {})).List (ctx , configMaps , client .InNamespace (c .namespace ))))
118+ lo .Must0 (c .kubeClient .List (ctx , configMaps , client .InNamespace (c .namespace )))
119+
120+ nodeList := & corev1.NodeList {}
121+ lo .Must0 (c .kubeClient .List (ctx , nodeList , client.MatchingLabels {v1alpha1 .KwokLabelKey : v1alpha1 .KwokLabelValue }))
122+
123+ instanceIDs := set .New [string ](lo .Map (nodeList .Items , func (n corev1.Node , _ int ) string { return lo .Must (utils .ParseInstanceID (n .Spec .ProviderID )) })... )
115124
116125 configMaps .Items = lo .Filter (configMaps .Items , func (c corev1.ConfigMap , _ int ) bool {
117126 return strings .Contains (c .Name , "kwok-aws-instances-" )
@@ -123,11 +132,17 @@ func (c *Client) readBackup(ctx context.Context, cfg *rest.Config) {
123132 lo .Must0 (json .Unmarshal ([]byte (cm .Data ["instances" ]), & instances ))
124133 for _ , instance := range instances {
125134 c .instances .Store (lo .FromPtr (instance .InstanceId ), instance )
135+ // Register nodes immediately if we killed the KWOK controller before actually registering the node
136+ if ! instanceIDs .Has (lo .FromPtr (instance .InstanceId )) {
137+ log .FromContext (ctx ).WithValues ("instance-id" , lo .FromPtr (instance .InstanceId )).Info ("creating node for instance id" )
138+ lo .Must0 (c .kubeClient .Create (ctx , c .toNode (ctx , instance )))
139+ }
126140 }
127141 total += len (instances )
128142 }
129143 }
130144 log .FromContext (ctx ).WithValues ("count" , total ).Info ("loaded instances from backup" )
145+ close (c .readBackupCompleted )
131146}
132147
133148//nolint:gocyclo
@@ -175,7 +190,7 @@ func (c *Client) backupInstances(ctx context.Context) error {
175190 numConfigMaps := int (math .Ceil (float64 (len (instances )) / float64 (500 )))
176191 if numConfigMaps < len (configMaps .Items ) {
177192 errs := make ([]error , numConfigMaps )
178- workqueue .ParallelizeUntil (ctx , 10 , len (configMaps .Items )- numConfigMaps , func (i int ) {
193+ workqueue .ParallelizeUntil (ctx , len ( configMaps . Items ) - numConfigMaps , len (configMaps .Items )- numConfigMaps , func (i int ) {
179194 if err := c .kubeClient .Delete (ctx , & configMaps .Items [len (configMaps .Items )- i - 1 ]); client .IgnoreNotFound (err ) != nil {
180195 errs [i ] = fmt .Errorf ("deleting configmap %q, %w" , configMaps .Items [len (configMaps .Items )- i - 1 ].Name , err )
181196 }
@@ -186,7 +201,7 @@ func (c *Client) backupInstances(ctx context.Context) error {
186201 }
187202
188203 errs := make ([]error , numConfigMaps )
189- workqueue .ParallelizeUntil (ctx , 10 , numConfigMaps , func (i int ) {
204+ workqueue .ParallelizeUntil (ctx , numConfigMaps , numConfigMaps , func (i int ) {
190205 cm := & corev1.ConfigMap {
191206 ObjectMeta : metav1.ObjectMeta {
192207 Name : fmt .Sprintf ("kwok-aws-instances-%d" , i ),
@@ -218,13 +233,14 @@ func (c *Client) backupInstances(ctx context.Context) error {
218233
219234// StartBackupThread initiates the thread that is responsible for storing instances in ConfigMaps on the cluster
220235func (c * Client ) StartBackupThread (ctx context.Context ) {
236+ <- c .readBackupCompleted
221237 for {
222238 if err := c .backupInstances (ctx ); err != nil {
223239 log .FromContext (ctx ).Error (err , "unable to backup instances" )
224240 continue
225241 }
226242 select {
227- case <- time .After (time .Second * 5 ):
243+ case <- time .After (time .Second ):
228244 case <- ctx .Done ():
229245 return
230246 }
@@ -233,6 +249,7 @@ func (c *Client) StartBackupThread(ctx context.Context) {
233249
234250// StartKillNodeThread initiates the thread that is responsible for killing nodes on the cluster that no longer have an instance representation (similar to CCM)
235251func (c * Client ) StartKillNodeThread (ctx context.Context ) {
252+ <- c .readBackupCompleted
236253 for {
237254 nodes := & corev1.NodeList {}
238255 if err := c .kubeClient .List (ctx , nodes , client.MatchingLabels {v1alpha1 .KwokLabelKey : v1alpha1 .KwokLabelValue }); err != nil {
@@ -276,6 +293,7 @@ func removeNullFields(bytes []byte) []byte {
276293
277294//nolint:gocyclo
278295func (c * Client ) DescribeLaunchTemplates (_ context.Context , input * ec2.DescribeLaunchTemplatesInput , _ ... func (* ec2.Options )) (* ec2.DescribeLaunchTemplatesOutput , error ) {
296+ <- c .readBackupCompleted
279297 if ! c .rateLimiterProvider .DescribeLaunchTemplates ().TryAccept () {
280298 return nil , & smithy.GenericAPIError {
281299 Code : errors .RateLimitingErrorCode ,
@@ -372,6 +390,7 @@ func (c *Client) DescribeLaunchTemplates(_ context.Context, input *ec2.DescribeL
372390
373391//nolint:gocyclo
374392func (c * Client ) CreateFleet (ctx context.Context , input * ec2.CreateFleetInput , _ ... func (* ec2.Options )) (* ec2.CreateFleetOutput , error ) {
393+ <- c .readBackupCompleted
375394 if ! c .rateLimiterProvider .CreateFleet ().TryAccept () {
376395 return nil , & smithy.GenericAPIError {
377396 Code : errors .RateLimitingErrorCode ,
@@ -450,28 +469,9 @@ func (c *Client) CreateFleet(ctx context.Context, input *ec2.CreateFleetInput, _
450469 instanceTypeInfo := lo .Must (lo .Find (c .instanceTypes , func (i ec2types.InstanceTypeInfo ) bool {
451470 return i .InstanceType == selectedOverride .InstanceType
452471 }))
453- // TODO: We need to get the capacity and allocatable information from the userData
454- it := instancetype .NewInstanceType (
455- ctx ,
456- instanceTypeInfo ,
457- c .region ,
458- nil ,
459- nil ,
460- nil ,
461- nil ,
462- nil ,
463- nil ,
464- nil ,
465- nil ,
466- nil ,
467- nil ,
468- // TODO: Eventually support different AMIFamilies from userData
469- "al2023" ,
470- nil ,
471- )
472472 instance := ec2types.Instance {
473473 AmiLaunchIndex : nil ,
474- Architecture : lo .Ternary (it . Requirements . Get ( corev1 . LabelArchStable ). Any () == v1 . ArchitectureAmd64 , ec2types .ArchitectureValuesX8664 , ec2types .ArchitectureValuesArm64 ),
474+ Architecture : lo .Ternary (lo . Contains ( instanceTypeInfo . ProcessorInfo . SupportedArchitectures , ec2types . ArchitectureTypeX8664 ) , ec2types .ArchitectureValuesX8664 , ec2types .ArchitectureValuesArm64 ),
475475 // TODO: The block device mappings here don't have any data on the ephemeral storage size
476476 BlockDeviceMappings : lo .Map (lt .B .BlockDeviceMappings , func (b ec2types.LaunchTemplateBlockDeviceMappingRequest , _ int ) ec2types.InstanceBlockDeviceMapping {
477477 return ec2types.InstanceBlockDeviceMapping {
@@ -586,15 +586,21 @@ func (c *Client) CreateFleet(ctx context.Context, input *ec2.CreateFleetInput, _
586586 VpcId : subnet .VpcId ,
587587 }
588588 c .instances .Store (lo .FromPtr (instance .InstanceId ), instance )
589+ launchCtx , cancel := context .WithCancel (ctx )
590+ c .instanceLaunchCancels .Store (lo .FromPtr (instance .InstanceId ), cancel )
589591
590- // Create the Node through the instance launch
591- // TODO: Eventually support delayed registration
592- nodePoolNameTag , _ := lo .Find (instance .Tags , func (t ec2types.Tag ) bool {
593- return lo .FromPtr (t .Key ) == v1 .NodePoolLabelKey
594- })
595- if err := c .kubeClient .Create (ctx , toNode (lo .FromPtr (instance .InstanceId ), lo .FromPtr (nodePoolNameTag .Value ), it , lo .FromPtr (subnet .AvailabilityZone ), v1 .CapacityTypeOnDemand )); err != nil {
596- return nil , fmt .Errorf ("creating node, %w" , err )
597- }
592+ go func () {
593+ select {
594+ case <- launchCtx .Done ():
595+ return
596+ // This is meant to simulate instance startup time
597+ case <- c .clock .After (30 * time .Second ):
598+ }
599+ if err := c .kubeClient .Create (launchCtx , c .toNode (ctx , instance )); err != nil {
600+ c .instances .Delete (lo .FromPtr (instance .InstanceId ))
601+ c .instanceLaunchCancels .Delete (lo .FromPtr (instance .InstanceId ))
602+ }
603+ }()
598604 fleetInstances = append (fleetInstances , ec2types.CreateFleetInstance {
599605 InstanceIds : []string {lo .FromPtr (instance .InstanceId )},
600606 InstanceType : instance .InstanceType ,
@@ -628,6 +634,7 @@ func (c *Client) CreateFleet(ctx context.Context, input *ec2.CreateFleetInput, _
628634}
629635
630636func (c * Client ) TerminateInstances (_ context.Context , input * ec2.TerminateInstancesInput , _ ... func (* ec2.Options )) (* ec2.TerminateInstancesOutput , error ) {
637+ <- c .readBackupCompleted
631638 if ! c .rateLimiterProvider .TerminateInstances ().TryAccept () {
632639 return nil , & smithy.GenericAPIError {
633640 Code : errors .RateLimitingErrorCode ,
@@ -644,6 +651,9 @@ func (c *Client) TerminateInstances(_ context.Context, input *ec2.TerminateInsta
644651
645652 for _ , id := range input .InstanceIds {
646653 c .instances .Delete (id )
654+ if cancel , ok := c .instanceLaunchCancels .LoadAndDelete (id ); ok {
655+ cancel .(context.CancelFunc )()
656+ }
647657 }
648658 return & ec2.TerminateInstancesOutput {
649659 TerminatingInstances : lo .Map (input .InstanceIds , func (id string , _ int ) ec2types.InstanceStateChange {
@@ -663,6 +673,7 @@ func (c *Client) TerminateInstances(_ context.Context, input *ec2.TerminateInsta
663673}
664674
665675func (c * Client ) DescribeInstances (_ context.Context , input * ec2.DescribeInstancesInput , _ ... func (* ec2.Options )) (* ec2.DescribeInstancesOutput , error ) {
676+ <- c .readBackupCompleted
666677 if ! c .rateLimiterProvider .DescribeInstances ().TryAccept () {
667678 return nil , & smithy.GenericAPIError {
668679 Code : errors .RateLimitingErrorCode ,
@@ -713,6 +724,7 @@ func (c *Client) DescribeInstances(_ context.Context, input *ec2.DescribeInstanc
713724}
714725
715726func (c * Client ) RunInstances (_ context.Context , input * ec2.RunInstancesInput , _ ... func (* ec2.Options )) (* ec2.RunInstancesOutput , error ) {
727+ <- c .readBackupCompleted
716728 if ! c .rateLimiterProvider .RunInstances ().TryAccept () {
717729 return nil , & smithy.GenericAPIError {
718730 Code : errors .RateLimitingErrorCode ,
@@ -733,6 +745,7 @@ func (c *Client) RunInstances(_ context.Context, input *ec2.RunInstancesInput, _
733745
734746//nolint:gocyclo
735747func (c * Client ) CreateTags (_ context.Context , input * ec2.CreateTagsInput , _ ... func (* ec2.Options )) (* ec2.CreateTagsOutput , error ) {
748+ <- c .readBackupCompleted
736749 if ! c .rateLimiterProvider .CreateTags ().TryAccept () {
737750 return nil , & smithy.GenericAPIError {
738751 Code : errors .RateLimitingErrorCode ,
@@ -791,6 +804,7 @@ func (c *Client) CreateTags(_ context.Context, input *ec2.CreateTagsInput, _ ...
791804}
792805
793806func (c * Client ) CreateLaunchTemplate (_ context.Context , input * ec2.CreateLaunchTemplateInput , _ ... func (* ec2.Options )) (* ec2.CreateLaunchTemplateOutput , error ) {
807+ <- c .readBackupCompleted
794808 if ! c .rateLimiterProvider .CreateLaunchTemplate ().TryAccept () {
795809 return nil , & smithy.GenericAPIError {
796810 Code : errors .RateLimitingErrorCode ,
@@ -823,6 +837,7 @@ func (c *Client) CreateLaunchTemplate(_ context.Context, input *ec2.CreateLaunch
823837}
824838
825839func (c * Client ) DeleteLaunchTemplate (_ context.Context , input * ec2.DeleteLaunchTemplateInput , _ ... func (* ec2.Options )) (* ec2.DeleteLaunchTemplateOutput , error ) {
840+ <- c .readBackupCompleted
826841 if ! c .rateLimiterProvider .DeleteLaunchTemplate ().TryAccept () {
827842 return nil , & smithy.GenericAPIError {
828843 Code : errors .RateLimitingErrorCode ,
@@ -862,7 +877,35 @@ func (c *Client) DeleteLaunchTemplate(_ context.Context, input *ec2.DeleteLaunch
862877 }, nil
863878}
864879
865- func toNode (instanceID , nodePoolName string , instanceType * cloudprovider.InstanceType , zone , capacityType string ) * corev1.Node {
880+ func (c * Client ) toNode (ctx context.Context , instance ec2types.Instance ) * corev1.Node {
881+ nodePoolNameTag , _ := lo .Find (instance .Tags , func (t ec2types.Tag ) bool {
882+ return lo .FromPtr (t .Key ) == v1 .NodePoolLabelKey
883+ })
884+ subnet := lo .Must (lo .Find (c .subnets , func (s ec2types.Subnet ) bool {
885+ return lo .FromPtr (s .SubnetId ) == lo .FromPtr (instance .SubnetId )
886+ }))
887+ instanceTypeInfo := lo .Must (lo .Find (c .instanceTypes , func (i ec2types.InstanceTypeInfo ) bool {
888+ return i .InstanceType == instance .InstanceType
889+ }))
890+ // TODO: We need to get the capacity and allocatable information from the userData
891+ it := instancetype .NewInstanceType (
892+ ctx ,
893+ instanceTypeInfo ,
894+ c .region ,
895+ nil ,
896+ nil ,
897+ nil ,
898+ nil ,
899+ nil ,
900+ nil ,
901+ nil ,
902+ nil ,
903+ nil ,
904+ nil ,
905+ // TODO: Eventually support different AMIFamilies from userData
906+ "al2023" ,
907+ nil ,
908+ )
866909 nodeName := fmt .Sprintf ("%s-%d" , strings .ReplaceAll (namesgenerator .GetRandomName (0 ), "_" , "-" ), rand .Uint32 ()) //nolint:gosec
867910 return & corev1.Node {
868911 ObjectMeta : metav1.ObjectMeta {
@@ -872,25 +915,25 @@ func toNode(instanceID, nodePoolName string, instanceType *cloudprovider.Instanc
872915 },
873916 // TODO: We can eventually add all the labels from the userData but for now we just add the NodePool labels
874917 Labels : map [string ]string {
875- corev1 .LabelInstanceTypeStable : instanceType .Name ,
918+ corev1 .LabelInstanceTypeStable : it .Name ,
876919 corev1 .LabelHostname : nodeName ,
877- corev1 .LabelTopologyRegion : instanceType .Requirements .Get (corev1 .LabelTopologyRegion ).Any (),
878- corev1 .LabelTopologyZone : zone ,
879- v1 .CapacityTypeLabelKey : capacityType ,
880- corev1 .LabelArchStable : instanceType .Requirements .Get (corev1 .LabelArchStable ).Any (),
920+ corev1 .LabelTopologyRegion : it .Requirements .Get (corev1 .LabelTopologyRegion ).Any (),
921+ corev1 .LabelTopologyZone : lo . FromPtr ( subnet . AvailabilityZone ) ,
922+ v1 .CapacityTypeLabelKey : v1 . CapacityTypeOnDemand ,
923+ corev1 .LabelArchStable : it .Requirements .Get (corev1 .LabelArchStable ).Any (),
881924 corev1 .LabelOSStable : string (corev1 .Linux ),
882- v1 .NodePoolLabelKey : nodePoolName ,
925+ v1 .NodePoolLabelKey : lo . FromPtr ( nodePoolNameTag . Value ) ,
883926 v1alpha1 .KwokLabelKey : v1alpha1 .KwokLabelValue ,
884927 v1alpha1 .KwokPartitionLabelKey : "a" ,
885928 },
886929 },
887930 Spec : corev1.NodeSpec {
888- ProviderID : fmt .Sprintf ("kwok-aws:///%s/%s" , zone , instanceID ),
931+ ProviderID : fmt .Sprintf ("kwok-aws:///%s/%s" , lo . FromPtr ( subnet . AvailabilityZone ), lo . FromPtr ( instance . InstanceId ) ),
889932 Taints : []corev1.Taint {v1 .UnregisteredNoExecuteTaint },
890933 },
891934 Status : corev1.NodeStatus {
892- Capacity : instanceType .Capacity ,
893- Allocatable : instanceType .Allocatable (),
935+ Capacity : it .Capacity ,
936+ Allocatable : it .Allocatable (),
894937 Phase : corev1 .NodePending ,
895938 },
896939 }
0 commit comments