Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/apis/v1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ var (
AnnotationEC2NodeClassHashVersion = apis.Group + "/ec2nodeclass-hash-version"
AnnotationInstanceTagged = apis.Group + "/tagged"
AnnotationInstanceProfile = apis.Group + "/instance-profile-name"
AnnotationInstanceInterrupted = apis.Group + "/instance-interrupted"

NodeClaimTagKey = coreapis.Group + "/nodeclaim"
NameTagKey = "Name"
Expand Down
27 changes: 27 additions & 0 deletions pkg/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/aws/karpenter-provider-aws/pkg/apis"
v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
"github.com/aws/karpenter-provider-aws/pkg/controllers/interruption"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/aws/karpenter-provider-aws/pkg/utils"

Expand Down Expand Up @@ -249,6 +250,19 @@ func (c *CloudProvider) Delete(ctx context.Context, nodeClaim *karpv1.NodeClaim)
if id := nodeClaim.Labels[cloudprovider.ReservationIDLabel]; id != "" && cloudprovider.IsNodeClaimNotFoundError(err) {
c.capacityReservationProvider.MarkTerminated(id)
}

if cloudprovider.IsNodeClaimNotFoundError(err) && isInterruptibleInstance(nodeClaim) {
capacityType := nodeClaim.Labels[karpv1.CapacityTypeLabelKey]
instanceInterrupted := nodeClaim.Annotations[v1.AnnotationInstanceInterrupted]
if instanceInterrupted == "" {
log.FromContext(ctx).Info("detected instance termination without interruption notification",
"capacity-type", capacityType)
interruption.MissedInterruptionTerminations.Inc(map[string]string{
"capacity_type": capacityType,
})
}
}

return err
}

Expand Down Expand Up @@ -482,3 +496,16 @@ func newTerminatingNodeClassError(name string) *errors.StatusError {
err.ErrStatus.Message = fmt.Sprintf("%s %q is terminating, treating as not found", qualifiedResource.String(), name)
return err
}

func isInterruptibleInstance(nodeClaim *karpv1.NodeClaim) bool {
capacityType := nodeClaim.Labels[karpv1.CapacityTypeLabelKey]
if capacityType == karpv1.CapacityTypeSpot {
return true
}
if capacityType == karpv1.CapacityTypeReserved {
if interruptibleLabel, ok := nodeClaim.Labels[v1.LabelCapacityReservationInterruptible]; ok && interruptibleLabel == "true" {
return true
}
}
return false
}
41 changes: 41 additions & 0 deletions pkg/cloudprovider/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/aws/karpenter-provider-aws/pkg/apis"
v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
"github.com/aws/karpenter-provider-aws/pkg/cloudprovider"
"github.com/aws/karpenter-provider-aws/pkg/controllers/interruption"
"github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass"
"github.com/aws/karpenter-provider-aws/pkg/fake"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
Expand Down Expand Up @@ -1517,4 +1518,44 @@ var _ = Describe("CloudProvider", func() {
Entry("when the capacity reservation type is capacity-block", v1.CapacityReservationTypeCapacityBlock, false),
)
})
Context("Interruption Metric Tracking", func() {
It("should increment MissedInterruptionTerminations metric depending on interruption annotation", func() {
nodePool.Spec.Template.Spec.Requirements = []karpv1.NodeSelectorRequirementWithMinValues{
{
Key: karpv1.CapacityTypeLabelKey,
Operator: corev1.NodeSelectorOpIn,
Values: []string{karpv1.CapacityTypeSpot},
},
}
nodeClass.Spec.Kubelet = &v1.KubeletConfiguration{
MaxPods: aws.Int32(1),
}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
for range 2 {
pod := coretest.UnschedulablePod()
ExpectApplied(ctx, env.Client, pod)
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod)
ExpectScheduled(ctx, env.Client, pod)
}
ncs := ExpectNodeClaims(ctx, env.Client)
Expect(ncs).To(HaveLen(2))

// add annotation to only 1 nodeclaim
ncs[0].Annotations = lo.Assign(ncs[0].Annotations, map[string]string{
v1.AnnotationInstanceInterrupted: "true",
})
ExpectApplied(ctx, env.Client, ncs[0])

// mock underlying instance as already terminated
awsEnv.EC2API.DescribeInstancesBehavior.Output.Set(&ec2.DescribeInstancesOutput{
Reservations: []ec2types.Reservation{},
})
err1 := cloudProvider.Delete(ctx, ncs[0])
err2 := cloudProvider.Delete(ctx, ncs[1])
Expect(corecloudprovider.IsNodeClaimNotFoundError(err1)).To(BeTrue())
Expect(corecloudprovider.IsNodeClaimNotFoundError(err2)).To(BeTrue())
// one nodeclaim did not have the interrupted annotation
ExpectMetricCounterValue(interruption.MissedInterruptionTerminations, 1, map[string]string{"capacity_type": karpv1.CapacityTypeSpot})
})
})
})
11 changes: 11 additions & 0 deletions pkg/controllers/interruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/awslabs/operatorpkg/reconciler"
"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
"go.uber.org/multierr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -241,6 +243,15 @@ func (c *Controller) handleNodeClaim(ctx context.Context, msg messages.Message,
}

if action != NoAction {
stored := nodeClaim.DeepCopy()
nodeClaim.Annotations = lo.Assign(nodeClaim.Annotations, map[string]string{
v1.AnnotationInstanceInterrupted: "true",
})
if !equality.Semantic.DeepEqual(stored, nodeClaim) {
if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil {
return fmt.Errorf("annotating nodeclaim as interrupted, %w", err)
}
}
return c.deleteNodeClaim(ctx, msg, nodeClaim, node)
}
return nil
Expand Down
10 changes: 10 additions & 0 deletions pkg/controllers/interruption/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ const (
)

var (
MissedInterruptionTerminations = opmetrics.NewPrometheusCounter(
crmetrics.Registry,
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: interruptionSubsystem,
Name: "missed_termination_total",
Help: "Count of instance terminations that were not notified via SQS interruption messages",
},
[]string{metrics.CapacityTypeLabel},
)
ReceivedMessages = opmetrics.NewPrometheusCounter(
crmetrics.Registry,
prometheus.CounterOpts{
Expand Down
4 changes: 4 additions & 0 deletions website/content/en/preview/reference/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ Amount of time an interruption message is on the queue before it is processed by
Count of messages deleted from the SQS queue.
- Stability Level: STABLE

### `karpenter_interruption_missed_termination_total`
Count of instance terminations that were not notified via SQS interruption messages
- Stability Level: STABLE

## Cluster Metrics

### `karpenter_cluster_utilization_percent`
Expand Down