Skip to content

Commit 7f65303

Browse files
committed
fix isUnavailable
Signed-off-by: Vacant2333 <vacant2333@gmail.com>
1 parent 9fb2451 commit 7f65303

File tree

6 files changed

+5969
-666
lines changed

6 files changed

+5969
-666
lines changed

pkg/cache/unavailableofferings.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,6 @@ import (
2222

2323
"github.com/patrickmn/go-cache"
2424
"knative.dev/pkg/logging"
25-
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
26-
)
27-
28-
var (
29-
spotKey = key("", "", karpv1.CapacityTypeSpot)
3025
)
3126

3227
// UnavailableOfferings stores any offerings that return ICE (insufficient capacity errors) when
@@ -56,11 +51,6 @@ func NewUnavailableOfferings() *UnavailableOfferings {
5651

5752
// IsUnavailable returns true if the offering appears in the cache
5853
func (u *UnavailableOfferings) IsUnavailable(instanceType, zone, capacityType string) bool {
59-
if capacityType == karpv1.CapacityTypeSpot {
60-
if _, found := u.cache.Get(spotKey); found {
61-
return true
62-
}
63-
}
6454
_, found := u.cache.Get(key(instanceType, zone, capacityType))
6555
return found
6656
}

pkg/controllers/interruption/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcil
8585
return reconcile.Result{}, nil
8686
}
8787

88-
func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
88+
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
8989
return controllerruntime.NewControllerManagedBy(m).
9090
Named("interruption").
9191
For(&corev1.Node{}).

pkg/controllers/nodeclaim/garbagecollection/controller.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,10 @@ import (
3939
)
4040

4141
type Controller struct {
42-
kubeClient client.Client
43-
cloudProvider cloudprovider.CloudProvider
44-
successfulCount uint64 // keeps track of successful reconciles for more aggressive requeueing near the start of the controller
42+
kubeClient client.Client
43+
cloudProvider cloudprovider.CloudProvider
44+
// Keeps track of successful reconciles for more aggressive requeueing near the start of the controller.
45+
successfulCount uint64
4546
}
4647

4748
func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller {

pkg/providers/instancetype/instancetype.go

Lines changed: 67 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,9 @@ type DefaultProvider struct {
6666

6767
instanceTypesInfo []*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType
6868

69-
muInstanceTypesOfferings sync.RWMutex
70-
instanceTypesOfferings map[string]sets.Set[string]
69+
muInstanceTypesOfferings sync.RWMutex
70+
instanceTypesOfferings map[string]sets.Set[string]
71+
spotInstanceTypesOfferings map[string]sets.Set[string]
7172

7273
instanceTypesCache *cache.Cache
7374

@@ -83,17 +84,18 @@ func NewDefaultProvider(region string, kubeClient client.Client, ecsClient *ecsc
8384
instanceTypesCache *cache.Cache, unavailableOfferingsCache *kcache.UnavailableOfferings,
8485
pricingProvider pricing.Provider, ackProvider ack.Provider) *DefaultProvider {
8586
return &DefaultProvider{
86-
kubeClient: kubeClient,
87-
ecsClient: ecsClient,
88-
region: region,
89-
pricingProvider: pricingProvider,
90-
ackProvider: ackProvider,
91-
instanceTypesInfo: []*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType{},
92-
instanceTypesOfferings: map[string]sets.Set[string]{},
93-
instanceTypesCache: instanceTypesCache,
94-
unavailableOfferings: unavailableOfferingsCache,
95-
cm: pretty.NewChangeMonitor(),
96-
instanceTypesSeqNum: 0,
87+
kubeClient: kubeClient,
88+
ecsClient: ecsClient,
89+
region: region,
90+
pricingProvider: pricingProvider,
91+
ackProvider: ackProvider,
92+
instanceTypesInfo: []*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType{},
93+
instanceTypesOfferings: map[string]sets.Set[string]{},
94+
spotInstanceTypesOfferings: map[string]sets.Set[string]{},
95+
instanceTypesCache: instanceTypesCache,
96+
unavailableOfferings: unavailableOfferingsCache,
97+
cm: pretty.NewChangeMonitor(),
98+
instanceTypesSeqNum: 0,
9799
}
98100
}
99101

@@ -111,6 +113,9 @@ func (p *DefaultProvider) validateState(nodeClass *v1alpha1.ECSNodeClass) error
111113
if len(p.instanceTypesOfferings) == 0 {
112114
return errors.New("no instance types offerings found")
113115
}
116+
if len(p.spotInstanceTypesOfferings) == 0 {
117+
return errors.New("no spot instance types offerings found")
118+
}
114119
if len(nodeClass.Status.VSwitches) == 0 {
115120
return errors.New("no vswitches found")
116121
}
@@ -182,13 +187,15 @@ func (p *DefaultProvider) List(ctx context.Context, kc *v1alpha1.KubeletConfigur
182187
zoneData := lo.Map(allZones.UnsortedList(), func(zoneID string, _ int) ZoneData {
183188
if !p.instanceTypesOfferings[lo.FromPtr(i.InstanceTypeId)].Has(zoneID) || !vSwitchsZones.Has(zoneID) {
184189
return ZoneData{
185-
ID: zoneID,
186-
Available: false,
190+
ID: zoneID,
191+
Available: false,
192+
SpotAvailable: false,
187193
}
188194
}
189195
return ZoneData{
190-
ID: zoneID,
191-
Available: true,
196+
ID: zoneID,
197+
Available: true,
198+
SpotAvailable: p.spotInstanceTypesOfferings[lo.FromPtr(i.InstanceTypeId)].Has(zoneID),
192199
}
193200
})
194201

@@ -271,7 +278,42 @@ func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error
271278
log.FromContext(ctx).Error(err, "failed to get instance type offerings")
272279
return err
273280
}
281+
err = processAvailableResourcesResponse(resp, instanceTypesOfferings)
282+
if err != nil {
283+
log.FromContext(ctx).Error(err, "failed to process available resource response")
284+
return err
285+
}
286+
287+
if p.cm.HasChanged("instance-type-offering", instanceTypesOfferings) {
288+
// Only update instanceTypesOfferingsSeqNum when the instance type offerings have changed
289+
// This avoids creating new keys for duplicate (unchanged) instance type offerings
290+
atomic.AddUint64(&p.instanceTypesOfferingsSeqNum, 1)
291+
log.FromContext(ctx).WithValues("instance-type-count", len(instanceTypesOfferings)).V(1).Info("discovered offerings for instance types")
292+
}
293+
p.instanceTypesOfferings = instanceTypesOfferings
294+
295+
spotInstanceTypesOfferings := map[string]sets.Set[string]{}
296+
describeAvailableResourceRequest = &ecsclient.DescribeAvailableResourceRequest{
297+
RegionId: tea.String(p.region),
298+
DestinationResource: tea.String("InstanceType"),
299+
SpotStrategy: tea.String("SpotAsPriceGo"),
300+
}
301+
resp, err = p.ecsClient.DescribeAvailableResourceWithOptions(
302+
describeAvailableResourceRequest, &util.RuntimeOptions{})
303+
if err != nil {
304+
log.FromContext(ctx).Error(err, "failed to get spot instance type offerings")
305+
return err
306+
}
307+
err = processAvailableResourcesResponse(resp, spotInstanceTypesOfferings)
308+
if err != nil {
309+
log.FromContext(ctx).Error(err, "failed to process spot instance type offerings")
310+
return err
311+
}
312+
p.spotInstanceTypesOfferings = spotInstanceTypesOfferings
313+
return nil
314+
}
274315

316+
func processAvailableResourcesResponse(resp *ecsclient.DescribeAvailableResourceResponse, offerings map[string]sets.Set[string]) error {
275317
if resp == nil || resp.Body == nil {
276318
return errors.New("DescribeAvailableResourceWithOptions failed to return any instance types")
277319
} else if resp.Body.AvailableZones == nil || len(resp.Body.AvailableZones.AvailableZone) == 0 {
@@ -280,18 +322,11 @@ func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error
280322

281323
for _, az := range resp.Body.AvailableZones.AvailableZone {
282324
// TODO: Test `ClosedWithStock` later to determine whether it should also be accepted here alongside `WithStock`.
283-
if *az.StatusCategory == "WithStock" { // WithStock, ClosedWithStock, WithoutStock, ClosedWithoutStock
284-
processAvailableResources(az, instanceTypesOfferings)
325+
// WithStock, ClosedWithStock, WithoutStock, ClosedWithoutStock
326+
if *az.StatusCategory == "WithStock" {
327+
processAvailableResources(az, offerings)
285328
}
286329
}
287-
288-
if p.cm.HasChanged("instance-type-offering", instanceTypesOfferings) {
289-
// Only update instanceTypesSeqNun with the instance type offerings have been changed
290-
// This is to not create new keys with duplicate instance type offerings option
291-
atomic.AddUint64(&p.instanceTypesOfferingsSeqNum, 1)
292-
log.FromContext(ctx).WithValues("instance-type-count", len(instanceTypesOfferings)).V(1).Info("discovered offerings for instance types")
293-
}
294-
p.instanceTypesOfferings = instanceTypesOfferings
295330
return nil
296331
}
297332

@@ -365,6 +400,10 @@ func getAllInstanceTypes(client *ecsclient.Client) ([]*ecsclient.DescribeInstanc
365400
func (p *DefaultProvider) createOfferings(_ context.Context, instanceType string, zones []ZoneData) []cloudprovider.Offering {
366401
var offerings []cloudprovider.Offering
367402
for _, zone := range zones {
403+
if !zone.Available {
404+
continue
405+
}
406+
368407
odPrice, odOK := p.pricingProvider.OnDemandPrice(instanceType)
369408
spotPrice, spotOK := p.pricingProvider.SpotPrice(instanceType, zone.ID)
370409

@@ -375,7 +414,7 @@ func (p *DefaultProvider) createOfferings(_ context.Context, instanceType string
375414
offerings = append(offerings, p.createOffering(zone.ID, karpv1.CapacityTypeOnDemand, odPrice, offeringAvailable))
376415
}
377416

378-
if spotOK {
417+
if spotOK && zone.SpotAvailable {
379418
isUnavailable := p.unavailableOfferings.IsUnavailable(instanceType, zone.ID, karpv1.CapacityTypeSpot)
380419
offeringAvailable := !isUnavailable && zone.Available
381420

pkg/providers/instancetype/types.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ const (
5353
)
5454

5555
type ZoneData struct {
56-
ID string
57-
Available bool
56+
ID string
57+
Available bool
58+
SpotAvailable bool
5859
}
5960

6061
func calculateResourceOverhead(pods, cpuM, memoryMi int64) corev1.ResourceList {
@@ -79,7 +80,7 @@ var thresholds = [...]struct {
7980
overhead float64
8081
}{
8182
{1000, 0.06},
82-
{3000, 0.01},
83+
{2000, 0.01},
8384
{3000, 0.005},
8485
{4000, 0.005},
8586
}

0 commit comments

Comments
 (0)