Skip to content

Commit 501287b

Browse files
feat: Add grace period for do-not-disrupt annotation (kubernetes-sigs#2874)
1 parent e618e90 commit 501287b

File tree

25 files changed

+823
-92
lines changed

25 files changed

+823
-92
lines changed

designs/do-not-disrupt-grace-period.md

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ Extend the existing `karpenter.sh/do-not-disrupt` annotation to accept duration
7979
#### Annotation Values
8080

8181
- `"true"`: Indefinite protection (existing behavior, backward compatible)
82-
- Duration string: Protection for the specified duration from pod binding time (e.g., `"4h"`, `"30m"`, `"1h30m"`)
82+
- Duration string: Protection for the specified duration from pod start time (e.g., `"4h"`, `"30m"`, `"1h30m"`)
8383
- Follows Go's `time.Duration` format
8484
- Common examples: `"30m"`, `"1h"`, `"4h"`, `"24h"`, `"1h30m"`
8585

@@ -100,19 +100,19 @@ spec:
100100

101101
1. **Indefinite Protection**: If `karpenter.sh/do-not-disrupt: "true"` is set, the behavior remains the same as today - indefinite protection (backward compatible)
102102
2. **Time-Limited Protection**: If set to a duration value (e.g., `"4h"`):
103-
- The pod is protected from disruption for the specified duration starting from the pod's binding time
103+
- The pod is protected from disruption for the specified duration starting from the pod's start time
104104
- After the grace period expires, the pod is treated as if it doesn't have the do-not-disrupt annotation
105105
- The node becomes eligible for disruption if no other constraints prevent it
106-
3. **Invalid Values**: If the value cannot be parsed as either `"true"` or a valid duration, it is treated as indefinite protection (backward compatible, fail-safe behavior)
106+
3. **Invalid Values**: If the value cannot be parsed as either `"true"` or a valid duration, the pod is treated as if it doesn't have the do-not-disrupt annotation, maintaining Karpenter's current behavior
107107

108108
### Time Calculation
109109

110110
The grace period expiration time is calculated as:
111111
```
112-
expiration_time = pod.BoundTimestamp + parsed_duration
112+
expiration_time = pod.Status.StartTime + parsed_duration
113113
```
114114

115-
For example, if a pod is bound at `2024-01-01T10:00:00Z` with `karpenter.sh/do-not-disrupt: "4h"`, it will be protected until `2024-01-01T14:00:00Z`.
115+
For example, if a pod starts at `2024-01-01T10:00:00Z` with `karpenter.sh/do-not-disrupt: "4h"`, it will be protected until `2024-01-01T14:00:00Z`.
116116

117117
### Implementation Details
118118

@@ -127,13 +127,13 @@ func parseDoNotDisrupt(value string) (indefinite bool, duration time.Duration, e
127127

128128
d, err := time.ParseDuration(value)
129129
if err != nil {
130-
// Invalid format - treat as indefinite (fail-safe)
131-
return true, 0, nil
130+
// Invalid format - treat as if annotation doesn't exist
131+
return false, 0, fmt.Errorf("failed to parse %q as a duration: %w", value, err)
132132
}
133133

134134
if d <= 0 {
135-
// Zero or negative - treat as indefinite (fail-safe)
136-
return true, 0, nil
135+
// Zero or negative - treat as if annotation doesn't exist
136+
return false, 0, fmt.Errorf("duration %q must be positive", value)
137137
}
138138

139139
return false, d, nil
@@ -175,7 +175,7 @@ spec:
175175
```
176176

177177
**Behavior**:
178-
- For the first 2 hours after pod binding, the node hosting this job will not be disrupted
178+
- For the first 2 hours after pod start, the node hosting this job will not be disrupted
179179
- After 2 hours, if the job hasn't completed, the node becomes eligible for disruption
180180
- This provides protection for normal execution while ensuring the node can eventually be disrupted
181181

@@ -332,7 +332,7 @@ The annotation controls **when** a pod can be disrupted. Pod's `terminationGrace
332332

333333
Users can determine the grace period status by:
334334
1. Checking pod annotations for the grace period value
335-
2. Comparing current time against `pod.BindingTimestamp + grace_period`
335+
2. Comparing current time against `pod.Status.StartTime + grace_period`
336336
3. Observing Karpenter events when nodes become eligible for disruption
337337

338338
Potential future enhancements:
@@ -399,7 +399,6 @@ While ideally this would be a Kubernetes-native PDB feature, the immediate probl
399399

400400
This feature is fully backward compatible:
401401
- Existing pods with only `karpenter.sh/do-not-disrupt: "true"` continue to work as before
402-
- Invalid grace period values default to indefinite protection
403402
- No changes to existing API contracts
404403
405404
## Rollout Plan

pkg/controllers/controllers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func NewControllers(
158158

159159
if options.FromContext(ctx).FeatureGates.StaticCapacity {
160160
controllers = append(controllers, staticprovisioning.NewController(kubeClient, cluster, recorder, cloudProvider, p, clock))
161-
controllers = append(controllers, staticdeprovisioning.NewController(kubeClient, cluster, cloudProvider, clock))
161+
controllers = append(controllers, staticdeprovisioning.NewController(kubeClient, cluster, cloudProvider, clock, recorder))
162162
}
163163

164164
if options.FromContext(ctx).FeatureGates.NodeOverlay {

pkg/controllers/disruption/consolidation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (c *consolidation) sortCandidates(candidates []*Candidate) []*Candidate {
136136
func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...*Candidate) (Command, error) {
137137
var err error
138138
// Run scheduling simulation to compute consolidation option
139-
results, err := SimulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, candidates...)
139+
results, err := SimulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, c.clock, c.recorder, candidates...)
140140
if err != nil {
141141
// if a candidate node is now deleting, just retry
142142
if errors.Is(err, errCandidateDeleting) {

pkg/controllers/disruption/consolidation_test.go

Lines changed: 139 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2018,7 +2018,7 @@ var _ = Describe("Consolidation", func() {
20182018
Entry("if the candidate is on-demand node", false),
20192019
Entry("if the candidate is spot node", true),
20202020
)
2021-
DescribeTable("can replace nodes, considers karpenter.sh/do-not-disrupt on nodes",
2021+
DescribeTable("can replace nodes, considers karpenter.sh/do-not-disrupt set to true on nodes",
20222022
func(spotToSpot bool) {
20232023
nodeClaim = lo.Ternary(spotToSpot, spotNodeClaim, nodeClaim)
20242024
node = lo.Ternary(spotToSpot, spotNode, node)
@@ -2108,7 +2108,7 @@ var _ = Describe("Consolidation", func() {
21082108
Entry("if the candidate is on-demand node", false),
21092109
Entry("if the candidate is spot node", true),
21102110
)
2111-
DescribeTable("can replace nodes, considers karpenter.sh/do-not-disrupt on pods",
2111+
DescribeTable("can replace nodes, considers karpenter.sh/do-not-disrupt set to true on pods",
21122112
func(spotToSpot bool) {
21132113
nodeClaim = lo.Ternary(spotToSpot, spotNodeClaim, nodeClaim)
21142114
node = lo.Ternary(spotToSpot, spotNode, node)
@@ -2627,7 +2627,7 @@ var _ = Describe("Consolidation", func() {
26272627
// eviction
26282628
ExpectNotFound(ctx, env.Client, nodeClaims[0], nodes[0])
26292629
})
2630-
It("can delete nodes, considers karpenter.sh/do-not-disrupt on nodes", func() {
2630+
It("can delete nodes, considers karpenter.sh/do-not-disrupt set to true on nodes", func() {
26312631
// create our RS so we can link a pod to it
26322632
rs := test.ReplicaSet()
26332633
ExpectApplied(ctx, env.Client, rs)
@@ -2669,7 +2669,7 @@ var _ = Describe("Consolidation", func() {
26692669
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
26702670
ExpectNotFound(ctx, env.Client, nodeClaims[0], nodes[0])
26712671
})
2672-
It("can delete nodes, considers karpenter.sh/do-not-disrupt on pods", func() {
2672+
It("can delete nodes, considers karpenter.sh/do-not-disrupt set to true on pods", func() {
26732673
// create our RS so we can link a pod to it
26742674
rs := test.ReplicaSet()
26752675
ExpectApplied(ctx, env.Client, rs)
@@ -2712,7 +2712,7 @@ var _ = Describe("Consolidation", func() {
27122712
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
27132713
ExpectNotFound(ctx, env.Client, nodeClaims[0], nodes[0])
27142714
})
2715-
It("does not consolidate nodes with karpenter.sh/do-not-disrupt on pods when the NodePool's TerminationGracePeriod is not nil", func() {
2715+
It("does not consolidate nodes with karpenter.sh/do-not-disrupt set to true on pods when the NodePool's TerminationGracePeriod is not nil", func() {
27162716
// create our RS so we can link a pod to it
27172717
rs := test.ReplicaSet()
27182718
ExpectApplied(ctx, env.Client, rs)
@@ -2807,6 +2807,138 @@ var _ = Describe("Consolidation", func() {
28072807
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2))
28082808
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2))
28092809
})
2810+
It("does not consolidate nodes with pods that have a duration-based do-not-disrupt annotation that is still active", func() {
2811+
rs := test.ReplicaSet()
2812+
ExpectApplied(ctx, env.Client, rs)
2813+
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())
2814+
2815+
pods := test.Pods(3, test.PodOptions{
2816+
ObjectMeta: metav1.ObjectMeta{Labels: labels,
2817+
OwnerReferences: []metav1.OwnerReference{
2818+
{
2819+
APIVersion: "apps/v1",
2820+
Kind: "ReplicaSet",
2821+
Name: rs.Name,
2822+
UID: rs.UID,
2823+
Controller: lo.ToPtr(true),
2824+
BlockOwnerDeletion: lo.ToPtr(true),
2825+
},
2826+
}}})
2827+
// Set a 2m duration-based do-not-disrupt annotation on pod[2]
2828+
pods[2].Annotations = lo.Assign(pods[2].Annotations, map[string]string{v1.DoNotDisruptAnnotationKey: "2m"})
2829+
pods[2].Status.StartTime = &metav1.Time{Time: fakeClock.Now()}
2830+
2831+
ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], pods[2], nodePool)
2832+
ExpectApplied(ctx, env.Client, nodeClaims[0], nodes[0], nodeClaims[1], nodes[1])
2833+
2834+
ExpectManualBinding(ctx, env.Client, pods[0], nodes[0])
2835+
ExpectManualBinding(ctx, env.Client, pods[1], nodes[0])
2836+
ExpectManualBinding(ctx, env.Client, pods[2], nodes[1])
2837+
2838+
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{nodes[0], nodes[1]}, []*v1.NodeClaim{nodeClaims[0], nodeClaims[1]})
2839+
ExpectSingletonReconciled(ctx, disruptionController)
2840+
2841+
// Grace period is still active on pod[2], so nodes[1] should not be consolidated
2842+
// Only nodes[0] (no annotated pods) can be consolidated
2843+
cmds := queue.GetCommands()
2844+
Expect(cmds).To(HaveLen(1))
2845+
ExpectObjectReconciled(ctx, env.Client, queue, cmds[0].Candidates[0].NodeClaim)
2846+
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaims[0])
2847+
2848+
// nodes[0] deleted, nodes[1] still protected
2849+
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
2850+
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
2851+
ExpectNotFound(ctx, env.Client, nodeClaims[0], nodes[0])
2852+
})
2853+
It("can consolidate nodes after duration-based do-not-disrupt annotation expires", func() {
2854+
rs := test.ReplicaSet()
2855+
ExpectApplied(ctx, env.Client, rs)
2856+
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())
2857+
2858+
pods := test.Pods(3, test.PodOptions{
2859+
ObjectMeta: metav1.ObjectMeta{Labels: labels,
2860+
OwnerReferences: []metav1.OwnerReference{
2861+
{
2862+
APIVersion: "apps/v1",
2863+
Kind: "ReplicaSet",
2864+
Name: rs.Name,
2865+
UID: rs.UID,
2866+
Controller: lo.ToPtr(true),
2867+
BlockOwnerDeletion: lo.ToPtr(true),
2868+
},
2869+
}}})
2870+
// All pods have a 2m duration-based do-not-disrupt annotation
2871+
for _, p := range pods {
2872+
p.Annotations = lo.Assign(p.Annotations, map[string]string{v1.DoNotDisruptAnnotationKey: "2m"})
2873+
p.Status.StartTime = &metav1.Time{Time: fakeClock.Now()}
2874+
}
2875+
2876+
ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], pods[2], nodePool)
2877+
ExpectApplied(ctx, env.Client, nodeClaims[0], nodes[0], nodeClaims[1], nodes[1])
2878+
2879+
ExpectManualBinding(ctx, env.Client, pods[0], nodes[0])
2880+
ExpectManualBinding(ctx, env.Client, pods[1], nodes[0])
2881+
ExpectManualBinding(ctx, env.Client, pods[2], nodes[1])
2882+
2883+
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{nodes[0], nodes[1]}, []*v1.NodeClaim{nodeClaims[0], nodeClaims[1]})
2884+
ExpectSingletonReconciled(ctx, disruptionController)
2885+
2886+
// All pods have active grace periods, no consolidation should happen
2887+
cmds := queue.GetCommands()
2888+
Expect(cmds).To(HaveLen(0))
2889+
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2))
2890+
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2))
2891+
2892+
// Advance clock past the 2m grace period
2893+
fakeClock.Step(3 * time.Minute)
2894+
2895+
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{nodes[0], nodes[1]}, []*v1.NodeClaim{nodeClaims[0], nodeClaims[1]})
2896+
ExpectSingletonReconciled(ctx, disruptionController)
2897+
2898+
// Grace periods expired, consolidation should now proceed
2899+
cmds = queue.GetCommands()
2900+
Expect(cmds).To(HaveLen(1))
2901+
})
2902+
It("can delete nodes, considers invalid do-not-disrupt annotation format as not blocking", func() {
2903+
rs := test.ReplicaSet()
2904+
ExpectApplied(ctx, env.Client, rs)
2905+
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())
2906+
2907+
pods := test.Pods(3, test.PodOptions{
2908+
ObjectMeta: metav1.ObjectMeta{Labels: labels,
2909+
OwnerReferences: []metav1.OwnerReference{
2910+
{
2911+
APIVersion: "apps/v1",
2912+
Kind: "ReplicaSet",
2913+
Name: rs.Name,
2914+
UID: rs.UID,
2915+
Controller: lo.ToPtr(true),
2916+
BlockOwnerDeletion: lo.ToPtr(true),
2917+
},
2918+
}}})
2919+
// Set an invalid format annotation - should not block consolidation
2920+
pods[2].Annotations = lo.Assign(pods[2].Annotations, map[string]string{v1.DoNotDisruptAnnotationKey: "invalid-format"})
2921+
2922+
ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], pods[2], nodePool)
2923+
ExpectApplied(ctx, env.Client, nodeClaims[0], nodes[0], nodeClaims[1], nodes[1])
2924+
2925+
ExpectManualBinding(ctx, env.Client, pods[0], nodes[0])
2926+
ExpectManualBinding(ctx, env.Client, pods[1], nodes[0])
2927+
ExpectManualBinding(ctx, env.Client, pods[2], nodes[1])
2928+
2929+
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{nodes[0], nodes[1]}, []*v1.NodeClaim{nodeClaims[0], nodeClaims[1]})
2930+
ExpectSingletonReconciled(ctx, disruptionController)
2931+
2932+
// Invalid annotation format should not block consolidation
2933+
cmds := queue.GetCommands()
2934+
Expect(cmds).To(HaveLen(1))
2935+
ExpectObjectReconciled(ctx, env.Client, queue, cmds[0].Candidates[0].NodeClaim)
2936+
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaims[1])
2937+
2938+
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
2939+
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
2940+
ExpectNotFound(ctx, env.Client, nodeClaims[1], nodes[1])
2941+
})
28102942
It("can delete nodes, evicts pods without an ownerRef", func() {
28112943
// create our RS so we can link a pod to it
28122944
rs := test.ReplicaSet()
@@ -3443,7 +3575,7 @@ var _ = Describe("Consolidation", func() {
34433575
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
34443576
ExpectExists(ctx, env.Client, nodeClaims[0])
34453577
})
3446-
It("should not replace node if a pod schedules with karpenter.sh/do-not-disrupt during the TTL wait", func() {
3578+
It("should not replace node if a pod schedules with karpenter.sh/do-not-disrupt set to true during the TTL wait", func() {
34473579
pod := test.Pod()
34483580
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod)
34493581

@@ -3515,7 +3647,7 @@ var _ = Describe("Consolidation", func() {
35153647
},
35163648
)
35173649
})
3518-
It("should not delete node if pods schedule with karpenter.sh/do-not-disrupt during the TTL wait", func() {
3650+
It("should not delete node if pods schedule with karpenter.sh/do-not-disrupt set to true during the TTL wait", func() {
35193651
pods := test.Pods(2, test.PodOptions{})
35203652
ExpectApplied(ctx, env.Client, nodePool, nodeClaims[0], nodes[0], nodeClaims[1], nodes[1], pods[0], pods[1])
35213653

pkg/controllers/disruption/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func NewMethods(clk clock.Clock, cluster *state.Cluster, kubeClient client.Clien
103103
// Terminate and create replacement for drifted NodeClaims in Static NodePool
104104
NewStaticDrift(cluster, provisioner, cp),
105105
// Terminate any NodeClaims that have drifted from provisioning specifications, allowing the pods to reschedule.
106-
NewDrift(kubeClient, cluster, provisioner, recorder),
106+
NewDrift(kubeClient, cluster, provisioner, recorder, clk),
107107
// Attempt to identify multiple NodeClaims that we can consolidate simultaneously to reduce pod churn
108108
NewMultiNodeConsolidation(c),
109109
// And finally fall back our single NodeClaim consolidation to further reduce cluster cost.

pkg/controllers/disruption/drift.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"sort"
2424

2525
"github.com/samber/lo"
26+
"k8s.io/utils/clock"
2627
"sigs.k8s.io/controller-runtime/pkg/client"
2728

2829
"sigs.k8s.io/karpenter/pkg/utils/pretty"
@@ -40,14 +41,16 @@ type Drift struct {
4041
cluster *state.Cluster
4142
provisioner *provisioning.Provisioner
4243
recorder events.Recorder
44+
clock clock.Clock
4345
}
4446

45-
func NewDrift(kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, recorder events.Recorder) *Drift {
47+
func NewDrift(kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, recorder events.Recorder, clk clock.Clock) *Drift {
4648
return &Drift{
4749
kubeClient: kubeClient,
4850
cluster: cluster,
4951
provisioner: provisioner,
5052
recorder: recorder,
53+
clock: clk,
5154
}
5255
}
5356

@@ -78,7 +81,7 @@ func (d *Drift) ComputeCommands(ctx context.Context, disruptionBudgetMapping map
7881
continue
7982
}
8083
// Check if we need to create any NodeClaims.
81-
results, err := SimulateScheduling(ctx, d.kubeClient, d.cluster, d.provisioner, candidate)
84+
results, err := SimulateScheduling(ctx, d.kubeClient, d.cluster, d.provisioner, d.clock, d.recorder, candidate)
8285
if err != nil {
8386
// if a candidate is now deleting, just retry
8487
if errors.Is(err, errCandidateDeleting) {

0 commit comments

Comments (0)