Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 116 additions & 17 deletions src/coordination/azure/Akka.Coordination.Azure.Tests/LeaseActorSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -327,18 +327,13 @@ public void HeartBeatFailShouldSetGrantedToFalse()
{
RunTest(() =>
{
var failure = new LeaseException("Failed to communicate with API server");
AcquireLease();
ExpectHeartBeat();
Granted.Value.Should().BeTrue();

UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
IncrementVersion();
UpdateProbe.Reply(new Status.Failure(failure));
AwaitAssert(() =>
{
Granted.Value.Should().BeFalse();
});
// With retry logic, multiple failures are needed to exhaust the TTL window
HeartBeatFailure();
Granted.Value.Should().BeFalse();
});
}

Expand All @@ -356,16 +351,107 @@ public void HeartBeatFailShouldCallLeaseLostCallback()
ExpectHeartBeat();
Granted.Value.Should().BeTrue();

UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
IncrementVersion();
UpdateProbe.Reply(new Status.Failure(failure));
// With retry logic, drive failures until TTL expires and callback fires
DriveHeartBeatFailures(failure);
AwaitAssert(() =>
{
callbackCalled.Should().Be(failure);
});
});
}

[Fact(DisplayName = "transient heartbeat failure should retry and stay granted")]
public void TransientHeartBeatFailureShouldRetryAndStayGranted()
{
RunTest(() =>
{
AcquireLease();
ExpectHeartBeat();
Granted.Value.Should().BeTrue();

// First heartbeat fails transiently
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
UpdateProbe.Reply(new Status.Failure(new LeaseException("Transient failure")));

// Should still be granted — actor retries within TTL window
Granted.Value.Should().BeTrue();

// Retry heartbeat succeeds
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
IncrementVersion();
UpdateProbe.Reply(
new Right<LeaseResource, LeaseResource>(
new LeaseResource(OwnerName, CurrentVersion, CurrentTime)));

// Should still be granted and heartbeating normally
Granted.Value.Should().BeTrue();

// Normal heartbeat continues
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
});
}

[Fact(DisplayName = "multiple transient heartbeat failures should recover on success")]
public void MultipleTransientHeartBeatFailuresShouldRecover()
{
RunTest(() =>
{
AcquireLease();
ExpectHeartBeat();
Granted.Value.Should().BeTrue();

// Multiple consecutive failures, all within TTL window
for (var i = 0; i < 3; i++)
{
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
UpdateProbe.Reply(new Status.Failure(new LeaseException($"Transient failure {i}")));
Granted.Value.Should().BeTrue();
}

// Recovery: next heartbeat succeeds
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
IncrementVersion();
UpdateProbe.Reply(
new Right<LeaseResource, LeaseResource>(
new LeaseResource(OwnerName, CurrentVersion, CurrentTime)));

// Lease is still held
Granted.Value.Should().BeTrue();
});
}

[Fact(DisplayName = "heartbeat failure should not call lease lost callback during retry")]
public void HeartBeatFailureShouldNotCallLeaseLostCallbackDuringRetry()
{
RunTest(() =>
{
var callbackCalled = false;
AcquireLease(e =>
{
callbackCalled = true;
});
ExpectHeartBeat();
Granted.Value.Should().BeTrue();

// Single transient failure within TTL
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
UpdateProbe.Reply(new Status.Failure(new LeaseException("Transient failure")));

// Callback should NOT be called — TTL still valid
callbackCalled.Should().BeFalse();
Granted.Value.Should().BeTrue();

// Recovery
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
IncrementVersion();
UpdateProbe.Reply(
new Right<LeaseResource, LeaseResource>(
new LeaseResource(OwnerName, CurrentVersion, CurrentTime)));

callbackCalled.Should().BeFalse();
});
}

[Fact(DisplayName = "lock should be acquire-able after heart beat conflict")]
public void LockShouldAcquireAfterHeartBeatConflict()
{
Expand Down Expand Up @@ -840,15 +926,28 @@ protected void HeartBeatConflict()
});
}

/// <summary>
/// Drives heartbeat failures until the TTL window expires and the actor surrenders the lease.
/// With the retry logic, the actor retries heartbeats within the TTL window before surrendering.
/// </summary>
protected void HeartBeatFailure()
{
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
IncrementVersion();
UpdateProbe.Reply(new Status.Failure(new LeaseException("Failed to communicate with API server")));
AwaitAssert(() =>
DriveHeartBeatFailures(new LeaseException("Failed to communicate with API server"));
}

/// <summary>
/// Keeps replying with the given failure to all heartbeat retry attempts
/// until the actor exhausts the TTL window and surrenders the lease.
/// </summary>
protected void DriveHeartBeatFailures(Exception failure)
{
while (Granted.Value)
{
Granted.Value.Should().BeFalse();
});
UpdateProbe.ExpectMsg<(string, ETag)>(TimeSpan.FromSeconds(2));
UpdateProbe.Reply(new Status.Failure(failure));
Task.Delay(10).Wait();
}
AwaitAssert(() => Granted.Value.Should().BeFalse());
}
}
}
25 changes: 19 additions & 6 deletions src/coordination/azure/Akka.Coordination.Azure/LeaseActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,22 @@ public OperationInProgress(

public sealed class GrantedVersion: IData
{
public GrantedVersion(ETag version, Action<Exception?> leaseLostCallback)
public GrantedVersion(ETag version, Action<Exception?> leaseLostCallback, DateTime? lastSuccessfulHeartbeat = null)
{
Version = version;
LeaseLostCallback = leaseLostCallback;
LastSuccessfulHeartbeat = lastSuccessfulHeartbeat ?? DateTime.UtcNow;
}

public ETag Version { get; }
public Action<Exception?> LeaseLostCallback { get; }
public DateTime LastSuccessfulHeartbeat { get; }

public GrantedVersion Copy(ETag? version = null, Action<Exception?>? leaseLostCallback = null)
public GrantedVersion Copy(ETag? version = null, Action<Exception?>? leaseLostCallback = null, DateTime? lastSuccessfulHeartbeat = null)
=> new GrantedVersion(
version: version ?? Version,
leaseLostCallback: leaseLostCallback ?? LeaseLostCallback);
leaseLostCallback: leaseLostCallback ?? LeaseLostCallback,
lastSuccessfulHeartbeat: lastSuccessfulHeartbeat ?? LastSuccessfulHeartbeat);
}

public interface ICommand { }
Expand Down Expand Up @@ -398,7 +401,7 @@ public LeaseActor(IAzureApi client, LeaseSettings settings, string leaseName, At
if(_log.IsDebugEnabled)
_log.Debug("Heartbeat: lease time updated: Version {0}", resource.Value.Version);
Timers!.StartSingleTimer("heartbeat", Heartbeat.Instance, settings.TimeoutSettings.HeartbeatInterval);
return Stay().Using(gv.Copy(version: resource.Value.Version));
return Stay().Using(gv.Copy(version: resource.Value.Version, lastSuccessfulHeartbeat: DateTime.UtcNow));

case WriteResponse {Response: Left<LeaseResource, LeaseResource> resource}:
_log.Warning("Conflict during heartbeat to lease {0}. Lease assumed to be released.", resource.Value);
Expand All @@ -407,8 +410,18 @@ public LeaseActor(IAzureApi client, LeaseSettings settings, string leaseName, At
return GoTo(Idle.Instance).Using(ReadRequired.Instance);

case Status.Failure failure:
// FIXME, retry if timeout far enough off: https://github.com/lightbend/akka-commercial-addons/issues/501
_log.Warning(failure.Cause, "Failure during heartbeat to lease. Lease assumed to be released.");
var timeSinceLastHeartbeat = DateTime.UtcNow - gv.LastSuccessfulHeartbeat;
if (timeSinceLastHeartbeat < _timeoutOffset - settings.TimeoutSettings.HeartbeatInterval)
{
_log.Warning(failure.Cause,
"Transient failure during heartbeat to lease {0}. TTL still valid ({1:F0}s remaining). Retrying.",
leaseName,
(_timeoutOffset - timeSinceLastHeartbeat).TotalSeconds);
Timers!.StartSingleTimer("heartbeat", Heartbeat.Instance, settings.TimeoutSettings.HeartbeatInterval);
return Stay();
}
_log.Warning(failure.Cause,
"Failure during heartbeat to lease {0}. TTL window expired, releasing lease.", leaseName);
localGranted.GetAndSet(false);
ExecuteLeaseLockCallback(leaseLost, failure.Cause);
return GoTo(Idle.Instance).Using(ReadRequired.Instance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,18 +325,13 @@ public void HeartBeatFailShouldSetGrantedToFalse()
{
RunTest(() =>
{
var failure = new LeaseException("Failed to communicate with API server");
AcquireLease();
ExpectHeartBeat();
Granted.Value.Should().BeTrue();

UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
IncrementVersion();
UpdateProbe.Reply(new Status.Failure(failure));
AwaitAssert(() =>
{
Granted.Value.Should().BeFalse();
});
// With retry logic, multiple failures are needed to exhaust the TTL window
HeartBeatFailure();
Granted.Value.Should().BeFalse();
});
}

Expand All @@ -354,16 +349,107 @@ public void HeartBeatFailShouldCallLeaseLostCallback()
ExpectHeartBeat();
Granted.Value.Should().BeTrue();

UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
IncrementVersion();
UpdateProbe.Reply(new Status.Failure(failure));
// With retry logic, drive failures until TTL expires and callback fires
DriveHeartBeatFailures(failure);
AwaitAssert(() =>
{
callbackCalled.Should().Be(failure);
});
});
}

[Fact(DisplayName = "transient heartbeat failure should retry and stay granted")]
public void TransientHeartBeatFailureShouldRetryAndStayGranted()
{
RunTest(() =>
{
AcquireLease();
ExpectHeartBeat();
Granted.Value.Should().BeTrue();

// First heartbeat fails transiently
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
UpdateProbe.Reply(new Status.Failure(new LeaseException("Transient failure")));

// Should still be granted - actor retries within TTL window
Granted.Value.Should().BeTrue();

// Retry heartbeat succeeds
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
IncrementVersion();
UpdateProbe.Reply(
new Right<LeaseResource, LeaseResource>(
new LeaseResource(OwnerName, CurrentVersion, CurrentTime)));

// Should still be granted and heartbeating normally
Granted.Value.Should().BeTrue();

// Normal heartbeat continues
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
});
}

[Fact(DisplayName = "multiple transient heartbeat failures should recover on success")]
public void MultipleTransientHeartBeatFailuresShouldRecover()
{
RunTest(() =>
{
AcquireLease();
ExpectHeartBeat();
Granted.Value.Should().BeTrue();

// Multiple consecutive failures, all within TTL window
for (var i = 0; i < 3; i++)
{
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
UpdateProbe.Reply(new Status.Failure(new LeaseException($"Transient failure {i}")));
Granted.Value.Should().BeTrue();
}

// Recovery: next heartbeat succeeds
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
IncrementVersion();
UpdateProbe.Reply(
new Right<LeaseResource, LeaseResource>(
new LeaseResource(OwnerName, CurrentVersion, CurrentTime)));

// Lease is still held
Granted.Value.Should().BeTrue();
});
}

[Fact(DisplayName = "heartbeat failure should not call lease lost callback during retry")]
public void HeartBeatFailureShouldNotCallLeaseLostCallbackDuringRetry()
{
RunTest(() =>
{
var callbackCalled = false;
AcquireLease(e =>
{
callbackCalled = true;
});
ExpectHeartBeat();
Granted.Value.Should().BeTrue();

// Single transient failure within TTL
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
UpdateProbe.Reply(new Status.Failure(new LeaseException("Transient failure")));

// Callback should NOT be called - TTL still valid
callbackCalled.Should().BeFalse();
Granted.Value.Should().BeTrue();

// Recovery
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
IncrementVersion();
UpdateProbe.Reply(
new Right<LeaseResource, LeaseResource>(
new LeaseResource(OwnerName, CurrentVersion, CurrentTime)));

callbackCalled.Should().BeFalse();
});
}

[Fact(DisplayName = "lock should be acquire-able after heart beat conflict")]
public void LockShouldAcquireAfterHeartBeatConflict()
{
Expand Down Expand Up @@ -840,13 +926,18 @@ protected void HeartBeatConflict()

protected void HeartBeatFailure()
{
UpdateProbe.ExpectMsg((OwnerName, CurrentVersion));
IncrementVersion();
UpdateProbe.Reply(new Status.Failure(new LeaseException("Failed to communicate with API server")));
AwaitAssert(() =>
DriveHeartBeatFailures(new LeaseException("Failed to communicate with API server"));
}

protected void DriveHeartBeatFailures(Exception failure)
{
while (Granted.Value)
{
Granted.Value.Should().BeFalse();
});
UpdateProbe.ExpectMsg<(string, string)>(TimeSpan.FromSeconds(2));
UpdateProbe.Reply(new Status.Failure(failure));
Task.Delay(10).Wait();
}
AwaitAssert(() => Granted.Value.Should().BeFalse());
}
}
}
}
Loading