Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions pkg/cache/unavailableofferings.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ import (

"github.com/patrickmn/go-cache"
"knative.dev/pkg/logging"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

var (
spotKey = key("", "", karpv1.CapacityTypeSpot)
)

// UnavailableOfferings stores any offerings that return ICE (insufficient capacity errors) when
Expand Down Expand Up @@ -56,11 +51,6 @@ func NewUnavailableOfferings() *UnavailableOfferings {

// IsUnavailable returns true if the offering appears in the cache
func (u *UnavailableOfferings) IsUnavailable(instanceType, zone, capacityType string) bool {
if capacityType == karpv1.CapacityTypeSpot {
if _, found := u.cache.Get(spotKey); found {
return true
}
}
_, found := u.cache.Get(key(instanceType, zone, capacityType))
return found
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/interruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcil
return reconcile.Result{}, nil
}

func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("interruption").
For(&corev1.Node{}).
Expand Down
7 changes: 4 additions & 3 deletions pkg/controllers/nodeclaim/garbagecollection/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ import (
)

type Controller struct {
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
successfulCount uint64 // keeps track of successful reconciles for more aggressive requeueing near the start of the controller
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
// Keeps track of successful reconciles for more aggressive requeueing near the start of the controller.
successfulCount uint64
}

func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller {
Expand Down
106 changes: 73 additions & 33 deletions pkg/providers/instancetype/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ type DefaultProvider struct {

instanceTypesInfo []*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType

muInstanceTypesOfferings sync.RWMutex
instanceTypesOfferings map[string]sets.Set[string]
muInstanceTypesOfferings sync.RWMutex
instanceTypesOfferings map[string]sets.Set[string]
spotInstanceTypesOfferings map[string]sets.Set[string]

instanceTypesCache *cache.Cache

Expand All @@ -83,17 +84,18 @@ func NewDefaultProvider(region string, kubeClient client.Client, ecsClient *ecsc
instanceTypesCache *cache.Cache, unavailableOfferingsCache *kcache.UnavailableOfferings,
pricingProvider pricing.Provider, ackProvider ack.Provider) *DefaultProvider {
return &DefaultProvider{
kubeClient: kubeClient,
ecsClient: ecsClient,
region: region,
pricingProvider: pricingProvider,
ackProvider: ackProvider,
instanceTypesInfo: []*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType{},
instanceTypesOfferings: map[string]sets.Set[string]{},
instanceTypesCache: instanceTypesCache,
unavailableOfferings: unavailableOfferingsCache,
cm: pretty.NewChangeMonitor(),
instanceTypesSeqNum: 0,
kubeClient: kubeClient,
ecsClient: ecsClient,
region: region,
pricingProvider: pricingProvider,
ackProvider: ackProvider,
instanceTypesInfo: []*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType{},
instanceTypesOfferings: map[string]sets.Set[string]{},
spotInstanceTypesOfferings: map[string]sets.Set[string]{},
instanceTypesCache: instanceTypesCache,
unavailableOfferings: unavailableOfferingsCache,
cm: pretty.NewChangeMonitor(),
instanceTypesSeqNum: 0,
}
}

Expand All @@ -111,6 +113,9 @@ func (p *DefaultProvider) validateState(nodeClass *v1alpha1.ECSNodeClass) error
if len(p.instanceTypesOfferings) == 0 {
return errors.New("no instance types offerings found")
}
if len(p.spotInstanceTypesOfferings) == 0 {
return errors.New("no spot instance types offerings found")
}
if len(nodeClass.Status.VSwitches) == 0 {
return errors.New("no vswitches found")
}
Expand Down Expand Up @@ -182,13 +187,15 @@ func (p *DefaultProvider) List(ctx context.Context, kc *v1alpha1.KubeletConfigur
zoneData := lo.Map(allZones.UnsortedList(), func(zoneID string, _ int) ZoneData {
if !p.instanceTypesOfferings[lo.FromPtr(i.InstanceTypeId)].Has(zoneID) || !vSwitchsZones.Has(zoneID) {
return ZoneData{
ID: zoneID,
Available: false,
ID: zoneID,
Available: false,
SpotAvailable: false,
}
}
return ZoneData{
ID: zoneID,
Available: true,
ID: zoneID,
Available: true,
SpotAvailable: p.spotInstanceTypesOfferings[lo.FromPtr(i.InstanceTypeId)].Has(zoneID),
}
})

Expand Down Expand Up @@ -271,7 +278,40 @@ func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error
log.FromContext(ctx).Error(err, "failed to get instance type offerings")
return err
}
if err := processAvailableResourcesResponse(resp, instanceTypesOfferings); err != nil {
log.FromContext(ctx).Error(err, "failed to process available resource response")
return err
}

if p.cm.HasChanged("instance-type-offering", instanceTypesOfferings) {
// Only update instanceTypesSeqNun with the instance type offerings have been changed
// This is to not create new keys with duplicate instance type offerings option
atomic.AddUint64(&p.instanceTypesOfferingsSeqNum, 1)
log.FromContext(ctx).WithValues("instance-type-count", len(instanceTypesOfferings)).V(1).Info("discovered offerings for instance types")
}
p.instanceTypesOfferings = instanceTypesOfferings

spotInstanceTypesOfferings := map[string]sets.Set[string]{}
describeAvailableResourceRequest = &ecsclient.DescribeAvailableResourceRequest{
RegionId: tea.String(p.region),
DestinationResource: tea.String("InstanceType"),
SpotStrategy: tea.String("SpotAsPriceGo"),
}
resp, err = p.ecsClient.DescribeAvailableResourceWithOptions(
describeAvailableResourceRequest, &util.RuntimeOptions{})
if err != nil {
log.FromContext(ctx).Error(err, "failed to get spot instance type offerings")
return err
}
if err := processAvailableResourcesResponse(resp, spotInstanceTypesOfferings); err != nil {
log.FromContext(ctx).Error(err, "failed to process spot instance type offerings")
return err
}
p.spotInstanceTypesOfferings = spotInstanceTypesOfferings
return nil
}

func processAvailableResourcesResponse(resp *ecsclient.DescribeAvailableResourceResponse, offerings map[string]sets.Set[string]) error {
if resp == nil || resp.Body == nil {
return errors.New("DescribeAvailableResourceWithOptions failed to return any instance types")
} else if resp.Body.AvailableZones == nil || len(resp.Body.AvailableZones.AvailableZone) == 0 {
Expand All @@ -280,18 +320,12 @@ func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error

for _, az := range resp.Body.AvailableZones.AvailableZone {
// TODO: Later, `ClosedWithStock` will be tested to determine if `ClosedWithStock` should be added.
if *az.StatusCategory == "WithStock" { // WithStock, ClosedWithStock, WithoutStock, ClosedWithoutStock
processAvailableResources(az, instanceTypesOfferings)
// WithStock, ClosedWithStock, WithoutStock, ClosedWithoutStock
if *az.StatusCategory != "WithStock" {
continue
}
processAvailableResources(az, offerings)
}

if p.cm.HasChanged("instance-type-offering", instanceTypesOfferings) {
// Only update instanceTypesSeqNun with the instance type offerings have been changed
// This is to not create new keys with duplicate instance type offerings option
atomic.AddUint64(&p.instanceTypesOfferingsSeqNum, 1)
log.FromContext(ctx).WithValues("instance-type-count", len(instanceTypesOfferings)).V(1).Info("discovered offerings for instance types")
}
p.instanceTypesOfferings = instanceTypesOfferings
return nil
}

Expand All @@ -307,12 +341,14 @@ func processAvailableResources(az *ecsclient.DescribeAvailableResourceResponseBo

for _, sr := range ar.SupportedResources.SupportedResource {
// TODO: Later, `ClosedWithStock` will be tested to determine if `ClosedWithStock` should be added.
if *sr.StatusCategory == "WithStock" { // WithStock, ClosedWithStock, WithoutStock, ClosedWithoutStock
if _, ok := instanceTypesOfferings[*sr.Value]; !ok {
instanceTypesOfferings[*sr.Value] = sets.New[string]()
}
instanceTypesOfferings[*sr.Value].Insert(*az.ZoneId)
// WithStock, ClosedWithStock, WithoutStock, ClosedWithoutStock
if *sr.StatusCategory != "WithStock" {
continue
}
if _, ok := instanceTypesOfferings[*sr.Value]; !ok {
instanceTypesOfferings[*sr.Value] = sets.New[string]()
}
instanceTypesOfferings[*sr.Value].Insert(*az.ZoneId)
}
}
}
Expand Down Expand Up @@ -365,6 +401,10 @@ func getAllInstanceTypes(client *ecsclient.Client) ([]*ecsclient.DescribeInstanc
func (p *DefaultProvider) createOfferings(_ context.Context, instanceType string, zones []ZoneData) []cloudprovider.Offering {
var offerings []cloudprovider.Offering
for _, zone := range zones {
if !zone.Available {
continue
}

odPrice, odOK := p.pricingProvider.OnDemandPrice(instanceType)
spotPrice, spotOK := p.pricingProvider.SpotPrice(instanceType, zone.ID)

Expand All @@ -375,7 +415,7 @@ func (p *DefaultProvider) createOfferings(_ context.Context, instanceType string
offerings = append(offerings, p.createOffering(zone.ID, karpv1.CapacityTypeOnDemand, odPrice, offeringAvailable))
}

if spotOK {
if spotOK && zone.SpotAvailable {
isUnavailable := p.unavailableOfferings.IsUnavailable(instanceType, zone.ID, karpv1.CapacityTypeSpot)
offeringAvailable := !isUnavailable && zone.Available

Expand Down
7 changes: 4 additions & 3 deletions pkg/providers/instancetype/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ const (
)

type ZoneData struct {
ID string
Available bool
ID string
Available bool
SpotAvailable bool
}

func calculateResourceOverhead(pods, cpuM, memoryMi int64) corev1.ResourceList {
Expand All @@ -79,7 +80,7 @@ var thresholds = [...]struct {
overhead float64
}{
{1000, 0.06},
{3000, 0.01},
{2000, 0.01},
{3000, 0.005},
{4000, 0.005},
}
Expand Down
Loading