Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,10 @@ EOF

```bash
export PEER_IMAGE=hyperledger/fabric-peer
export PEER_VERSION=3.0.0
export PEER_VERSION=3.1.0

export ORDERER_IMAGE=hyperledger/fabric-orderer
export ORDERER_VERSION=3.0.0
export ORDERER_VERSION=3.1.0

export CA_IMAGE=hyperledger/fabric-ca
export CA_VERSION=1.5.13
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,8 @@ spec:
properties:
adminPort:
type: integer
certRenewalLeaseHeld:
type: boolean
conditions:
items:
properties:
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/hlf.kungfusoftware.es_fabricpeers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2256,6 +2256,8 @@ spec:
type: object
status:
properties:
certRenewalLeaseHeld:
type: boolean
conditions:
items:
properties:
Expand Down
92 changes: 64 additions & 28 deletions controllers/ordnode/ordnode_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"encoding/json"
"encoding/pem"
"fmt"
"helm.sh/helm/v3/pkg/release"
"os"
"reflect"
"sort"
"strings"
"time"

"helm.sh/helm/v3/pkg/release"

"github.com/go-logr/logr"
"github.com/kfsoftware/hlf-operator/controllers/certs"
"github.com/kfsoftware/hlf-operator/controllers/hlfmetrics"
Expand Down Expand Up @@ -225,13 +226,58 @@ func (r *FabricOrdererNodeReconciler) Reconcile(ctx context.Context, req ctrl.Re
}
requeueAfter := time.Second * 10
log.Infof("Last time certs were updated: %v, they need to be renewed: %v", lastTimeCertsRenewed, certificatesNeedToBeRenewed)

// --- RELEASE LEASE IF HELD AND STATUS IS RUNNING ---
if fabricOrdererNode.Status.CertRenewalLeaseHeld && fabricOrdererNode.Status.Status == hlfv1alpha1.RunningStatus {
leaseName := "orderernode-cert-renewal-global-lock"
holderIdentity := os.Getenv("POD_NAME")
if holderIdentity == "" {
holderIdentity = fmt.Sprintf("orderernode-%s-lock", fabricOrdererNode.Name)
}
err := utils.ReleaseLease(ctx, clientSet, leaseName, ns, holderIdentity)
if err != nil {
log.Warnf("Error releasing lease: %v", err)
} else {
log.Infof("Released cert renewal lease for %s", fabricOrdererNode.Name)
}
fabricOrdererNode.Status.CertRenewalLeaseHeld = false
if err := r.Status().Update(ctx, fabricOrdererNode); err != nil {
log.Errorf("Error updating status after releasing lease: %v", err)
}
}

if certificatesNeedToBeRenewed {
// Lease-based lock for cert renewal (global lock)
leaseName := "orderernode-cert-renewal-global-lock"
holderIdentity := os.Getenv("POD_NAME")
if holderIdentity == "" {
holderIdentity = fmt.Sprintf("orderernode-%s-lock", fabricOrdererNode.Name)
}
leaseTTL := int32(120)
acquired := false
for i := 0; i < 5; i++ { // try for ~5 seconds
ok, err := utils.AcquireLease(ctx, clientSet, leaseName, ns, holderIdentity, leaseTTL)
if err != nil {
log.Warnf("Error acquiring lease: %v", err)
}
if ok {
acquired = true
break
}
time.Sleep(time.Second)
}
if !acquired {
log.Warnf("Could not acquire cert renewal lock for %s, skipping renewal", fabricOrdererNode.Name)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
// Set lease held flag
fabricOrdererNode.Status.CertRenewalLeaseHeld = true
if err := r.Status().Update(ctx, fabricOrdererNode); err != nil {
log.Errorf("Error updating status after acquiring lease: %v", err)
}
// must update the certificates and block until it's done
// scale down to zero replicas
// wait for the deployment to scale down
// update the certs
// scale up the peer
log.Infof("Trying to upgrade certs")
log.Infof("Trying to upgrade certs (lease acquired)")
r.setConditionStatus(ctx, fabricOrdererNode, hlfv1alpha1.UpdatingCertificates, false, nil, false)
err := r.updateCerts(req, fabricOrdererNode, clientSet, releaseName, ctx, cfg, ns)
if err != nil {
log.Errorf("Error renewing certs: %v", err)
Expand All @@ -247,7 +293,7 @@ func (r *FabricOrdererNodeReconciler) Reconcile(ctx context.Context, req ctrl.Re
if err != nil {
return ctrl.Result{}, err
}
err = r.upgradeChart(cfg, err, ns, releaseName, c)
err = r.upgradeChartWithWait(cfg, err, ns, releaseName, c, false, 5*time.Minute)
if err != nil {
r.setConditionStatus(ctx, fabricOrdererNode, hlfv1alpha1.FailedStatus, false, err, false)
return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricOrdererNode)
Expand Down Expand Up @@ -449,35 +495,25 @@ func (r *FabricOrdererNodeReconciler) updateCerts(req ctrl.Request, node *hlfv1a
log.Errorf("Error getting the config: %v", err)
return errors.Wrapf(err, "Error getting the config: %v", err)
}
//config.Replicas = 0
err = r.upgradeChart(cfg, err, ns, releaseName, config)
// Force Wait=true and Timeout=5m for cert renewal
wait := true
timeout := 5 * time.Minute
err = r.upgradeChartWithWait(cfg, err, ns, releaseName, config, wait, timeout)
if err != nil {
return errors.Wrapf(err, "Error upgrading the chart: %v", err)
}
dep, err := GetOrdererDeployment(
cfg,
r.Config,
releaseName,
req.Namespace,
)
if err != nil {
return errors.Wrapf(err, "Error getting the deployment: %v", err)
}
err = restartDeployment(
r.Config,
dep,
)
if err != nil {
return errors.Wrapf(err, "Error restarting the deployment: %v", err)
}
return nil
}
func (r *FabricOrdererNodeReconciler) upgradeChart(

// upgradeChartWithWait is like upgradeChart but allows overriding Wait/Timeout
func (r *FabricOrdererNodeReconciler) upgradeChartWithWait(
cfg *action.Configuration,
err error,
ns string,
releaseName string,
c *fabricOrdChart,
wait bool,
timeout time.Duration,
) error {
inrec, err := json.Marshal(c)
if err != nil {
Expand All @@ -502,8 +538,8 @@ func (r *FabricOrdererNodeReconciler) upgradeChart(
if err != nil {
return err
}
cmd.Wait = r.Wait
cmd.Timeout = r.Timeout
cmd.Wait = wait
cmd.Timeout = timeout
cmd.MaxHistory = r.MaxHistory

release, err := cmd.Run(releaseName, ch, inInterface)
Expand Down
71 changes: 50 additions & 21 deletions controllers/peer/peer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,13 +453,58 @@ func (r *FabricPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
requeueAfter := time.Second * 10
log.Infof("Peer: Last time certs were updated: %v, they need to be renewed: %v", lastTimeCertsRenewed, certificatesNeedToBeRenewed)

// --- RELEASE LEASE IF HELD AND STATUS IS RUNNING ---
if fabricPeer.Status.CertRenewalLeaseHeld && fabricPeer.Status.Status == hlfv1alpha1.RunningStatus {
leaseName := "peer-cert-renewal-global-lock"
holderIdentity := os.Getenv("POD_NAME")
if holderIdentity == "" {
holderIdentity = fmt.Sprintf("peer-%s-lock", fabricPeer.Name)
}
err := utils.ReleaseLease(ctx, clientSet, leaseName, ns, holderIdentity)
if err != nil {
log.Warnf("Error releasing lease: %v", err)
} else {
log.Infof("Released cert renewal lease for %s", fabricPeer.Name)
}
fabricPeer.Status.CertRenewalLeaseHeld = false
if err := r.Status().Update(ctx, fabricPeer); err != nil {
log.Errorf("Error updating status after releasing lease: %v", err)
}
}

if certificatesNeedToBeRenewed {
// Lease-based lock for cert renewal (global lock)
leaseName := "peer-cert-renewal-global-lock"
holderIdentity := os.Getenv("POD_NAME")
if holderIdentity == "" {
holderIdentity = fmt.Sprintf("peer-%s-lock", fabricPeer.Name)
}
leaseTTL := int32(120)
acquired := false
for i := 0; i < 5; i++ { // try for ~5 seconds
ok, err := utils.AcquireLease(ctx, clientSet, leaseName, ns, holderIdentity, leaseTTL)
if err != nil {
log.Warnf("Error acquiring lease: %v", err)
}
if ok {
acquired = true
break
}
time.Sleep(time.Second)
}
if !acquired {
log.Warnf("Could not acquire cert renewal lock for %s, skipping renewal", fabricPeer.Name)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
// Set lease held flag
fabricPeer.Status.CertRenewalLeaseHeld = true
if err := r.Status().Update(ctx, fabricPeer); err != nil {
log.Errorf("Error updating status after acquiring lease: %v", err)
}
// must update the certificates and block until it's done
// scale down to zero replicas
// wait for the deployment to scale down
// update the certs
// scale up the peer
log.Infof("Trying to upgrade certs")
log.Infof("Trying to upgrade certs (lease acquired)")
r.setConditionStatus(ctx, fabricPeer, hlfv1alpha1.UpdatingCertificates, false, nil, false)
err := r.updateCerts(req, fabricPeer, clientSet, releaseName, svc, ctx, cfg, ns)
if err != nil {
log.Errorf("Error renewing certs: %v", err)
Expand Down Expand Up @@ -616,22 +661,6 @@ func (r *FabricPeerReconciler) updateCerts(req ctrl.Request, fPeer *hlfv1alpha1.
if err != nil {
return err
}
dep, err := GetPeerDeployment(
cfg,
r.Config,
releaseName,
req.Namespace,
)
if err != nil {
return err
}
err = restartDeployment(
r.Config,
dep,
)
if err != nil {
return err
}
return nil
}

Expand Down
87 changes: 87 additions & 0 deletions controllers/utils/lease.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package utils

import (
	"context"
	"errors"
	"fmt"
	"time"

	coordinationv1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// AcquireLease tries to acquire the coordination Lease leaseName in namespace
// on behalf of holderIdentity, for use as a distributed lock.
//
// Outcomes:
//   - (true, nil): the lease did not exist and was created, or it was free
//     (nil/empty holder) or expired and was taken over, or it was already held
//     by holderIdentity and its RenewTime was refreshed.
//   - (false, nil): the lease is held by a different identity and has not
//     expired.
//   - (false, err): reading, creating or updating the lease failed.
//
// ttlSeconds is stored as LeaseDurationSeconds when the lease is created or
// taken over; a plain renewal only refreshes RenewTime and leaves the stored
// duration untouched. The function performs a single attempt; retrying is the
// caller's responsibility.
func AcquireLease(ctx context.Context, clientset *kubernetes.Clientset, leaseName, namespace, holderIdentity string, ttlSeconds int32) (bool, error) {
	leases := clientset.CoordinationV1().Leases(namespace)
	lease, err := leases.Get(ctx, leaseName, metav1.GetOptions{})
	if err != nil {
		// Only a NotFound status means the lease has to be created. Any other
		// failure is reported as a read error instead of being masked by a
		// follow-up Create attempt.
		var apiStatus interface{ Status() metav1.Status }
		if !errors.As(err, &apiStatus) || apiStatus.Status().Reason != metav1.StatusReasonNotFound {
			return false, fmt.Errorf("failed to get lease: %w", err)
		}
		// Use one timestamp so AcquireTime and RenewTime agree exactly.
		now := metav1.MicroTime{Time: time.Now()}
		lease = &coordinationv1.Lease{
			ObjectMeta: metav1.ObjectMeta{
				Name:      leaseName,
				Namespace: namespace,
			},
			Spec: coordinationv1.LeaseSpec{
				HolderIdentity:       &holderIdentity,
				AcquireTime:          &now,
				RenewTime:            &now,
				LeaseDurationSeconds: &ttlSeconds,
			},
		}
		if _, err := leases.Create(ctx, lease, metav1.CreateOptions{}); err != nil {
			return false, fmt.Errorf("failed to create lease: %w", err)
		}
		return true, nil
	}

	// The lease exists: take it over when nobody holds it or it has expired.
	if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity == "" || leaseExpired(lease) {
		now := metav1.MicroTime{Time: time.Now()}
		lease.Spec.HolderIdentity = &holderIdentity
		lease.Spec.AcquireTime = &now
		lease.Spec.RenewTime = &now
		lease.Spec.LeaseDurationSeconds = &ttlSeconds
		if _, err := leases.Update(ctx, lease, metav1.UpdateOptions{}); err != nil {
			return false, fmt.Errorf("failed to update lease: %w", err)
		}
		return true, nil
	}

	// HolderIdentity is known to be non-nil and non-empty from here on.
	if *lease.Spec.HolderIdentity == holderIdentity {
		// Already held by us and still valid: just push the renew time forward.
		now := metav1.MicroTime{Time: time.Now()}
		lease.Spec.RenewTime = &now
		if _, err := leases.Update(ctx, lease, metav1.UpdateOptions{}); err != nil {
			return false, fmt.Errorf("failed to renew lease: %w", err)
		}
		return true, nil
	}

	// Held by someone else and not expired.
	return false, nil
}

// leaseExpired reports whether lease is no longer valid. A lease without a
// RenewTime or a LeaseDurationSeconds is treated as expired; otherwise it is
// expired once the current time is past RenewTime + LeaseDurationSeconds.
func leaseExpired(lease *coordinationv1.Lease) bool {
	renewedAt := lease.Spec.RenewTime
	durationSeconds := lease.Spec.LeaseDurationSeconds
	if renewedAt == nil || durationSeconds == nil {
		return true
	}
	validFor := time.Duration(*durationSeconds) * time.Second
	deadline := renewedAt.Time.Add(validFor)
	return deadline.Before(time.Now())
}

// ReleaseLease releases the Lease leaseName in namespace if it is currently
// held by holderIdentity.
//
// Releasing clears the holder by setting HolderIdentity to the empty string;
// the Lease object itself is kept. The call is a no-op returning nil when the
// lease does not exist or is held by a different identity. Any other failure
// to read the lease, or a failure to update it, is returned so the caller does
// not mistake an unreleased lease for a released one.
func ReleaseLease(ctx context.Context, clientset *kubernetes.Clientset, leaseName, namespace, holderIdentity string) error {
	leases := clientset.CoordinationV1().Leases(namespace)
	lease, err := leases.Get(ctx, leaseName, metav1.GetOptions{})
	if err != nil {
		var apiStatus interface{ Status() metav1.Status }
		if errors.As(err, &apiStatus) && apiStatus.Status().Reason == metav1.StatusReasonNotFound {
			return nil // Already gone
		}
		return fmt.Errorf("failed to get lease: %w", err)
	}
	if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != holderIdentity {
		// Not ours (or nobody's): nothing to release.
		return nil
	}
	// Remove holder
	empty := ""
	lease.Spec.HolderIdentity = &empty
	_, err = leases.Update(ctx, lease, metav1.UpdateOptions{})
	return err
}
6 changes: 4 additions & 2 deletions pkg/apis/hlf.kungfusoftware.es/v1alpha1/hlf_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ type FabricPeerStatus struct {
SignCACert string `json:"signCaCert"`
// +optional
NodePort int `json:"port"`
// +optional
CertRenewalLeaseHeld bool `json:"certRenewalLeaseHeld,omitempty"`
}
type OrdererService struct {
// +kubebuilder:validation:Enum=NodePort;ClusterIP;LoadBalancer
Expand Down Expand Up @@ -713,6 +715,8 @@ type FabricOrdererNodeStatus struct {
NodePort int `json:"port"`
// +optional
Message string `json:"message"`
// +optional
CertRenewalLeaseHeld bool `json:"certRenewalLeaseHeld,omitempty"`
}

type Cors struct {
Expand Down Expand Up @@ -1183,8 +1187,6 @@ type FabricOrdererNodeList struct {
// +kubebuilder:resource:scope=Namespaced,shortName=ca,singular=ca
// +kubebuilder:printcolumn:name="State",type="string",JSONPath=".status.status"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +k8s:openapi-gen=true

// FabricCA is the Schema for the hlfs API
Expand Down
Loading