159 changes: 120 additions & 39 deletions pkg/healthcheck/workloads.go
@@ -2,8 +2,11 @@ package healthcheck

import (
"context"
"errors"
"fmt"
"path/filepath"
"strings"
"syscall"
"time"

"github.com/openshift/microshift/pkg/config"
@@ -25,25 +28,29 @@ type NamespaceWorkloads struct {
}

func waitForWorkloads(ctx context.Context, timeout time.Duration, workloads map[string]NamespaceWorkloads) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", filepath.Join(config.DataDir, "resources", string(config.KubeAdmin), "kubeconfig"))
kubeconfigPath := filepath.Join(config.DataDir, "resources", string(config.KubeAdmin), "kubeconfig")
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return fmt.Errorf("failed to create restConfig: %v", err)
return fmt.Errorf("failed to load kubeconfig from %s: %v", kubeconfigPath, err)
}
client, err := appsclientv1.NewForConfig(rest.AddUserAgent(restConfig, "healthcheck"))
if err != nil {
return fmt.Errorf("failed to create client: %v", err)
return fmt.Errorf("unable to create Kubernetes client: %v", err)
}

interval := max(timeout/30, 1*time.Second)
klog.Infof("API Server will be queried every %v", interval)

aeg := &util.AllErrGroup{}
for ns, wls := range workloads {
for _, deploy := range wls.Deployments {
aeg.Go(func() error { return waitForDeployment(ctx, client, timeout, ns, deploy) })
aeg.Go(func() error { return waitForDeployment(ctx, client, timeout, interval, ns, deploy) })
}
for _, ds := range wls.DaemonSets {
aeg.Go(func() error { return waitForDaemonSet(ctx, client, timeout, ns, ds) })
aeg.Go(func() error { return waitForDaemonSet(ctx, client, timeout, interval, ns, ds) })
}
for _, sts := range wls.StatefulSets {
aeg.Go(func() error { return waitForStatefulSet(ctx, client, timeout, ns, sts) })
aeg.Go(func() error { return waitForStatefulSet(ctx, client, timeout, interval, ns, sts) })
}
}
errs := aeg.Wait()
@@ -54,60 +61,84 @@ func waitForWorkloads(ctx context.Context, timeout time.Duration, workloads map[
return nil
}
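
For context, a caller drives waitForWorkloads with a map keyed by namespace. A minimal sketch, assuming NamespaceWorkloads exposes Deployments, DaemonSets, and StatefulSets as []string fields (the fields the loops above range over); the namespaces and workload names are illustrative, not prescribed by this PR:

func exampleHealthcheck(ctx context.Context) error {
	// Hypothetical workload set; real callers assemble this from configuration.
	workloads := map[string]NamespaceWorkloads{
		"openshift-dns": {
			DaemonSets: []string{"dns-default", "node-resolver"},
		},
		"openshift-ingress": {
			Deployments: []string{"router-default"},
		},
	}
	// With a 300s timeout the new interval heuristic yields max(300s/30, 1s) = 10s;
	// any timeout of 30s or less is clamped to the 1s floor.
	return waitForWorkloads(ctx, 300*time.Second, workloads)
}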

func waitForDaemonSet(ctx context.Context, client *appsclientv1.AppsV1Client, timeout time.Duration, namespace, name string) error {
func waitForDaemonSet(ctx context.Context, client *appsclientv1.AppsV1Client, timeout, interval time.Duration, namespace, name string) error {
klog.Infof("Waiting %v for daemonset/%s in %s", timeout, name, namespace)
err := wait.PollUntilContextTimeout(ctx, 10*time.Second, timeout, true, func(ctx context.Context) (done bool, err error) {
ds, err := client.DaemonSets(namespace).Get(ctx, name, v1.GetOptions{})
var lastHumanReadableErr error
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (done bool, err error) {
getctx, cancel := context.WithTimeout(ctx, interval/2)
defer cancel()

ds, err := client.DaemonSets(namespace).Get(getctx, name, v1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
// Resources created by an operator might not exist yet.
// We allow the full timeout duration for them to be created and become ready.
// Always return 'false, nil' to keep retrying until timeout.

if commonErr := commonGetErrors(err); commonErr != nil {
lastHumanReadableErr = commonErr
return false, nil
}
if isDeadlineExceededError(err) {
return false, nil
}
klog.Errorf("Error getting daemonset/%s in %q: %v", name, namespace, err)
// Ignore errors; keep retrying until the timeout

klog.Errorf("Unexpected error while getting daemonset %q in %q (ignoring): %v", name, namespace, err)
return false, nil
}
klog.V(3).Infof("Status of daemonset/%s in %s: %+v", name, namespace, ds.Status)
klog.V(3).Infof("Status of DaemonSet %q in %q: %+v", name, namespace, ds.Status)

// Borrowed and adjusted from k8s.io/kubectl/pkg/polymorphichelpers/rollout_status.go
if ds.Generation > ds.Status.ObservedGeneration {
lastHumanReadableErr = fmt.Errorf("daemonset is still being processed by the controller (generation %d > observed %d)", ds.Generation, ds.Status.ObservedGeneration)
return false, nil
}
if ds.Status.UpdatedNumberScheduled < ds.Status.DesiredNumberScheduled {
lastHumanReadableErr = fmt.Errorf("only %d of %d nodes have the updated daemonset pods", ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
return false, nil
}
if ds.Status.NumberAvailable < ds.Status.DesiredNumberScheduled {
lastHumanReadableErr = fmt.Errorf("only %d of %d daemonset pods are ready across all nodes", ds.Status.NumberAvailable, ds.Status.DesiredNumberScheduled)
return false, nil
}
return true, nil
})
if err != nil {
klog.Errorf("Failed waiting for daemonset/%s in %s: %v", name, namespace, err)
if isDeadlineExceededError(err) {
klog.Errorf("DaemonSet %q in %q namespace didn't become ready in %v: %v", name, namespace, timeout, lastHumanReadableErr)
return fmt.Errorf("daemonset '%s' in namespace '%s' failed to become ready within %v. Last status: %v", name, namespace, timeout, lastHumanReadableErr)
}
klog.Errorf("Failed waiting for DaemonSet %q in namespace %q: %v", name, namespace, err)
return err
}
klog.Infof("Daemonset/%s in %s is ready", name, namespace)
klog.Infof("DaemonSet %q in namespace %q is ready", name, namespace)
return nil
}
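
The per-iteration child context is the key mechanical change in these loops: each Get is capped at interval/2, so one hung API call can no longer consume the entire poll budget. A standalone sketch of the same pattern, with hypothetical names, in case it is useful elsewhere:

// pollWithBoundedAttempts bounds every attempt of a polled operation to half
// the poll interval, so a stuck attempt cannot outlive its polling slot.
func pollWithBoundedAttempts(ctx context.Context, timeout, interval time.Duration, attempt func(context.Context) (bool, error)) error {
	return wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
		attemptCtx, cancel := context.WithTimeout(ctx, interval/2)
		defer cancel()
		return attempt(attemptCtx)
	})
}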

func waitForDeployment(ctx context.Context, client *appsclientv1.AppsV1Client, timeout time.Duration, namespace, name string) error {
func waitForDeployment(ctx context.Context, client *appsclientv1.AppsV1Client, timeout, interval time.Duration, namespace, name string) error {
klog.Infof("Waiting %v for deployment/%s in %s", timeout, name, namespace)
err := wait.PollUntilContextTimeout(ctx, 10*time.Second, timeout, true, func(ctx context.Context) (done bool, err error) {
deployment, err := client.Deployments(namespace).Get(ctx, name, v1.GetOptions{})
var lastHumanReadableErr error
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (done bool, err error) {
getctx, cancel := context.WithTimeout(ctx, interval/2)
defer cancel()

deployment, err := client.Deployments(namespace).Get(getctx, name, v1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
// Resources created by an operator might not exist yet.
// We allow the full timeout duration for them to be created and become ready.
// Always return 'false, nil' to keep retrying until timeout.

if commonErr := commonGetErrors(err); commonErr != nil {
lastHumanReadableErr = commonErr
return false, nil
}
if isDeadlineExceededError(err) {
return false, nil
}
klog.Errorf("Error getting deployment/%s in %q: %v", name, namespace, err)
// Ignore errors; keep retrying until the timeout

klog.Errorf("Unexpected error while getting deployment %q in %q (ignoring): %v", name, namespace, err)
return false, nil
}
klog.V(3).Infof("Status of deployment/%s in %s: %+v", name, namespace, deployment.Status)
klog.V(3).Infof("Status of Deployment %q in %q: %+v", name, namespace, deployment.Status)

// Borrowed and adjusted from k8s.io/kubectl/pkg/polymorphichelpers/rollout_status.go
if deployment.Generation > deployment.Status.ObservedGeneration {
lastHumanReadableErr = fmt.Errorf("deployment is still being processed by the controller (generation %d > observed %d)", deployment.Generation, deployment.Status.ObservedGeneration)
return false, nil
}
// 'rollout status' command would check the 'Progressing' condition and if the reason is 'ProgressDeadlineExceeded',
@@ -117,64 +148,114 @@ func waitForDeployment(ctx context.Context, client *appsclientv1.AppsV1Client, t
// - we want to give full timeout duration for the Deployment to become ready, no early exits.

if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas {
lastHumanReadableErr = fmt.Errorf("only %d of %d pods have been updated with the latest configuration", deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas)
return false, nil
}
if deployment.Status.Replicas > deployment.Status.UpdatedReplicas {
lastHumanReadableErr = fmt.Errorf("%d pods are still running the old configuration while %d are updated", deployment.Status.Replicas-deployment.Status.UpdatedReplicas, deployment.Status.UpdatedReplicas)
return false, nil
}
if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas {
lastHumanReadableErr = fmt.Errorf("only %d of %d updated pods are ready", deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas)
return false, nil
}
return true, nil
})
if err != nil {
klog.Errorf("Failed waiting for deployment/%s in %s: %v", name, namespace, err)
if isDeadlineExceededError(err) {
klog.Errorf("Deployment/%s in %s didn't become ready in %v: %v", name, namespace, timeout, lastHumanReadableErr)
return fmt.Errorf("deployment '%s' in namespace '%s' failed to become ready within %v. Last status: %v", name, namespace, timeout, lastHumanReadableErr)
}
klog.Errorf("Failed waiting for Deployment %q in namespace %q: %v", name, namespace, err)
return err
}
klog.Infof("Deployment/%s in %s is ready", name, namespace)
klog.Infof("Deployment %q in namespace %q is ready", name, namespace)
return nil
}
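
To make the three deployment checks concrete, here is a worked example of a mid-rollout snapshot (values invented for illustration): 3 desired replicas, 2 updated, and only 1 of the updated pods available.

// Hypothetical snapshot: generation already observed, rollout half done.
replicas := int32(3)
d := appsv1.Deployment{
	Spec:   appsv1.DeploymentSpec{Replicas: &replicas},
	Status: appsv1.DeploymentStatus{Replicas: 3, UpdatedReplicas: 2, AvailableReplicas: 1},
}
// The first check fires: UpdatedReplicas (2) < *Spec.Replicas (3), so the poll
// records "only 2 of 3 pods have been updated with the latest configuration"
// and returns false, nil to keep retrying.
_ = d

Once UpdatedReplicas reaches 3, the second check (pods still on the old configuration) and the third check (availability) gate readiness in turn.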

func waitForStatefulSet(ctx context.Context, client *appsclientv1.AppsV1Client, timeout time.Duration, namespace, name string) error {
func waitForStatefulSet(ctx context.Context, client *appsclientv1.AppsV1Client, timeout, interval time.Duration, namespace, name string) error {
klog.Infof("Waiting %v for statefulset/%s in %s", timeout, name, namespace)
err := wait.PollUntilContextTimeout(ctx, 10*time.Second, timeout, true, func(ctx context.Context) (done bool, err error) {
sts, err := client.StatefulSets(namespace).Get(ctx, name, v1.GetOptions{})
var lastHumanReadableErr error
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (done bool, err error) {
getctx, cancel := context.WithTimeout(ctx, interval/2)
defer cancel()

sts, err := client.StatefulSets(namespace).Get(getctx, name, v1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
// Resources created by an operator might not exist yet.
// We allow the full timeout duration for them to be created and become ready.
// Always return 'false, nil' to keep retrying until timeout.

if commonErr := commonGetErrors(err); commonErr != nil {
lastHumanReadableErr = commonErr
return false, nil
}
if isDeadlineExceededError(err) {
return false, nil
}
klog.Errorf("Error getting statefulset/%s in %s: %v", name, namespace, err)
// Ignore errors; keep retrying until the timeout

klog.Errorf("Unexpected error while getting statefulset %q in %q (ignoring): %v", name, namespace, err)
return false, nil
}
klog.V(3).Infof("Status of statefulset/%s in %s: %+v", name, namespace, sts.Status)
klog.V(3).Infof("Status of StatefulSet %q in %q: %+v", name, namespace, sts.Status)

// Borrowed and adjusted from k8s.io/kubectl/pkg/polymorphichelpers/rollout_status.go
if sts.Status.ObservedGeneration == 0 || sts.Generation > sts.Status.ObservedGeneration {
lastHumanReadableErr = fmt.Errorf("statefulset is still being processed by the controller (generation %d > observed %d)", sts.Generation, sts.Status.ObservedGeneration)
return false, nil
}
if sts.Spec.Replicas != nil && sts.Status.ReadyReplicas < *sts.Spec.Replicas {
lastHumanReadableErr = fmt.Errorf("only %d of %d replicas are ready", sts.Status.ReadyReplicas, *sts.Spec.Replicas)
return false, nil
}
if sts.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType && sts.Spec.UpdateStrategy.RollingUpdate != nil {
if sts.Spec.Replicas != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
if sts.Status.UpdatedReplicas < (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition) {
lastHumanReadableErr = fmt.Errorf("only %d of %d replicas have been updated (partition: %d)", sts.Status.UpdatedReplicas, *sts.Spec.Replicas, *sts.Spec.UpdateStrategy.RollingUpdate.Partition)
return false, nil
}
}
return true, nil
}
if sts.Status.UpdateRevision != sts.Status.CurrentRevision {
lastHumanReadableErr = fmt.Errorf("update revision (%s) differs from current revision (%s)", sts.Status.UpdateRevision, sts.Status.CurrentRevision)
return false, nil
}
return true, nil
})
if err != nil {
klog.Errorf("Failed waiting for statefulset/%s in %s: %v", name, namespace, err)
if isDeadlineExceededError(err) {
klog.Errorf("Statefulset/%s in %s didn't become ready in %v: %v", name, namespace, timeout, lastHumanReadableErr)
return fmt.Errorf("statefulset '%s' in namespace '%s' failed to become ready within %v. Last status: %v", name, namespace, timeout, lastHumanReadableErr)
}
klog.Errorf("Failed waiting for StatefulSet %q in namespace %q: %v", name, namespace, err)
return err
}
klog.Infof("StatefulSet/%s in %s is ready", name, namespace)
klog.Infof("StatefulSet %q in namespace %q is ready", name, namespace)
return nil
}
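
The partition arithmetic deserves a worked example (numbers invented): with 5 replicas and partition 2, a rolling update only replaces ordinals 2 through 4, so the check requires UpdatedReplicas >= 5 - 2 = 3 before declaring the StatefulSet ready; ordinals 0 and 1 intentionally stay on the old revision.

// Hypothetical partitioned rollout: 5 replicas, partition 2.
replicas, partition := int32(5), int32(2)
needed := replicas - partition // 3: ordinals 2, 3 and 4 receive the new revision
updated := int32(2)
if updated < needed {
	// Matches the message above:
	// "only 2 of 5 replicas have been updated (partition: 2)"
}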

func isDeadlineExceededError(err error) bool {
if strings.Contains(err.Error(), "would exceed context deadline") {
return true
}

// Unwraps wrapped errors such as 'client rate limiter Wait returned an error: context deadline exceeded'.
if errors.Is(err, context.DeadlineExceeded) {
return true
}

return false
}
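
For illustration, the two error shapes this helper is meant to catch (the flat error text below approximates what golang.org/x/time/rate produces; treat it as an assumption):

// Wrapped form, similar in shape to what client-go's rate limiter returns;
// errors.Is unwraps the %w chain down to context.DeadlineExceeded.
wrapped := fmt.Errorf("client rate limiter Wait returned an error: %w", context.DeadlineExceeded)
_ = isDeadlineExceededError(wrapped) // true

// String-only form with no wrapped sentinel, caught by the Contains check.
flat := errors.New("rate: Wait(n=1) would exceed context deadline")
_ = isDeadlineExceededError(flat) // true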

func commonGetErrors(err error) error {
if apierrors.IsNotFound(err) {
// Resources created by an operator might not exist yet.
// We allow the full timeout duration for them to be created and become ready.
return fmt.Errorf("resource does not exist yet")
}

if errors.Is(err, syscall.ECONNREFUSED) {
return fmt.Errorf("cannot connect to API server")
}

return nil
}
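
And a usage sketch of the classifier (resource names and URL are hypothetical; assumes the k8s.io/apimachinery/pkg/runtime/schema package for the GroupResource):

notFound := apierrors.NewNotFound(schema.GroupResource{Group: "apps", Resource: "deployments"}, "router-default")
_ = commonGetErrors(notFound) // "resource does not exist yet"

refused := fmt.Errorf("Get \"https://127.0.0.1:6443\": %w", syscall.ECONNREFUSED)
_ = commonGetErrors(refused) // "cannot connect to API server"

_ = commonGetErrors(errors.New("boom")) // nil: not a known transient case, so the caller logs it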