Commit

final final review feedback
jmdeal committed Feb 21, 2025
1 parent 6e9426e commit 74040ed
Showing 9 changed files with 122 additions and 72 deletions.
6 changes: 3 additions & 3 deletions designs/capacity-reservations.md
@@ -22,13 +22,13 @@ Karpenter doesn't currently support reasoning about this capacity type. Karpente
3. Karpenter should add logic to its scheduler to reason about this availability as an `int` -- ensuring that the scheduler never schedules more offerings of an instance type for a capacity type than are available
4. Karpenter should extend its CloudProvider [InstanceType](https://github.com/kubernetes-sigs/karpenter/blob/35d6197e38e64cd6abfef71a082aee80e38d09fd/pkg/cloudprovider/types.go#L75) struct to allow offerings to represent availability of an offering as an `int` rather than a `bool` -- allowing Cloud Providers to represent the constrained capacity of `reserved` (a minimal sketch follows this list)
5. Karpenter should consolidate between `on-demand` and/or `spot` instance types to `reserved` when the capacity type is available
6. Karpenter should introduce a feature flag `FEATURE_FLAG=CapacityReservations` to gate this new feature in `ALPHA` when it's introduced
6. Karpenter should introduce a feature flag `FEATURE_FLAG=ReservedCapacity` to gate this new feature in `ALPHA` when it's introduced
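
To make items 3 and 4 concrete, a reserved offering might be modeled roughly as below. This is an editor's sketch, not part of this commit: the field names mirror `cloudprovider.Offering` as exercised by the tests later in this diff, while the price, zone, reservation ID, capacity, and import paths are assumed for illustration.

```go
// Editor's sketch (not part of this commit): a cloud provider surfacing a reserved offering
// with integer availability. Import paths and literal values are assumptions.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
	"sigs.k8s.io/karpenter/pkg/scheduling"
)

func reservedOffering() *cloudprovider.Offering {
	return &cloudprovider.Offering{
		// Reserved capacity is already paid for, so it is priced near zero to make it the
		// cheapest option for both provisioning and consolidation.
		Price:               0.000001,
		Available:           true,
		ReservationCapacity: 10, // the scheduler may launch at most 10 more nodes against this reservation
		Requirements: scheduling.NewLabelRequirements(map[string]string{
			v1.CapacityTypeLabelKey:          v1.CapacityTypeReserved,
			corev1.LabelTopologyZone:         "us-west-2a", // illustrative zone
			cloudprovider.ReservationIDLabel: "r-example",  // hypothetical reservation ID
		}),
	}
}

func main() {
	o := reservedOffering()
	fmt.Printf("reserved offering: available=%t capacity=%d price=%f\n", o.Available, o.ReservationCapacity, o.Price)
}
```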

### `karpenter.sh/capacity-type` API

_Note: Some excerpts taken from [`aws/karpenter-provider-aws` RFC](https://github.com/aws/karpenter-provider-aws/blob/main/designs/odcr.md#nodepool-api)._

This RFC proposes the addition of a new `karpenter.sh/capacity-type` label value, called `reserved`. A cluster admin could then choose to support launching only reserved node capacity, or to fall back from reserved capacity to on-demand (or even spot) capacity.

_Note: This option requires any applications (pods) that use node selection on `karpenter.sh/capacity-type: "on-demand"` to expand their selection to include `reserved`, or to update it to a `NotIn` node affinity on `karpenter.sh/capacity-type: spot` (see the sketch below)._
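
For reference, the `NotIn` alternative mentioned in the note could be expressed on a pod roughly as follows. This is an editor's sketch using the upstream Kubernetes `core/v1` types, not part of this commit; only the label key and value come from the text above.

```go
// Editor's sketch (not part of this commit): a pod that refuses spot capacity via NotIn,
// so it may land on either on-demand or reserved nodes.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	pod := corev1.Pod{
		Spec: corev1.PodSpec{
			Affinity: &corev1.Affinity{
				NodeAffinity: &corev1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
						NodeSelectorTerms: []corev1.NodeSelectorTerm{{
							MatchExpressions: []corev1.NodeSelectorRequirement{{
								Key:      "karpenter.sh/capacity-type",
								Operator: corev1.NodeSelectorOpNotIn,
								Values:   []string{"spot"},
							}},
						}},
					},
				},
			},
		},
	}
	fmt.Println(len(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms), "node selector term(s) set")
}
```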

@@ -140,4 +140,4 @@ In practice, this means that if a user has two capacity reservation offerings av

## Appendix

1. AWS Cloud Provider's RFC for On-Demand Capacity Reservations: https://github.com/aws/karpenter-provider-aws/blob/main/designs/odcr.md
10 changes: 9 additions & 1 deletion pkg/controllers/disruption/consolidation.go
@@ -308,8 +308,16 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
func getCandidatePrices(candidates []*Candidate) (float64, error) {
var price float64
for _, c := range candidates {
compatibleOfferings := c.instanceType.Offerings.Compatible(scheduling.NewLabelRequirements(c.StateNode.Labels()))
reqs := scheduling.NewLabelRequirements(c.StateNode.Labels())
compatibleOfferings := c.instanceType.Offerings.Compatible(reqs)
if len(compatibleOfferings) == 0 {
// It's expected that offerings may no longer exist for capacity reservations once a NodeClass stops selecting on
// them (or they are no longer considered by the cloudprovider for some other reason). By definition, though,
// reserved capacity is free; modeling it as free means consolidation won't be able to succeed, but the node should
// be disrupted via drift regardless.
if reqs.Get(v1.CapacityTypeLabelKey).Has(v1.CapacityTypeReserved) {
return 0.0, nil
}
return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
}
price += compatibleOfferings.Cheapest().Price
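
As a rough illustration of the comment above (an editor's sketch, not the actual savings calculation used by the disruption controller): pricing an orphaned reserved candidate at zero means no replacement offering can undercut it, so consolidation backs off and drift disrupts the node instead.

```go
// Editor's sketch (not part of this commit): simplified view of why a zero-priced reserved
// candidate can never be consolidated on cost.
package main

import "fmt"

func main() {
	candidatePrice := 0.0       // reserved candidate whose offering has disappeared
	cheapestReplacement := 0.25 // hypothetical on-demand replacement price
	if cheapestReplacement >= candidatePrice {
		fmt.Println("no cost savings; leave the node for drift to disrupt")
	}
}
```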
132 changes: 88 additions & 44 deletions pkg/controllers/disruption/consolidation_test.go
@@ -4415,40 +4415,90 @@ var _ = Describe("Consolidation", func() {
},
})
reservedNodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{CapacityReservations: lo.ToPtr(true)}}))
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{ReservedCapacity: lo.ToPtr(true)}}))
})
It("can consolidate from one reserved offering to another", func() {
leastExpensiveReservationID := fmt.Sprintf("r-%s", leastExpensiveInstance.Name)
leastExpensiveInstance.Requirements.Add(scheduling.NewRequirement(
cloudprovider.ReservationIDLabel,
corev1.NodeSelectorOpIn,
leastExpensiveReservationID,
))
leastExpensiveInstance.Requirements.Get(v1.CapacityTypeLabelKey).Insert(v1.CapacityTypeReserved)
leastExpensiveInstance.Offerings = append(leastExpensiveInstance.Offerings, &cloudprovider.Offering{
Price: leastExpensiveOffering.Price / 1_000_000.0,
Available: true,
ReservationCapacity: 10,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: v1.CapacityTypeReserved,
corev1.LabelTopologyZone: leastExpensiveOffering.Zone(),
v1alpha1.LabelReservationID: leastExpensiveReservationID,
}),
})

// create our RS so we can link a pod to it
rs := test.ReplicaSet()
ExpectApplied(ctx, env.Client, rs)
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())

pod := test.Pod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{
Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: lo.ToPtr(true),
BlockOwnerDeletion: lo.ToPtr(true),
},
},
}})
ExpectApplied(ctx, env.Client, rs, pod, reservedNode, reservedNodeClaim, nodePool)

// bind pods to node
ExpectManualBinding(ctx, env.Client, pod, reservedNode)

// inform cluster state about nodes and nodeClaims
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{reservedNode}, []*v1.NodeClaim{reservedNodeClaim})

fakeClock.Step(10 * time.Minute)

// consolidation won't delete the old nodeclaim until the new nodeclaim is ready
var wg sync.WaitGroup
ExpectToWait(fakeClock, &wg)
ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
ExpectSingletonReconciled(ctx, disruptionController)
wg.Wait()

// Process the item so that the nodes can be deleted.
ExpectSingletonReconciled(ctx, queue)

// Cascade any deletion of the nodeclaim to the node
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, reservedNodeClaim)

// should create a new nodeclaim as there is a cheaper one that can hold the pod
nodeClaims := ExpectNodeClaims(ctx, env.Client)
nodes := ExpectNodes(ctx, env.Client)
Expect(nodeClaims).To(HaveLen(1))
Expect(nodes).To(HaveLen(1))

Expect(nodeClaims[0].Name).ToNot(Equal(reservedNodeClaim.Name))

// We should have consolidated into the cheaper reserved offering on the least expensive instance type.
Expect(nodes[0].Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, leastExpensiveInstance.Name))
Expect(nodes[0].Labels).To(HaveKeyWithValue(v1.CapacityTypeLabelKey, v1.CapacityTypeReserved))
Expect(nodes[0].Labels).To(HaveKeyWithValue(cloudprovider.ReservationIDLabel, leastExpensiveReservationID))

// and delete the old one
ExpectNotFound(ctx, env.Client, reservedNodeClaim, reservedNode)
})
DescribeTable(
"can replace node",
"can consolidate into reserved capacity for the same instance pool",
func(initialCapacityType string) {
nodeClaim = lo.Switch[string, *v1.NodeClaim](initialCapacityType).
Case(v1.CapacityTypeOnDemand, nodeClaim).
Case(v1.CapacityTypeSpot, spotNodeClaim).
Default(reservedNodeClaim)
node = lo.Switch[string, *corev1.Node](initialCapacityType).
Case(v1.CapacityTypeOnDemand, node).
Case(v1.CapacityTypeSpot, spotNode).
Default(reservedNode)

// If the capacity type is reserved, we will need a cheaper reserved instance to consolidate into
var leastExpensiveReservationID string
if initialCapacityType == v1.CapacityTypeReserved {
leastExpensiveReservationID = fmt.Sprintf("r-%s", leastExpensiveInstance.Name)
leastExpensiveInstance.Requirements.Add(scheduling.NewRequirement(
cloudprovider.ReservationIDLabel,
corev1.NodeSelectorOpIn,
leastExpensiveReservationID,
))
leastExpensiveInstance.Requirements.Get(v1.CapacityTypeLabelKey).Insert(v1.CapacityTypeReserved)
leastExpensiveInstance.Offerings = append(leastExpensiveInstance.Offerings, &cloudprovider.Offering{
Price: leastExpensiveOffering.Price / 1_000_000.0,
Available: true,
ReservationCapacity: 10,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: v1.CapacityTypeReserved,
corev1.LabelTopologyZone: leastExpensiveOffering.Zone(),
v1alpha1.LabelReservationID: leastExpensiveReservationID,
}),
})
if initialCapacityType == v1.CapacityTypeSpot {
nodeClaim = spotNodeClaim
node = spotNode
}

// create our RS so we can link a pod to it
@@ -4499,23 +4549,17 @@ var _ = Describe("Consolidation", func() {
Expect(nodes).To(HaveLen(1))

Expect(nodeClaims[0].Name).ToNot(Equal(nodeClaim.Name))
// If the original capacity type was OD or spot, we should be able to consolidate into the reserved offering of the
// same type.
Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Has(corev1.LabelInstanceTypeStable)).To(BeTrue())
Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Get(corev1.LabelInstanceTypeStable).Has(mostExpensiveInstance.Name)).To(Equal(initialCapacityType != v1.CapacityTypeReserved))
Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Has(cloudprovider.ReservationIDLabel)).To(BeTrue())
Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Get(cloudprovider.ReservationIDLabel).Any()).To(Equal(lo.Ternary(
initialCapacityType == v1.CapacityTypeReserved,
leastExpensiveReservationID,
mostExpensiveReservationID,
)))

// We should have consolidated into the same instance type, just into reserved.
Expect(nodes[0].Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, mostExpensiveInstance.Name))
Expect(nodes[0].Labels).To(HaveKeyWithValue(v1.CapacityTypeLabelKey, v1.CapacityTypeReserved))
Expect(nodes[0].Labels).To(HaveKeyWithValue(cloudprovider.ReservationIDLabel, mostExpensiveReservationID))

// and delete the old one
ExpectNotFound(ctx, env.Client, nodeClaim, node)
},
Entry("on-demand", v1.CapacityTypeOnDemand),
Entry("spot", v1.CapacityTypeSpot),
Entry("reserved", v1.CapacityTypeReserved),
Entry("from on-demand", v1.CapacityTypeOnDemand),
Entry("from spot", v1.CapacityTypeSpot),
)
})
})
5 changes: 1 addition & 4 deletions pkg/controllers/nodeclaim/disruption/drift_test.go
@@ -551,11 +551,10 @@ var _ = Describe("Drift", func() {
}))
}

ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{CapacityReservations: lo.ToPtr(true)}}))
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{ReservedCapacity: lo.ToPtr(true)}}))
})
// This is required to support cloudproviders dynamically updating the capacity type based on reservation expirations
It("should drift reserved nodeclaim if the capacity type label has been updated", func() {
cp.Drifted = ""
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
@@ -568,7 +567,6 @@ var _ = Describe("Drift", func() {
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeTrue())
})
It("should drift reserved nodeclaims if there are no reserved offerings available for the nodepool", func() {
cp.Drifted = ""
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
@@ -582,7 +580,6 @@ var _ = Describe("Drift", func() {
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeTrue())
})
It("should drift reserved nodeclaims if an offering with the reservation ID is no longer available for the nodepool", func() {
cp.Drifted = ""
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/scheduling/nodeclaim.go
@@ -188,7 +188,7 @@ func (n *NodeClaim) reserveOfferings(
instanceTypes []*cloudprovider.InstanceType,
nodeClaimRequirements scheduling.Requirements,
) (cloudprovider.Offerings, error) {
if !opts.FromContext(ctx).FeatureGates.CapacityReservations {
if !opts.FromContext(ctx).FeatureGates.ReservedCapacity {
return nil, nil
}

9 changes: 5 additions & 4 deletions pkg/controllers/provisioning/scheduling/suite_test.go
@@ -3819,7 +3819,7 @@ var _ = Context("Scheduling", func() {
Price: fake.PriceFromResources(it.Capacity) / 100_000.0,
})
}
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{CapacityReservations: lo.ToPtr(true)}}))
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{ReservedCapacity: lo.ToPtr(true)}}))
})
It("shouldn't fallback to on-demand or spot when compatible reserved offerings are available", func() {
// With the pessimistic nature of scheduling reservations, we'll only be able to provision one instance per loop if a
@@ -3864,6 +3864,8 @@ var _ = Context("Scheduling", func() {
return bindings.Get(p) == nil
})

// Finally, we schedule the final pod. Since both capacity reservations are now exhausted and their offerings are
// marked as unavailable, we will fall back to either OD or spot.
bindings = ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...)
Expect(len(bindings)).To(Equal(1))
node = lo.Values(bindings)[0].Node
@@ -3987,9 +3989,8 @@ var _ = Context("Scheduling", func() {
})
})

// Even though the pods schedule to separate NodePools, those NodePools share a capacity reservation for the
// selected instance type. Karpenter should successfully provision a reserved instance for one pod, but fail
// to provision anything for the second since it won't fallback to OD or spot.
// Since each pod can only schedule to one of the NodePools, and each NodePool has a distinct capacity reservation,
// we should be able to schedule both pods simultaneously despite them selecting on the same instance pool.
bindings := lo.Values(ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...))
Expect(len(bindings)).To(Equal(2))
for _, binding := range bindings {
10 changes: 5 additions & 5 deletions pkg/operator/options/options.go
@@ -41,9 +41,9 @@ type optionsKey struct{}
type FeatureGates struct {
inputStr string

SpotToSpotConsolidation bool
NodeRepair bool
CapacityReservations bool
ReservedCapacity bool
SpotToSpotConsolidation bool
}

// Options contains all CLI flags / env vars for karpenter-core. It adheres to the options.Injectable interface.
@@ -99,7 +99,7 @@ func (o *Options) AddFlags(fs *FlagSet) {
fs.StringVar(&o.LogErrorOutputPaths, "log-error-output-paths", env.WithDefaultString("LOG_ERROR_OUTPUT_PATHS", "stderr"), "Optional comma separated paths for logging error output")
fs.DurationVar(&o.BatchMaxDuration, "batch-max-duration", env.WithDefaultDuration("BATCH_MAX_DURATION", 10*time.Second), "The maximum length of a batch window. The longer this is, the more pods we can consider for provisioning at one time which usually results in fewer but larger nodes.")
fs.DurationVar(&o.BatchIdleDuration, "batch-idle-duration", env.WithDefaultDuration("BATCH_IDLE_DURATION", time.Second), "The maximum amount of time with no new pending pods that if exceeded ends the current batching window. If pods arrive faster than this time, the batching window will be extended up to the maxDuration. If they arrive slower, the pods will be batched separately.")
fs.StringVar(&o.FeatureGates.inputStr, "feature-gates", env.WithDefaultString("FEATURE_GATES", "CapacityReservations=false,NodeRepair=false,SpotToSpotConsolidation=false"), "Optional features can be enabled / disabled using feature gates. Current options are: CapacityReservations, NodeRepair, and SpotToSpotConsolidation")
fs.StringVar(&o.FeatureGates.inputStr, "feature-gates", env.WithDefaultString("FEATURE_GATES", "NodeRepair=false,ReservedCapacity=false,SpotToSpotConsolidation=false"), "Optional features can be enabled / disabled using feature gates. Current options are: NodeRepair, ReservedCapacity, and SpotToSpotConsolidation")
}

func (o *Options) Parse(fs *FlagSet, args ...string) error {
Expand Down Expand Up @@ -140,8 +140,8 @@ func ParseFeatureGates(gateStr string) (FeatureGates, error) {
if val, ok := gateMap["SpotToSpotConsolidation"]; ok {
gates.SpotToSpotConsolidation = val
}
if val, ok := gateMap["CapacityReservations"]; ok {
gates.CapacityReservations = val
if val, ok := gateMap["ReservedCapacity"]; ok {
gates.ReservedCapacity = val
}

return gates, nil
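
For completeness, the renamed gate is driven through the same `--feature-gates` flag / `FEATURE_GATES` environment variable registered in `AddFlags` above. A minimal sketch of exercising the parser directly follows (an editor's example, not part of this commit, assuming the module path `sigs.k8s.io/karpenter`):

```go
// Editor's sketch (not part of this commit): parsing the renamed ReservedCapacity gate.
package main

import (
	"fmt"

	"sigs.k8s.io/karpenter/pkg/operator/options"
)

func main() {
	// Mirrors the default string registered in AddFlags, with ReservedCapacity flipped on.
	gates, err := options.ParseFeatureGates("NodeRepair=false,ReservedCapacity=true,SpotToSpotConsolidation=false")
	if err != nil {
		panic(err)
	}
	fmt.Println("ReservedCapacity enabled:", gates.ReservedCapacity)
}
```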
