Skip to content

Commit 05bf3eb

Browse files
committed
Address comments.
1 parent a15dfd2 commit 05bf3eb

File tree

3 files changed

+57
-106
lines changed

3 files changed

+57
-106
lines changed

api/v1beta1/temporalcluster_types.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ type ServiceSpec struct {
116116
// Number of desired replicas for the service. Default to 1.
117117
// +kubebuilder:validation:Minimum=1
118118
// +optional
119-
Replicas *int32 `json:"replicas,omitempty"`
119+
Replicas *int32 `json:"replicas"`
120120
// Compute Resources required by this service.
121121
// More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
122122
// +optional
@@ -159,12 +159,8 @@ func (s *ServiceSpec) IsAutoscalingEnabled() bool {
159159
func (s *ServiceSpec) GetEffectiveReplicas() *int32 {
160160
// If autoscaling is configured, respect the current Replicas value which may be
161161
// updated by HPA coordination logic, but ensure it doesn't go below MinReplicas
162-
if s.IsAutoscalingEnabled() && s.Autoscaling.MinReplicas != nil {
163-
if s.Replicas != nil {
164-
replicas := max(*s.Autoscaling.MinReplicas, *s.Replicas)
165-
return &replicas
166-
}
167-
return s.Autoscaling.MinReplicas
162+
if s.IsAutoscalingEnabled() {
163+
return nil
168164
}
169165
if s.Replicas != nil {
170166
return s.Replicas

controllers/temporalcluster_controller.go

Lines changed: 52 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -169,21 +169,38 @@ func (r *TemporalClusterReconciler) reconcileResources(ctx context.Context, temp
169169
return fmt.Errorf("can't compute configmap hash: %w", err)
170170
}
171171

172-
// Sync replicas from current Deployments (HPA decisions) into the spec
173-
if err := r.coordinateWithHPA(ctx, temporalCluster); err != nil {
172+
builders, err := r.resourceBuilders(temporalCluster, configHash)
173+
if err != nil {
174174
return err
175175
}
176176

177-
builders, err := r.resourceBuilders(temporalCluster, configHash)
177+
// Separate deployment builders from others to handle SSA for Deployments
178+
var deploymentBuilders []resource.Builder
179+
var otherBuilders []resource.Builder
180+
181+
for _, builder := range builders {
182+
if _, isDeployment := builder.(*base.DeploymentBuilder); isDeployment {
183+
deploymentBuilders = append(deploymentBuilders, builder)
184+
} else {
185+
otherBuilders = append(otherBuilders, builder)
186+
}
187+
}
188+
189+
// Reconcile non-deployment resources normally
190+
objects, err := r.Reconciler.ReconcileBuilders(ctx, temporalCluster, otherBuilders)
178191
if err != nil {
179192
return err
180193
}
181194

182-
objects, err := r.Reconciler.ReconcileBuilders(ctx, temporalCluster, builders)
195+
// Handle deployments with SSA
196+
deploymentObjects, err := r.reconcileDeploymentsWithSSA(ctx, deploymentBuilders)
183197
if err != nil {
184198
return err
185199
}
186200

201+
// Combine all objects
202+
objects = append(objects, deploymentObjects...)
203+
187204
statuses, err := status.ReconciledObjectsToServiceStatuses(temporalCluster, objects)
188205
if err != nil {
189206
return err
@@ -281,110 +298,48 @@ func (r *TemporalClusterReconciler) handleErrorWithRequeue(cluster *v1beta1.Temp
281298
return reconcile.Result{RequeueAfter: requeueAfter}, err
282299
}
283300

284-
// coordinateWithHPA synchronizes the TemporalCluster spec with HPA scaling decisions.
285-
// When HPA scales a deployment, this method updates the corresponding ServiceSpec.Replicas
286-
// to reflect the HPA's decision, preventing the controller from overriding HPA scaling.
287-
func (r *TemporalClusterReconciler) coordinateWithHPA(ctx context.Context, cluster *v1beta1.TemporalCluster) error {
288-
logger := log.FromContext(ctx)
289-
needsUpdate := false
290-
291-
// Map of service names to their deployment objects (live state)
292-
deployments := make(map[string]*appsv1.Deployment)
293-
deployList := &appsv1.DeploymentList{}
294-
if err := r.List(ctx, deployList,
295-
client.InNamespace(cluster.Namespace),
296-
client.MatchingFields{ownerKey: cluster.Name},
297-
); err != nil {
298-
return fmt.Errorf("failed to list deployments for HPA coordination: %w", err)
299-
}
300-
for i := range deployList.Items {
301-
deployment := &deployList.Items[i]
302-
// Extract service name from deployment name (remove cluster prefix)
303-
serviceName := deployment.Name
304-
if clusterPrefix := cluster.Name + "-"; len(serviceName) > len(clusterPrefix) {
305-
serviceName = serviceName[len(clusterPrefix):]
306-
}
307-
deployments[serviceName] = deployment
308-
}
309-
310-
// Check each service that has autoscaling enabled
311-
services := []primitives.ServiceName{
312-
primitives.FrontendService,
313-
primitives.HistoryService,
314-
primitives.MatchingService,
315-
primitives.WorkerService,
316-
primitives.InternalFrontendService,
317-
}
301+
func (r *TemporalClusterReconciler) reconcileDeploymentsWithSSA(ctx context.Context, builders []resource.Builder) ([]client.Object, error) {
302+
objects := make([]client.Object, 0)
318303

319-
for _, serviceName := range services {
320-
serviceSpec, err := cluster.Spec.Services.GetServiceSpec(serviceName)
321-
if err != nil || serviceSpec == nil || !serviceSpec.IsAutoscalingEnabled() {
322-
continue // Skip services without autoscaling
323-
}
304+
for _, builder := range builders {
305+
deploymentBuilder := builder.(*base.DeploymentBuilder)
324306

325-
deployment, exists := deployments[string(serviceName)]
326-
if !exists {
327-
continue // Skip if deployment not found
328-
}
307+
// Build the desired deployment
308+
desiredObj := deploymentBuilder.Build()
309+
desired := desiredObj.(*appsv1.Deployment)
329310

330-
// Check if HPA has scaled the deployment
331-
currentReplicas := deployment.Spec.Replicas
332-
if currentReplicas == nil {
333-
continue // Skip if deployment has no replica spec
311+
// Update the desired deployment with the current configuration
312+
if err := deploymentBuilder.Update(desired); err != nil {
313+
return nil, fmt.Errorf("failed to update deployment configuration: %w", err)
334314
}
335-
336-
specReplicas := serviceSpec.Replicas
337-
if specReplicas == nil || *specReplicas != *currentReplicas {
338-
// HPA has changed the deployment replicas, sync the spec
339-
// Update the service spec to match what HPA set
340-
if err := r.updateServiceReplicas(cluster, serviceName, *currentReplicas); err != nil {
341-
return fmt.Errorf("failed to update %s service replicas: %w", serviceName, err)
342-
}
343-
needsUpdate = true
315+
// Apply all Deployments with SSA for consistency
316+
if err := r.applyDeploymentWithSSA(ctx, desired); err != nil {
317+
return nil, fmt.Errorf("failed to apply deployment %s with SSA: %w", desired.Name, err)
344318
}
345-
}
346-
347-
// Update the cluster spec if any replicas were changed
348-
if needsUpdate {
349-
if err := r.Update(ctx, cluster); err != nil {
350-
return fmt.Errorf("failed to update cluster spec with HPA coordination: %w", err)
319+
// Fetch the current state of the deployment after apply
320+
current := &appsv1.Deployment{}
321+
if err := r.Get(ctx, client.ObjectKeyFromObject(desired), current); err != nil {
322+
return nil, fmt.Errorf("failed to get deployment %s after SSA: %w", desired.Name, err)
351323
}
352-
logger.Info("Updated TemporalCluster spec to coordinate with HPA scaling decisions")
324+
objects = append(objects, current)
353325
}
354326

355-
return nil
327+
return objects, nil
356328
}
357329

358-
// updateServiceReplicas updates the replica count for a specific service in the cluster spec.
359-
func (r *TemporalClusterReconciler) updateServiceReplicas(cluster *v1beta1.TemporalCluster, serviceName primitives.ServiceName, replicas int32) error {
360-
switch serviceName {
361-
case primitives.FrontendService:
362-
if cluster.Spec.Services.Frontend == nil {
363-
cluster.Spec.Services.Frontend = &v1beta1.ServiceSpec{}
364-
}
365-
cluster.Spec.Services.Frontend.Replicas = &replicas
366-
case primitives.HistoryService:
367-
if cluster.Spec.Services.History == nil {
368-
cluster.Spec.Services.History = &v1beta1.ServiceSpec{}
369-
}
370-
cluster.Spec.Services.History.Replicas = &replicas
371-
case primitives.MatchingService:
372-
if cluster.Spec.Services.Matching == nil {
373-
cluster.Spec.Services.Matching = &v1beta1.ServiceSpec{}
374-
}
375-
cluster.Spec.Services.Matching.Replicas = &replicas
376-
case primitives.WorkerService:
377-
if cluster.Spec.Services.Worker == nil {
378-
cluster.Spec.Services.Worker = &v1beta1.ServiceSpec{}
379-
}
380-
cluster.Spec.Services.Worker.Replicas = &replicas
381-
case primitives.InternalFrontendService:
382-
if cluster.Spec.Services.InternalFrontend == nil {
383-
cluster.Spec.Services.InternalFrontend = &v1beta1.InternalFrontendServiceSpec{}
330+
// applyDeploymentWithSSA applies a deployment using Server-Side Apply with field management.
331+
func (r *TemporalClusterReconciler) applyDeploymentWithSSA(ctx context.Context, desired *appsv1.Deployment) error {
332+
desired.TypeMeta = metav1.TypeMeta{APIVersion: "apps/v1", Kind: "Deployment"}
333+
334+
// replicas must be omitted when HPA is enabled (builder should already do this)
335+
// if autoscalingEnabled { desired.Spec.Replicas = nil }
336+
337+
if err := r.Patch(ctx, desired, client.Apply, client.FieldOwner("temporal-operator")); err != nil {
338+
if apierrors.IsConflict(err) {
339+
// Only if you’re intentionally handing over ownership:
340+
return r.Patch(ctx, desired, client.Apply, client.FieldOwner("temporal-operator"), client.ForceOwnership)
384341
}
385-
cluster.Spec.Services.InternalFrontend.Replicas = &replicas
386-
default:
387-
return fmt.Errorf("unknown service name: %s", serviceName)
342+
return err
388343
}
389344
return nil
390345
}

internal/resource/base/hpa_builder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
// Licensed to Amesh Fernando under one or more contributor
1+
// Licensed to Alexandre VILAIN under one or more contributor
22
// license agreements. See the NOTICE file distributed with
33
// this work for additional information regarding copyright
4-
// ownership. Amesh Fernando licenses this file to you under
4+
// ownership. Alexandre VILAIN licenses this file to you under
55
// the Apache License, Version 2.0 (the "License"); you may
66
// not use this file except in compliance with the License.
77
// You may obtain a copy of the License at

0 commit comments

Comments
 (0)