Skip to content

Commit

Permalink
Remediate unhealthy MachinePool machines
Browse files Browse the repository at this point in the history
  • Loading branch information
AndiDog committed Nov 7, 2024
1 parent 2bb2099 commit a7fc58f
Show file tree
Hide file tree
Showing 9 changed files with 354 additions and 23 deletions.
34 changes: 34 additions & 0 deletions config/crd/bases/cluster.x-k8s.io_machinepools.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions exp/api/v1beta1/machinepool_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type MachinePoolSpec struct {
// failureDomains is the list of failure domains this MachinePool should be attached to.
// +optional
FailureDomains []string `json:"failureDomains,omitempty"`

// The strategy for replacing existing machines with
// new ones.
// +optional
Strategy *MachinePoolStrategy `json:"strategy,omitempty"`
}

// ANCHOR_END: MachinePoolSpec
Expand Down Expand Up @@ -161,6 +166,21 @@ type MachinePoolV1Beta2Status struct {

// ANCHOR_END: MachinePoolStatus

// ANCHOR: MachinePoolStrategy

// MachinePoolStrategy defines how to replace existing machines
// with new ones.
type MachinePoolStrategy struct {
// Remediation controls the strategy of remediating unhealthy machines
// as marked by a MachineHealthCheck. This only applies to infrastructure
// providers supporting "MachinePool Machines". For other providers,
// no remediation is done.
// +optional
Remediation *clusterv1.RemediationStrategy `json:"remediation,omitempty"`
}

// ANCHOR_END: MachinePoolStrategy

// MachinePoolPhase is a string representation of a MachinePool Phase.
//
// This type is a high-level indicator of the status of the MachinePool as it is provisioned,
Expand Down
25 changes: 25 additions & 0 deletions exp/api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

154 changes: 142 additions & 12 deletions exp/internal/controllers/machinepool_controller_phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ import (
"context"
"fmt"
"reflect"
"sort"
"time"

"github.com/pkg/errors"
"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
Expand All @@ -44,6 +47,7 @@ import (
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
utilconversion "sigs.k8s.io/cluster-api/util/conversion"
"sigs.k8s.io/cluster-api/util/labels"
Expand Down Expand Up @@ -294,15 +298,18 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *
// Get the nodeRefsMap from the cluster.
s.nodeRefMap, getNodeRefsErr = r.getNodeRefMap(ctx, clusterClient)

err = r.reconcileMachines(ctx, s, infraConfig)
res := ctrl.Result{}

reconcileMachinesRes, err := r.reconcileMachines(ctx, s, infraConfig)
res = util.LowestNonZeroResult(res, reconcileMachinesRes)

if err != nil || getNodeRefsErr != nil {
return ctrl.Result{}, kerrors.NewAggregate([]error{errors.Wrapf(err, "failed to reconcile Machines for MachinePool %s", klog.KObj(mp)), errors.Wrapf(getNodeRefsErr, "failed to get nodeRefs for MachinePool %s", klog.KObj(mp))})
}

if !mp.Status.InfrastructureReady {
log.Info("Infrastructure provider is not yet ready", infraConfig.GetKind(), klog.KObj(infraConfig))
return ctrl.Result{}, nil
return res, nil
}

var providerIDList []string
Expand All @@ -321,7 +328,7 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *

if len(providerIDList) == 0 && mp.Status.Replicas != 0 {
log.Info("Retrieved empty spec.providerIDList from infrastructure provider but status.replicas is not zero.", "replicas", mp.Status.Replicas)
return ctrl.Result{}, nil
return res, nil
}

if !reflect.DeepEqual(mp.Spec.ProviderIDList, providerIDList) {
Expand All @@ -331,7 +338,7 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *
mp.Status.UnavailableReplicas = mp.Status.Replicas
}

return ctrl.Result{}, nil
return res, nil
}

// reconcileMachines reconciles Machines associated with a MachinePool.
Expand All @@ -341,18 +348,18 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *
// infrastructure is created accordingly.
// Note: When supported by the cloud provider implementation of the MachinePool, machines will provide a means to interact
// with the corresponding infrastructure (e.g. delete a specific machine in case MachineHealthCheck detects it is unhealthy).
func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, s *scope, infraMachinePool *unstructured.Unstructured) error {
func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, s *scope, infraMachinePool *unstructured.Unstructured) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
mp := s.machinePool

var infraMachineKind string
if err := util.UnstructuredUnmarshalField(infraMachinePool, &infraMachineKind, "status", "infrastructureMachineKind"); err != nil {
if errors.Is(err, util.ErrUnstructuredFieldNotFound) {
log.V(4).Info("MachinePool Machines not supported, no infraMachineKind found")
return nil
return ctrl.Result{}, nil
}

return errors.Wrapf(err, "failed to retrieve infraMachineKind from infrastructure provider for MachinePool %s", klog.KObj(mp))
return ctrl.Result{}, errors.Wrapf(err, "failed to retrieve infraMachineKind from infrastructure provider for MachinePool %s", klog.KObj(mp))
}

infraMachineSelector := metav1.LabelSelector{
Expand All @@ -369,7 +376,7 @@ func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, s *scope,
infraMachineList.SetAPIVersion(infraMachinePool.GetAPIVersion())
infraMachineList.SetKind(infraMachineKind + "List")
if err := r.Client.List(ctx, &infraMachineList, client.InNamespace(mp.Namespace), client.MatchingLabels(infraMachineSelector.MatchLabels)); err != nil {
return errors.Wrapf(err, "failed to list infra machines for MachinePool %q in namespace %q", mp.Name, mp.Namespace)
return ctrl.Result{}, errors.Wrapf(err, "failed to list infra machines for MachinePool %q in namespace %q", mp.Name, mp.Namespace)
}

// Add watcher for infraMachine, if there isn't one already; this will allow this controller to reconcile
Expand All @@ -380,21 +387,26 @@ func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, s *scope,

// Add watcher for infraMachine, if there isn't one already.
if err := r.externalTracker.Watch(log, sampleInfraMachine, handler.EnqueueRequestsFromMapFunc(r.infraMachineToMachinePoolMapper)); err != nil {
return err
return ctrl.Result{}, err
}

// Get the list of machines managed by this controller, and align it with the infra machines managed by
// the InfraMachinePool controller.
machineList := &clusterv1.MachineList{}
if err := r.Client.List(ctx, machineList, client.InNamespace(mp.Namespace), client.MatchingLabels(infraMachineSelector.MatchLabels)); err != nil {
return err
return ctrl.Result{}, err
}

if err := r.createOrUpdateMachines(ctx, s, machineList.Items, infraMachineList.Items); err != nil {
return errors.Wrapf(err, "failed to create machines for MachinePool %q in namespace %q", mp.Name, mp.Namespace)
return ctrl.Result{}, errors.Wrapf(err, "failed to create machines for MachinePool %q in namespace %q", mp.Name, mp.Namespace)
}

return nil
res, err := r.reconcileUnhealthyMachines(ctx, s, machineList.Items)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to reconcile unhealthy machines for MachinePool %s", klog.KObj(mp))
}

return res, nil
}

// createOrUpdateMachines creates a MachinePool Machine for each infraMachine if it doesn't already exist and sets the owner reference and infraRef.
Expand Down Expand Up @@ -594,3 +606,121 @@ func (r *MachinePoolReconciler) getNodeRefMap(ctx context.Context, c client.Clie

return nodeRefsMap, nil
}

func (r *MachinePoolReconciler) reconcileUnhealthyMachines(ctx context.Context, s *scope, machines []clusterv1.Machine) (ctrl.Result, error) {
if len(machines) == 0 {
return ctrl.Result{}, nil
}

log := ctrl.LoggerFrom(ctx)
mp := s.machinePool

machinesWithHealthCheck := slices.DeleteFunc(slices.Clone(machines), func(machine clusterv1.Machine) bool {
return !conditions.Has(&machine, clusterv1.MachineHealthCheckSucceededCondition)
})
if len(machinesWithHealthCheck) == 0 {
// This means there is no MachineHealthCheck selecting any machines
// of this machine pool. In this case, do not requeue so often,
// but still check regularly in case a MachineHealthCheck became
// deployed or activated. This long interval shouldn't be a problem
// at cluster creation, since newly-created nodes should anyway
// trigger MachinePool reconciliation as the infrastructure provider
// creates the InfraMachines.
log.V(4).Info("Skipping reconciliation of unhealthy MachinePool machines because there are no health-checked machines")
return ctrl.Result{RequeueAfter: 10 * time.Minute}, nil
}

unhealthyMachines := slices.DeleteFunc(slices.Clone(machines), func(machine clusterv1.Machine) bool {
return !collections.IsUnhealthyAndOwnerRemediated(&machine)
})
log.V(4).Info("Reconciling unhealthy MachinePool machines", "unhealthyMachines", len(unhealthyMachines))

// Calculate how many in flight machines we should remediate.
// By default, we allow all machines to be remediated at the same time.
maxInFlight := len(unhealthyMachines)
if mp.Spec.Strategy != nil && mp.Spec.Strategy.Remediation != nil {
if mp.Spec.Strategy.Remediation.MaxInFlight != nil {
var err error
replicas := int(ptr.Deref(mp.Spec.Replicas, 1))
maxInFlight, err = intstr.GetScaledValueFromIntOrPercent(mp.Spec.Strategy.Remediation.MaxInFlight, replicas, true)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to calculate maxInFlight to remediate machines: %v", err)
}
log = log.WithValues("maxInFlight", maxInFlight, "replicas", replicas)
}
}

machinesToRemediate := make([]*clusterv1.Machine, 0, len(unhealthyMachines))
inFlight := 0
for _, m := range unhealthyMachines {
if !m.DeletionTimestamp.IsZero() {
if conditions.IsTrue(&m, clusterv1.MachineOwnerRemediatedCondition) {
// Machine has been remediated by this controller and still in flight.
inFlight++
}
continue
}
if conditions.IsFalse(&m, clusterv1.MachineOwnerRemediatedCondition) {
machinesToRemediate = append(machinesToRemediate, &m)
}
}
log = log.WithValues("inFlight", inFlight)

if len(machinesToRemediate) == 0 {
// There's a MachineHealthCheck monitoring the machines, but currently
// no action to be taken. A machine could require remediation at any
// time, so use a short interval until next reconciliation.
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}

if inFlight >= maxInFlight {
log.V(3).Info("Remediation strategy is set, and maximum in flight has been reached", "machinesToBeRemediated", len(machinesToRemediate))

// Check soon again whether the already-remediating (= deleting) machines are gone
// so that more machines can be remediated
return ctrl.Result{RequeueAfter: 15 * time.Second}, nil
}

// Sort the machines from newest to oldest.
// We are trying to remediate machines failing to come up first because
// there is a chance that they are not hosting any workloads (minimize disruption).
sort.SliceStable(machinesToRemediate, func(i, j int) bool {
return machinesToRemediate[i].CreationTimestamp.After(machinesToRemediate[j].CreationTimestamp.Time)
})

haveMoreMachinesToRemediate := false
if len(machinesToRemediate) > (maxInFlight - inFlight) {
haveMoreMachinesToRemediate = true
log.V(5).Info("Remediation strategy is set, limiting in flight operations", "machinesToBeRemediated", len(machinesToRemediate))
machinesToRemediate = machinesToRemediate[:(maxInFlight - inFlight)]
}

// Remediate unhealthy machines by deleting them
var errs []error
for _, m := range machinesToRemediate {
log.Info("Deleting unhealthy Machine", "Machine", klog.KObj(m))
patch := client.MergeFrom(m.DeepCopy())
if err := r.Client.Delete(ctx, m); err != nil {
if apierrors.IsNotFound(err) {
continue
}
errs = append(errs, errors.Wrapf(err, "failed to delete Machine %s", klog.KObj(m)))
continue
}
conditions.MarkTrue(m, clusterv1.MachineOwnerRemediatedCondition)
if err := r.Client.Status().Patch(ctx, m, patch); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, errors.Wrapf(err, "failed to update status of Machine %s", klog.KObj(m)))
}
}

if len(errs) > 0 {
return ctrl.Result{}, errors.Wrapf(kerrors.NewAggregate(errs), "failed to delete unhealthy Machines")
}

if haveMoreMachinesToRemediate {
// More machines need remediation, so reconcile again sooner
return ctrl.Result{RequeueAfter: 15 * time.Second}, nil
}

return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
Loading

0 comments on commit a7fc58f

Please sign in to comment.