Retry lease heartbeats on transient failure (Azure and Kubernetes lease actors)

What the hunks below do:
  * GrantedVersion gains a LastSuccessfulHeartbeat timestamp. It defaults to
    DateTime.UtcNow on construction and is refreshed through Copy(...) each
    time a heartbeat write succeeds.
  * On Status.Failure during a heartbeat the actor no longer releases the
    lease immediately. While the time since the last successful heartbeat is
    below the retry window it logs a warning, re-arms the "heartbeat" timer
    with HeartbeatInterval and stays in the current state.
      - Azure window:      _timeoutOffset - HeartbeatInterval
      - Kubernetes window: _heartbeatTimeout - _heartbeatOffset - HeartbeatInterval
    Once the window is exceeded it sets localGranted to false, invokes the
    lease-lost callback and goes to Idle with ReadRequired, as before.
  * Both LeaseActorSpec files get a DriveHeartBeatFailures helper that keeps
    failing heartbeats until Granted becomes false, plus three new tests
    covering recovery after transient failures.

NOTE(review): leading whitespace inside the hunks was reconstructed with
conventional 4-space indentation; re-generate against the real tree before
applying. Generic type arguments that are absent in this text (for example on
Right, Left and Action) are left exactly as found - confirm against the
original commit.

diff --git a/src/coordination/azure/Akka.Coordination.Azure.Tests/LeaseActorSpec.cs b/src/coordination/azure/Akka.Coordination.Azure.Tests/LeaseActorSpec.cs
index 532f1d203..f29e6fbff 100644
--- a/src/coordination/azure/Akka.Coordination.Azure.Tests/LeaseActorSpec.cs
+++ b/src/coordination/azure/Akka.Coordination.Azure.Tests/LeaseActorSpec.cs
@@ -327,18 +327,13 @@ public void HeartBeatFailShouldSetGrantedToFalse()
         {
             RunTest(() =>
             {
-                var failure = new LeaseException("Failed to communicate with API server");
                 AcquireLease();
                 ExpectHeartBeat();
                 Granted.Value.Should().BeTrue();
 
-                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
-                IncrementVersion();
-                UpdateProbe.Reply(new Status.Failure(failure));
-                AwaitAssert(() =>
-                {
-                    Granted.Value.Should().BeFalse();
-                });
+                // With retry logic, multiple failures are needed to exhaust the TTL window
+                HeartBeatFailure();
+                Granted.Value.Should().BeFalse();
             });
         }
 
@@ -356,9 +351,8 @@ public void HeartBeatFailShouldCallLeaseLostCallback()
                 ExpectHeartBeat();
                 Granted.Value.Should().BeTrue();
 
-                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
-                IncrementVersion();
-                UpdateProbe.Reply(new Status.Failure(failure));
+                // With retry logic, drive failures until TTL expires and callback fires
+                DriveHeartBeatFailures(failure);
                 AwaitAssert(() =>
                 {
                     callbackCalled.Should().Be(failure);
@@ -366,6 +360,98 @@ public void HeartBeatFailShouldCallLeaseLostCallback()
             });
         }
 
+        [Fact(DisplayName = "transient heartbeat failure should retry and stay granted")]
+        public void TransientHeartBeatFailureShouldRetryAndStayGranted()
+        {
+            RunTest(() =>
+            {
+                AcquireLease();
+                ExpectHeartBeat();
+                Granted.Value.Should().BeTrue();
+
+                // First heartbeat fails transiently
+                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+                UpdateProbe.Reply(new Status.Failure(new LeaseException("Transient failure")));
+
+                // Should still be granted — actor retries within TTL window
+                Granted.Value.Should().BeTrue();
+
+                // Retry heartbeat succeeds
+                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+                IncrementVersion();
+                UpdateProbe.Reply(
+                    new Right(
+                        new LeaseResource(OwnerName, CurrentVersion, CurrentTime)));
+
+                // Should still be granted and heartbeating normally
+                Granted.Value.Should().BeTrue();
+
+                // Normal heartbeat continues
+                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+            });
+        }
+
+        [Fact(DisplayName = "multiple transient heartbeat failures should recover on success")]
+        public void MultipleTransientHeartBeatFailuresShouldRecover()
+        {
+            RunTest(() =>
+            {
+                AcquireLease();
+                ExpectHeartBeat();
+                Granted.Value.Should().BeTrue();
+
+                // Multiple consecutive failures, all within TTL window
+                for (var i = 0; i < 3; i++)
+                {
+                    UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+                    UpdateProbe.Reply(new Status.Failure(new LeaseException($"Transient failure {i}")));
+                    Granted.Value.Should().BeTrue();
+                }
+
+                // Recovery: next heartbeat succeeds
+                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+                IncrementVersion();
+                UpdateProbe.Reply(
+                    new Right(
+                        new LeaseResource(OwnerName, CurrentVersion, CurrentTime)));
+
+                // Lease is still held
+                Granted.Value.Should().BeTrue();
+            });
+        }
+
+        [Fact(DisplayName = "heartbeat failure should not call lease lost callback during retry")]
+        public void HeartBeatFailureShouldNotCallLeaseLostCallbackDuringRetry()
+        {
+            RunTest(() =>
+            {
+                var callbackCalled = false;
+                AcquireLease(e =>
+                {
+                    callbackCalled = true;
+                });
+                ExpectHeartBeat();
+                Granted.Value.Should().BeTrue();
+
+                // Single transient failure within TTL
+                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+                UpdateProbe.Reply(new Status.Failure(new LeaseException("Transient failure")));
+
+                // Callback should NOT be called — TTL still valid
+                callbackCalled.Should().BeFalse();
+                Granted.Value.Should().BeTrue();
+
+                // Recovery
+                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+                IncrementVersion();
+                UpdateProbe.Reply(
+                    new Right(
+                        new LeaseResource(OwnerName, CurrentVersion, CurrentTime)));
+
+                callbackCalled.Should().BeFalse();
+            });
+        }
+
         [Fact(DisplayName = "lock should be acquire-able after heart beat conflict")]
         public void LockShouldAcquireAfterHeartBeatConflict()
         {
@@ -840,15 +926,28 @@ protected void HeartBeatConflict()
             {
             });
         }
+        /// <summary>
+        /// Drives heartbeat failures until the TTL window expires and the actor surrenders the lease.
+        /// With the retry logic, the actor retries heartbeats within the TTL window before surrendering.
+        /// </summary>
         protected void HeartBeatFailure()
         {
-            UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
-            IncrementVersion();
-            UpdateProbe.Reply(new Status.Failure(new LeaseException("Failed to communicate with API server")));
-            AwaitAssert(() =>
+            DriveHeartBeatFailures(new LeaseException("Failed to communicate with API server"));
+        }
+
+        /// <summary>
+        /// Keeps replying with the given failure to all heartbeat retry attempts
+        /// until the actor exhausts the TTL window and surrenders the lease.
+        /// </summary>
+        protected void DriveHeartBeatFailures(Exception failure)
+        {
+            while (Granted.Value)
             {
-                Granted.Value.Should().BeFalse();
-            });
+                UpdateProbe.ExpectMsg<(string, ETag)>(TimeSpan.FromSeconds(2));
+                UpdateProbe.Reply(new Status.Failure(failure));
+                Task.Delay(10).Wait();
+            }
+            AwaitAssert(() => Granted.Value.Should().BeFalse());
         }
     }
 }
diff --git a/src/coordination/azure/Akka.Coordination.Azure/LeaseActor.cs b/src/coordination/azure/Akka.Coordination.Azure/LeaseActor.cs
index d0af26f44..36e9f8306 100644
--- a/src/coordination/azure/Akka.Coordination.Azure/LeaseActor.cs
+++ b/src/coordination/azure/Akka.Coordination.Azure/LeaseActor.cs
@@ -109,19 +109,22 @@ public OperationInProgress(
 
         public sealed class GrantedVersion: IData
         {
-            public GrantedVersion(ETag version, Action leaseLostCallback)
+            public GrantedVersion(ETag version, Action leaseLostCallback, DateTime? lastSuccessfulHeartbeat = null)
             {
                 Version = version;
                 LeaseLostCallback = leaseLostCallback;
+                LastSuccessfulHeartbeat = lastSuccessfulHeartbeat ?? DateTime.UtcNow;
             }
 
             public ETag Version { get; }
             public Action LeaseLostCallback { get; }
+            public DateTime LastSuccessfulHeartbeat { get; }
 
-            public GrantedVersion Copy(ETag? version = null, Action? leaseLostCallback = null)
+            public GrantedVersion Copy(ETag? version = null, Action? leaseLostCallback = null, DateTime? lastSuccessfulHeartbeat = null)
                 => new GrantedVersion(
                     version: version ?? Version,
-                    leaseLostCallback: leaseLostCallback ?? LeaseLostCallback);
+                    leaseLostCallback: leaseLostCallback ?? LeaseLostCallback,
+                    lastSuccessfulHeartbeat: lastSuccessfulHeartbeat ?? LastSuccessfulHeartbeat);
         }
 
         public interface ICommand { }
@@ -398,7 +401,7 @@ public LeaseActor(IAzureApi client, LeaseSettings settings, string leaseName, At
                         if(_log.IsDebugEnabled)
                             _log.Debug("Heartbeat: lease time updated: Version {0}", resource.Value.Version);
                         Timers!.StartSingleTimer("heartbeat", Heartbeat.Instance, settings.TimeoutSettings.HeartbeatInterval);
-                        return Stay().Using(gv.Copy(version: resource.Value.Version));
+                        return Stay().Using(gv.Copy(version: resource.Value.Version, lastSuccessfulHeartbeat: DateTime.UtcNow));
 
                     case WriteResponse {Response: Left resource}:
                         _log.Warning("Conflict during heartbeat to lease {0}. Lease assumed to be released.", resource.Value);
@@ -407,8 +410,18 @@ public LeaseActor(IAzureApi client, LeaseSettings settings, string leaseName, At
                         return GoTo(Idle.Instance).Using(ReadRequired.Instance);
 
                     case Status.Failure failure:
-                        // FIXME, retry if timeout far enough off: https://github.com/lightbend/akka-commercial-addons/issues/501
-                        _log.Warning(failure.Cause, "Failure during heartbeat to lease. Lease assumed to be released.");
+                        var timeSinceLastHeartbeat = DateTime.UtcNow - gv.LastSuccessfulHeartbeat;
+                        if (timeSinceLastHeartbeat < _timeoutOffset - settings.TimeoutSettings.HeartbeatInterval)
+                        {
+                            _log.Warning(failure.Cause,
+                                "Transient failure during heartbeat to lease {0}. TTL still valid ({1:F0}s remaining). Retrying.",
+                                leaseName,
+                                (_timeoutOffset - timeSinceLastHeartbeat).TotalSeconds);
+                            Timers!.StartSingleTimer("heartbeat", Heartbeat.Instance, settings.TimeoutSettings.HeartbeatInterval);
+                            return Stay();
+                        }
+                        _log.Warning(failure.Cause,
+                            "Failure during heartbeat to lease {0}. TTL window expired, releasing lease.", leaseName);
                         localGranted.GetAndSet(false);
                         ExecuteLeaseLockCallback(leaseLost, failure.Cause);
                         return GoTo(Idle.Instance).Using(ReadRequired.Instance);
diff --git a/src/coordination/kubernetes/Akka.Coordination.KubernetesApi.Tests/LeaseActorSpec.cs b/src/coordination/kubernetes/Akka.Coordination.KubernetesApi.Tests/LeaseActorSpec.cs
index 07163ca82..733e27910 100644
--- a/src/coordination/kubernetes/Akka.Coordination.KubernetesApi.Tests/LeaseActorSpec.cs
+++ b/src/coordination/kubernetes/Akka.Coordination.KubernetesApi.Tests/LeaseActorSpec.cs
@@ -325,18 +325,13 @@ public void HeartBeatFailShouldSetGrantedToFalse()
         {
             RunTest(() =>
             {
-                var failure = new LeaseException("Failed to communicate with API server");
                 AcquireLease();
                 ExpectHeartBeat();
                 Granted.Value.Should().BeTrue();
 
-                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
-                IncrementVersion();
-                UpdateProbe.Reply(new Status.Failure(failure));
-                AwaitAssert(() =>
-                {
-                    Granted.Value.Should().BeFalse();
-                });
+                // With retry logic, multiple failures are needed to exhaust the TTL window
+                HeartBeatFailure();
+                Granted.Value.Should().BeFalse();
             });
         }
 
@@ -354,9 +349,8 @@ public void HeartBeatFailShouldCallLeaseLostCallback()
                 ExpectHeartBeat();
                 Granted.Value.Should().BeTrue();
 
-                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
-                IncrementVersion();
-                UpdateProbe.Reply(new Status.Failure(failure));
+                // With retry logic, drive failures until TTL expires and callback fires
+                DriveHeartBeatFailures(failure);
                 AwaitAssert(() =>
                 {
                     callbackCalled.Should().Be(failure);
@@ -364,6 +358,98 @@ public void HeartBeatFailShouldCallLeaseLostCallback()
             });
         }
 
+        [Fact(DisplayName = "transient heartbeat failure should retry and stay granted")]
+        public void TransientHeartBeatFailureShouldRetryAndStayGranted()
+        {
+            RunTest(() =>
+            {
+                AcquireLease();
+                ExpectHeartBeat();
+                Granted.Value.Should().BeTrue();
+
+                // First heartbeat fails transiently
+                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+                UpdateProbe.Reply(new Status.Failure(new LeaseException("Transient failure")));
+
+                // Should still be granted - actor retries within TTL window
+                Granted.Value.Should().BeTrue();
+
+                // Retry heartbeat succeeds
+                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+                IncrementVersion();
+                UpdateProbe.Reply(
+                    new Right(
+                        new LeaseResource(OwnerName, CurrentVersion, CurrentTime)));
+
+                // Should still be granted and heartbeating normally
+                Granted.Value.Should().BeTrue();
+
+                // Normal heartbeat continues
+                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+            });
+        }
+
+        [Fact(DisplayName = "multiple transient heartbeat failures should recover on success")]
+        public void MultipleTransientHeartBeatFailuresShouldRecover()
+        {
+            RunTest(() =>
+            {
+                AcquireLease();
+                ExpectHeartBeat();
+                Granted.Value.Should().BeTrue();
+
+                // Multiple consecutive failures, all within TTL window
+                for (var i = 0; i < 3; i++)
+                {
+                    UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+                    UpdateProbe.Reply(new Status.Failure(new LeaseException($"Transient failure {i}")));
+                    Granted.Value.Should().BeTrue();
+                }
+
+                // Recovery: next heartbeat succeeds
+                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+                IncrementVersion();
+                UpdateProbe.Reply(
+                    new Right(
+                        new LeaseResource(OwnerName, CurrentVersion, CurrentTime)));
+
+                // Lease is still held
+                Granted.Value.Should().BeTrue();
+            });
+        }
+
+        [Fact(DisplayName = "heartbeat failure should not call lease lost callback during retry")]
+        public void HeartBeatFailureShouldNotCallLeaseLostCallbackDuringRetry()
+        {
+            RunTest(() =>
+            {
+                var callbackCalled = false;
+                AcquireLease(e =>
+                {
+                    callbackCalled = true;
+                });
+                ExpectHeartBeat();
+                Granted.Value.Should().BeTrue();
+
+                // Single transient failure within TTL
+                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+                UpdateProbe.Reply(new Status.Failure(new LeaseException("Transient failure")));
+
+                // Callback should NOT be called - TTL still valid
+                callbackCalled.Should().BeFalse();
+                Granted.Value.Should().BeTrue();
+
+                // Recovery
+                UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
+                IncrementVersion();
+                UpdateProbe.Reply(
+                    new Right(
+                        new LeaseResource(OwnerName, CurrentVersion, CurrentTime)));
+
+                callbackCalled.Should().BeFalse();
+            });
+        }
+
         [Fact(DisplayName = "lock should be acquire-able after heart beat conflict")]
         public void LockShouldAcquireAfterHeartBeatConflict()
         {
@@ -840,13 +926,18 @@ protected void HeartBeatConflict()
 
         protected void HeartBeatFailure()
         {
-            UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
-            IncrementVersion();
-            UpdateProbe.Reply(new Status.Failure(new LeaseException("Failed to communicate with API server")));
-            AwaitAssert(() =>
+            DriveHeartBeatFailures(new LeaseException("Failed to communicate with API server"));
+        }
+
+        protected void DriveHeartBeatFailures(Exception failure)
+        {
+            while (Granted.Value)
             {
-                Granted.Value.Should().BeFalse();
-            });
+                UpdateProbe.ExpectMsg<(string, string)>(TimeSpan.FromSeconds(2));
+                UpdateProbe.Reply(new Status.Failure(failure));
+                Task.Delay(10).Wait();
+            }
+            AwaitAssert(() => Granted.Value.Should().BeFalse());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/src/coordination/kubernetes/Akka.Coordination.KubernetesApi/LeaseActor.cs b/src/coordination/kubernetes/Akka.Coordination.KubernetesApi/LeaseActor.cs
index 1d8fb57ce..b35fcfec1 100644
--- a/src/coordination/kubernetes/Akka.Coordination.KubernetesApi/LeaseActor.cs
+++ b/src/coordination/kubernetes/Akka.Coordination.KubernetesApi/LeaseActor.cs
@@ -108,19 +108,22 @@ public OperationInProgress(
 
         public sealed class GrantedVersion: IData
         {
-            public GrantedVersion(string version, Action leaseLostCallback)
+            public GrantedVersion(string version, Action leaseLostCallback, DateTime? lastSuccessfulHeartbeat = null)
             {
                 Version = version;
                 LeaseLostCallback = leaseLostCallback;
+                LastSuccessfulHeartbeat = lastSuccessfulHeartbeat ?? DateTime.UtcNow;
             }
 
             public string Version { get; }
             public Action LeaseLostCallback { get; }
+            public DateTime LastSuccessfulHeartbeat { get; }
 
-            public GrantedVersion Copy(string? version = null, Action? leaseLostCallback = null)
+            public GrantedVersion Copy(string? version = null, Action? leaseLostCallback = null, DateTime? lastSuccessfulHeartbeat = null)
                 => new GrantedVersion(
                     version: version ?? Version,
-                    leaseLostCallback: leaseLostCallback ?? LeaseLostCallback);
+                    leaseLostCallback: leaseLostCallback ?? LeaseLostCallback,
+                    lastSuccessfulHeartbeat: lastSuccessfulHeartbeat ?? LastSuccessfulHeartbeat);
         }
 
         public interface ICommand { }
@@ -392,7 +395,7 @@ public LeaseActor(IKubernetesApi client, LeaseSettings settings, string leaseNam
                             throw new LeaseException($"response from API server has different owner for success: {resource}");
                         _log.Debug("Heartbeat: lease time updated: Version {0}", resource.Value.Version);
                         Timers!.StartSingleTimer("heartbeat", Heartbeat.Instance, settings.TimeoutSettings.HeartbeatInterval);
-                        return Stay().Using(gv.Copy(version: resource.Value.Version));
+                        return Stay().Using(gv.Copy(version: resource.Value.Version, lastSuccessfulHeartbeat: DateTime.UtcNow));
 
                     case WriteResponse {Response: Left resource}:
                         _log.Warning("Conflict during heartbeat to lease {0}. Lease assumed to be released.", resource.Value);
@@ -401,8 +404,19 @@ public LeaseActor(IKubernetesApi client, LeaseSettings settings, string leaseNam
                         return GoTo(Idle.Instance).Using(ReadRequired.Instance);
 
                     case Status.Failure failure:
-                        // FIXME, retry if timeout far enough off: https://github.com/lightbend/akka-commercial-addons/issues/501
-                        _log.Warning(failure.Cause, "Failure during heartbeat to lease. Lease assumed to be released.");
+                        var timeSinceLastHeartbeat = DateTime.UtcNow - gv.LastSuccessfulHeartbeat;
+                        var retryWindow = _heartbeatTimeout - _heartbeatOffset - settings.TimeoutSettings.HeartbeatInterval;
+                        if (timeSinceLastHeartbeat < retryWindow)
+                        {
+                            _log.Warning(failure.Cause,
+                                "Transient failure during heartbeat to lease {0}. TTL still valid ({1:F0}s remaining). Retrying.",
+                                leaseName,
+                                (retryWindow - timeSinceLastHeartbeat).TotalSeconds);
+                            Timers!.StartSingleTimer("heartbeat", Heartbeat.Instance, settings.TimeoutSettings.HeartbeatInterval);
+                            return Stay();
+                        }
+                        _log.Warning(failure.Cause,
+                            "Failure during heartbeat to lease {0}. TTL window expired, releasing lease.", leaseName);
                         localGranted.GetAndSet(false);
                         ExecuteLeaseLockCallback(leaseLost, failure.Cause);
                         return GoTo(Idle.Instance).Using(ReadRequired.Instance);