[Feature] Support recreate pods for RayCluster using RayClusterSpec.upgradeStrategy #4185
Conversation
Hi @andrewsykim, I followed your previous comments and added a spec.upgradeStrategy API to RayCluster. For now, though, I'm concerned this approach may introduce some issues:
Maybe we can just add a feature gate instead of using a spec.upgradeStrategy.type field in RayCluster to enable the recreate behavior. WDYT?
Feature gates are used to gate features that are in early development and not ready for wider adoption; they shouldn't be used to change the behavior of RayCluster, because a feature gate will eventually be on by default (and forced on).
I think both of those concerns are valid, but I don't think this is a problem with separation of concerns, since RayCluster is a building block for both RayService and RayJob. For the cases you mentioned, we should have validation to ensure the RayCluster upgrade strategy cannot be set when the cluster is used with RayJob or RayService, as sketched below.
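A rough sketch of what that validation could look like (hypothetical helper, assuming the new spec.upgradeStrategy field proposed in this PR; not code from this PR):

package validation

import (
    "fmt"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// validateUpgradeStrategy rejects spec.upgradeStrategy on RayClusters owned by
// a RayJob or RayService, since those controllers manage the cluster lifecycle
// themselves.
func validateUpgradeStrategy(instance *rayv1.RayCluster) error {
    if instance.Spec.UpgradeStrategy == nil {
        return nil
    }
    for _, ref := range instance.OwnerReferences {
        if ref.Kind == "RayJob" || ref.Kind == "RayService" {
            return fmt.Errorf("spec.upgradeStrategy cannot be set on a RayCluster managed by a %s", ref.Kind)
        }
    }
    return nil
}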
Future-Outlier left a comment:
- LGTM, but maybe we can add a follow-up to test the scenario where the Ray image version changes.
cc @rueian to merge, thank you!
- I tested this manually across Ray versions 2.47.0, 2.49.2, and 2.52.0:
# For examples with more realistic resource configuration, see
# ray-cluster.complete.large.yaml and
# ray-cluster.autoscaler.large.yaml.
# Tested with Ray versions 2.47.0, 2.49.2, 2.52.0
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-kuberay
spec:
  upgradeStrategy:
    type: Recreate
  rayVersion: '2.49.2' # should match the Ray version in the image of the containers
  # Ray head pod template
  headGroupSpec:
    # rayStartParams is optional with RayCluster CRD from KubeRay 1.4.0 or later but required in earlier versions.
    rayStartParams: {}
    template:
      spec:
        containers:
        - name: ray-head
          image: rayproject/ray:2.49.2
          resources:
            limits:
              cpu: 2
              memory: 4G
            requests:
              cpu: 1
              memory: 1G
          ports:
          - containerPort: 6379
            name: gcs-server
          - containerPort: 8265 # Ray dashboard
            name: dashboard
          - containerPort: 10001
            name: client
  workerGroupSpecs:
  # the Pod replicas in this worker group
  - replicas: 1
    minReplicas: 1
    maxReplicas: 5
    # logical group name; in this example it is called workergroup
    groupName: workergroup
    # rayStartParams is optional with RayCluster CRD from KubeRay 1.4.0 or later but required in earlier versions.
    rayStartParams: {}
    template:
      spec:
        containers:
        - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name' or '123-abc')
          image: rayproject/ray:2.49.2
          resources:
            limits:
              cpu: 1
              memory: 1G
            requests:
              cpu: 1
              memory: 1G
cc @win5923 to fix the test, maybe I accidentally made the CI fail.
Commented out the
Future-Outlier left a comment:
cc @andrewsykim to take a look too if you have time
func GeneratePodTemplateHash(template corev1.PodTemplateSpec) (string, error)
Just curious, is it necessary to have this wrapper function, or is it redundant?
I think this is for better readability.
Do we really consider only the Pod template? I feel that most fields should actually be taken into account, with only a few exceptions in the worker group spec, such as ScaleStrategy, Suspend, Replicas, MinReplicas, MaxReplicas, and IdleTimeoutSeconds.
I don’t have a strong preference since RayCluster is a custom resource.
For me, it’s a 51/49 decision, and I lean toward Rueian’s idea because those fields matter.
Also, in Kubernetes, both Deployments and StatefulSets primarily compare/check the Pod template.
source:
Yes, most of the requirements raised by users at the moment are primarily related to image updates, which is why my starting point is at the Pod level rather than the RayCluster level.
But your point makes sense, there are many configurations that should trigger a Pod recreate (like HeadGroupSpec.Resources, RayClusterSpec.AutoscalerOptions). I think we could compare the entire RayClusterSpec directly and exclude certain settings (like rayStartParams, WorkerGroupSpec.Min/Max/Replicas, etc.). WDYT?
I think you can implement Rueian's advice; since RayCluster is a custom resource, having custom behavior makes sense.
Also, whichever approach we support now, changing to another one in the future would be a breaking change, so I would vote for Rueian's solution.
just talked with @win5923
we can use RayService's solution to achieve this!
Hi @Future-Outlier, @rueian, @CheyuWu, @machichima
I followed the RayService approach to implement the UpgradeStrategy for RayCluster.
Currently, we compute a hash from the entire RayCluster.Spec (excluding certain fields such as Replicas and WorkersToDelete) and store it as an annotation on the head Pod.
During reconciliation, we only need to compare the hash on the head Pod to determine whether an upgrade is required. This allows us to avoid re-comparing the spec across all head and worker Pods, simplifying the upgrade detection logic and reducing unnecessary overhead.
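For illustration, here is a minimal, self-contained sketch of that idea with simplified stand-in types (not the actual KubeRay implementation): scaling-related fields are cleared before hashing, so changing Replicas alone does not change the hash.

package main

import (
    "crypto/sha1"
    "encoding/base64"
    "encoding/json"
    "fmt"
)

// Simplified stand-ins for the real rayv1 types (illustrative only).
type WorkerGroupSpec struct {
    GroupName string `json:"groupName"`
    Image     string `json:"image"`
    Replicas  *int32 `json:"replicas,omitempty"`
}

type RayClusterSpec struct {
    RayVersion       string            `json:"rayVersion"`
    HeadImage        string            `json:"headImage"`
    WorkerGroupSpecs []WorkerGroupSpec `json:"workerGroupSpecs"`
}

// hashWithoutReplicas clears fields that should not trigger a recreate
// (here just Replicas) and hashes the JSON form of what remains.
func hashWithoutReplicas(spec RayClusterSpec) (string, error) {
    clean := spec
    clean.WorkerGroupSpecs = make([]WorkerGroupSpec, len(spec.WorkerGroupSpecs))
    copy(clean.WorkerGroupSpecs, spec.WorkerGroupSpecs)
    for i := range clean.WorkerGroupSpecs {
        clean.WorkerGroupSpecs[i].Replicas = nil
    }
    data, err := json.Marshal(clean)
    if err != nil {
        return "", err
    }
    sum := sha1.Sum(data)
    return base64.RawStdEncoding.EncodeToString(sum[:]), nil
}

func main() {
    one, five := int32(1), int32(5)
    a := RayClusterSpec{
        RayVersion: "2.49.2",
        HeadImage:  "rayproject/ray:2.49.2",
        WorkerGroupSpecs: []WorkerGroupSpec{
            {GroupName: "workergroup", Image: "rayproject/ray:2.49.2", Replicas: &one},
        },
    }
    b := a
    b.WorkerGroupSpecs = []WorkerGroupSpec{
        {GroupName: "workergroup", Image: "rayproject/ray:2.49.2", Replicas: &five},
    }
    ha, _ := hashWithoutReplicas(a)
    hb, _ := hashWithoutReplicas(b)
    fmt.Println("same hash despite different replicas:", ha == hb) // true
}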
cc @rueian to merge, thank you!
// If the KubeRay version has changed, skip recreation to avoid unnecessary pod recreation
if len(headPods.Items) == 1 {
    headPod := headPods.Items[0]
    podVersion := headPod.Annotations[utils.KubeRayVersion]
    if podVersion != "" && podVersion != utils.KUBERAY_VERSION {
        logger.Info("KubeRay version has changed, skipping pod recreation", "rayCluster", instance.Name)
        return false
    }
}
Following #2320, add a new ray.io/kuberay-version annotation to the head Pod. This annotation is used to detect KubeRay version changes.
When the KUBERAY_VERSION annotation (e.g., 1.5.0) differs from the KubeRay operator's KUBERAY_VERSION (e.g., 1.6.0), should we follow the RayService steps?
- The ray.io/hash-without-replicas-and-workers-to-delete annotation is updated.
- The KUBERAY_VERSION annotation is updated.
- After these updates, we can use the new ray.io/hash-without-replicas-and-workers-to-delete annotation to determine whether to trigger a zero downtime upgrade.
I’m asking this because RayService performs an Update on the active RayCluster, which can implicitly trigger a RayCluster recreation to achieve a zero-downtime upgrade. From the user’s perspective, the good thing is they are not required to manually recreate the RayService; the upgrade is handled transparently by the controller.
kuberay/ray-operator/controllers/ray/rayservice_controller.go
Lines 904 to 917 in dea5baa
if shouldUpdateCluster(rayServiceInstance, activeRayCluster, true) {
    // TODO(kevin85421): We should not reconstruct the cluster to update it. This will cause issues if autoscaler is enabled.
    logger.Info("Updating the active RayCluster instance", "clusterName", activeRayCluster.Name)
    goalCluster, err := constructRayClusterForRayService(rayServiceInstance, activeRayCluster.Name, r.Scheme)
    if err != nil {
        return nil, nil, err
    }
    modifyRayCluster(ctx, activeRayCluster, goalCluster)
    if err = r.Update(ctx, activeRayCluster); err != nil {
        r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeWarning, string(utils.FailedToUpdateRayCluster), "Failed to update the active RayCluster %s/%s: %v", activeRayCluster.Namespace, activeRayCluster.Name, err)
        return activeRayCluster, pendingRayCluster, err
    }
    r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Updated the active RayCluster %s/%s", activeRayCluster.Namespace, activeRayCluster.Name)
}
However, since RayCluster does not currently support zero-downtime upgrades, my approach is to avoid updating Pods when the KUBERAY_VERSION is different.
As a result, when the KubeRay operator is upgraded and a version mismatch is detected, the controller will not automatically trigger a RayCluster upgrade. Instead, users are required to manually delete and re-apply the RayCluster after upgrading the operator.
Maybe we can simply update the pod annotations?
// If the KubeRay version has changed, update the annotations and skip recreation
if len(headPods.Items) == 1 {
    headPod := headPods.Items[0]
    podVersion := headPod.Annotations[utils.KubeRayVersion]
    if podVersion != "" && podVersion != utils.KUBERAY_VERSION {
        logger.Info("KubeRay version has changed, skipping pod recreation", "rayCluster", instance.Name)
        clusterHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(instance.Spec)
        if err != nil {
            logger.Error(err, "Failed to generate cluster spec hash for Recreate upgradeStrategy, skipping comparison")
            return false
        }
        headPod.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey] = clusterHash
        headPod.Annotations[utils.KubeRayVersion] = utils.KUBERAY_VERSION
        if err := r.Update(ctx, &headPod); err != nil {
            logger.Error(err, "Failed to update head pod annotations after KubeRay version change", "pod", headPod.Name)
        }
        return false
    }
}
Hi @win5923,
since RayService stores the hash in the RayCluster CR, can we store the hash in the RayCluster CR instead of the head Pod?
Hi @Future-Outlier,
RayService manages RayCluster CRs and already needs to Create/Update them, so setting the hash there is free. But RayCluster manages Pods directly and doesn't normally update its own CR; if we stored the hash in the RayCluster, every hash update would become an extra, relatively expensive operation. In contrast, storing the hash in Pod annotations is simpler: we write it once during Pod creation, and it doesn't trigger additional reconciliation loops. A sketch of this is below.
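As a rough sketch (hypothetical helper name; the annotation keys match the ones discussed in this PR), the write happens once when the Pod object is built, and later reconciles only read those annotations:

package pod

import (
    corev1 "k8s.io/api/core/v1"
)

// stampUpgradeAnnotations records the spec hash and operator version on a Pod
// at creation time, so later reconciles can compare annotations instead of
// diffing the whole RayCluster spec.
func stampUpgradeAnnotations(pod *corev1.Pod, specHash, operatorVersion string) {
    if pod.Annotations == nil {
        pod.Annotations = map[string]string{}
    }
    pod.Annotations["ray.io/hash-without-replicas-and-workers-to-delete"] = specHash
    pod.Annotations["ray.io/kuberay-version"] = operatorVersion
}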
Makes sense to me, thank you!
// shouldRecreatePodsForUpgrade checks if any pods need to be recreated based on RayClusterSpec changes
func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, instance *rayv1.RayCluster) bool {
    logger := ctrl.LoggerFrom(ctx)

    if instance.Spec.UpgradeStrategy == nil || instance.Spec.UpgradeStrategy.Type == nil || *instance.Spec.UpgradeStrategy.Type != rayv1.RayClusterRecreate {
        return false
    }

    headPods := corev1.PodList{}
    if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil {
        logger.Error(err, "Failed to list head pods for upgrade check")
        return false
    }

    // If the KubeRay version has changed, skip recreation to avoid unnecessary pod recreation
    if len(headPods.Items) == 1 {
        headPod := headPods.Items[0]
        podVersion := headPod.Annotations[utils.KubeRayVersion]
        if podVersion != "" && podVersion != utils.KUBERAY_VERSION {
            logger.Info("KubeRay version has changed, skipping pod recreation", "rayCluster", instance.Name)
            return false
        }
    }

    expectedClusterHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(instance.Spec)
    if err != nil {
        logger.Error(err, "Failed to generate cluster spec hash for Recreate upgradeStrategy, skipping comparison")
        return false
    }

    if len(headPods.Items) == 1 {
        headPod := headPods.Items[0]
        actualHash := headPod.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
        if actualHash != "" && actualHash != expectedClusterHash {
            logger.Info("RayCluster spec has changed, will recreate all pods", "rayCluster", instance.Name)
            return true
        }
    }

    return false
}
Suggested change: compute the expected spec hash up front and merge the version check and the hash comparison into a single head-Pod block:

// shouldRecreatePodsForUpgrade checks if any pods need to be recreated based on RayClusterSpec changes
func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, instance *rayv1.RayCluster) bool {
    logger := ctrl.LoggerFrom(ctx)
    if instance.Spec.UpgradeStrategy == nil || instance.Spec.UpgradeStrategy.Type == nil || *instance.Spec.UpgradeStrategy.Type != rayv1.RayClusterRecreate {
        return false
    }
    expectedClusterHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(instance.Spec)
    if err != nil {
        logger.Error(err, "Failed to generate cluster spec hash for Recreate upgradeStrategy, skipping comparison")
        return false
    }
    headPods := corev1.PodList{}
    if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil {
        logger.Error(err, "Failed to list head pods for upgrade check")
        return false
    }
    // If the KubeRay version has changed, skip recreation to avoid unnecessary pod recreation
    if len(headPods.Items) == 1 {
        headPod := headPods.Items[0]
        podVersion := headPod.Annotations[utils.KubeRayVersion]
        if podVersion != "" && podVersion != utils.KUBERAY_VERSION {
            logger.Info("KubeRay version has changed, skipping pod recreation", "rayCluster", instance.Name)
            return false
        }
        actualHash := headPod.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
        if actualHash != "" && actualHash != expectedClusterHash {
            logger.Info("RayCluster spec has changed, will recreate all pods", "rayCluster", instance.Name)
            return true
        }
    }
    return false
}
maybe len(headPods.Items) > 0 is better than len(headPods.Items) == 1
cc @andrewsykim to take a look, thank you!
CheyuWu left a comment:
LGTM
Why are these changes needed?
Currently, when users update a RayCluster spec (e.g., update the image), they must re-create the cluster, or manually set spec.suspend to true, wait until all Pods are deleted, and then set it back to false. This is not particularly convenient for users deploying with GitOps systems like ArgoCD.
Ref:
Design doc: https://docs.google.com/document/d/1xQLm0-WQWD-FkufxBJYklOJGvVn4RLk0_vPjLD5ax7o/edit?usp=sharing
Changes
- Add a spec.upgradeStrategy field to the RayCluster CRD
  - Recreate: During an upgrade, the Recreate strategy will delete all existing Pods before creating new ones.
  - None: No new Pod will be created while the strategy is set to None.
Implementation
- Store the hash of HeadGroupSpec.Template on the head Pod and workerGroup.Template on the worker Pods as annotations (ray.io/pod-template-hash) during creation
- Compare the stored hash with the current head Pod or worker Pod template hash to detect changes and recreate all Pods

I only compare HeadGroupSpec.Template and workerGroup.Template because these define the Pod-related configurations. RayCluster.Spec contains many dynamic and component-specific settings (e.g., autoscaler configs, rayStartParams).

Based on #4185 (comment), we now compute a hash from the entire RayCluster.Spec (excluding certain fields such as Replicas and WorkersToDelete) and store it as an annotation on the head Pod.
Example:
Related issue number
Closes #2534 #3905
Checks