Skip to content

Commit da7cfbe

Browse files
authored
Merge pull request #452 from devstress/copilot/continue-todo-root
Complete Phase 2 & Phase 3: Heartbeat Monitoring + TaskManager Execution Engine + Documentation
2 parents a314538 + 2743435 commit da7cfbe

24 files changed

+2580
-161
lines changed
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Copyright 2025 FlinkDotNet
2+
// Licensed under the Apache License, Version 2.0.
3+
// See LICENSE file in the project root for full license information.
4+
5+
using FlinkDotNet.JobManager.Implementation;
6+
using FlinkDotNet.JobManager.Interfaces;
7+
using FluentAssertions;
8+
using Microsoft.Extensions.Logging;
9+
using Microsoft.Extensions.Options;
10+
using Moq;
11+
12+
namespace FlinkDotNet.JobManager.Tests;
13+
14+
/// <summary>
/// Unit tests for <see cref="HeartbeatMonitoringService"/>: constructor argument validation,
/// timeout detection against a mocked <see cref="IResourceManager"/>, and the default values
/// of <see cref="HeartbeatConfiguration"/>.
/// </summary>
public class HeartbeatMonitoringServiceTests
{
    /// <summary>
    /// Upper bound on how long a test waits for an expected unregistration before giving up.
    /// Deliberately generous: the wait ends as soon as the mock is invoked, so this value is
    /// only reached when the test is about to fail.
    /// </summary>
    private static readonly TimeSpan DetectionDeadline = TimeSpan.FromSeconds(10);

    /// <summary>
    /// How long the "nothing should happen" tests let the service run. Twice the configured
    /// check interval used by these tests (1 second).
    /// </summary>
    private static readonly TimeSpan ObservationWindow = TimeSpan.FromSeconds(2);

    private readonly Mock<IResourceManager> _mockResourceManager;
    private readonly Mock<ILogger<HeartbeatMonitoringService>> _mockLogger;
    private readonly HeartbeatConfiguration _configuration;

    public HeartbeatMonitoringServiceTests()
    {
        _mockResourceManager = new Mock<IResourceManager>();
        _mockLogger = new Mock<ILogger<HeartbeatMonitoringService>>();
        _configuration = new HeartbeatConfiguration
        {
            TimeoutSeconds = 2, // Short timeout for testing
            CheckIntervalSeconds = 1 // Short interval for testing
        };
    }

    [Fact]
    public void Constructor_WithNullResourceManager_ThrowsArgumentNullException()
    {
        // Arrange & Act
        var act = () => new HeartbeatMonitoringService(
            null!,
            Options.Create(_configuration),
            _mockLogger.Object);

        // Assert
        act.Should().Throw<ArgumentNullException>()
            .WithParameterName("resourceManager");
    }

    [Fact]
    public void Constructor_WithNullConfiguration_ThrowsArgumentNullException()
    {
        // Arrange & Act
        var act = () => new HeartbeatMonitoringService(
            _mockResourceManager.Object,
            null!,
            _mockLogger.Object);

        // Assert
        act.Should().Throw<ArgumentNullException>();
    }

    [Fact]
    public void Constructor_WithNullLogger_ThrowsArgumentNullException()
    {
        // Arrange & Act
        var act = () => new HeartbeatMonitoringService(
            _mockResourceManager.Object,
            Options.Create(_configuration),
            null!);

        // Assert
        act.Should().Throw<ArgumentNullException>()
            .WithParameterName("logger");
    }

    [Fact]
    public async Task HeartbeatMonitoring_DetectsTimeout_AndUnregistersTaskManager()
    {
        // Arrange
        var taskManagerId = "tm-timeout";
        var oldHeartbeat = DateTime.UtcNow.AddSeconds(-10); // Well past the 2 second timeout

        _mockResourceManager
            .Setup(rm => rm.GetRegisteredTaskManagers())
            .Returns(new[] { taskManagerId });

        _mockResourceManager
            .Setup(rm => rm.GetLastHeartbeat(taskManagerId))
            .Returns(oldHeartbeat);

        // Signal the test the moment the service asks for the unregistration, instead of
        // sleeping for a fixed time and hoping a check cycle happened to run within it.
        var unregistered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        _mockResourceManager
            .Setup(rm => rm.UnregisterTaskManagerAsync(taskManagerId, It.IsAny<CancellationToken>()))
            .Callback(() => { unregistered.TrySetResult(); });

        var service = CreateService();

        // Act
        await service.StartAsync(CancellationToken.None);
        using (var deadline = new CancellationTokenSource())
        {
            await Task.WhenAny(unregistered.Task, Task.Delay(DetectionDeadline, deadline.Token));
            deadline.Cancel(); // Release the timer when the signal arrived first
        }

        await service.StopAsync(CancellationToken.None);

        // Assert
        _mockResourceManager.Verify(
            rm => rm.UnregisterTaskManagerAsync(taskManagerId, It.IsAny<CancellationToken>()),
            Times.AtLeastOnce());
    }

    [Fact]
    public async Task HeartbeatMonitoring_WithRecentHeartbeat_DoesNotUnregister()
    {
        // Arrange
        var taskManagerId = "tm-healthy";

        _mockResourceManager
            .Setup(rm => rm.GetRegisteredTaskManagers())
            .Returns(new[] { taskManagerId });

        // Return a fresh heartbeat each time it's queried
        _mockResourceManager
            .Setup(rm => rm.GetLastHeartbeat(taskManagerId))
            .Returns(() => DateTime.UtcNow);

        var service = CreateService();

        // Act
        await RunServiceForAsync(service, ObservationWindow);

        // Assert
        _mockResourceManager.Verify(
            rm => rm.UnregisterTaskManagerAsync(taskManagerId, It.IsAny<CancellationToken>()),
            Times.Never());
    }

    [Fact]
    public async Task HeartbeatMonitoring_WithNoTaskManagers_DoesNothing()
    {
        // Arrange
        _mockResourceManager
            .Setup(rm => rm.GetRegisteredTaskManagers())
            .Returns(Array.Empty<string>());

        var service = CreateService();

        // Act
        await RunServiceForAsync(service, ObservationWindow);

        // Assert
        _mockResourceManager.Verify(
            rm => rm.UnregisterTaskManagerAsync(It.IsAny<string>(), It.IsAny<CancellationToken>()),
            Times.Never());
    }

    [Fact]
    public void HeartbeatConfiguration_HasCorrectDefaults()
    {
        // Arrange & Act
        var config = new HeartbeatConfiguration();

        // Assert
        config.TimeoutSeconds.Should().Be(30);
        config.CheckIntervalSeconds.Should().Be(10);
        HeartbeatConfiguration.SectionName.Should().Be("Heartbeat");
    }

    /// <summary>
    /// Builds the service under test from the shared mocks and the short-interval test configuration.
    /// </summary>
    private HeartbeatMonitoringService CreateService() =>
        new HeartbeatMonitoringService(
            _mockResourceManager.Object,
            Options.Create(_configuration),
            _mockLogger.Object);

    /// <summary>
    /// Starts the service, lets it run for <paramref name="duration"/>, then stops it.
    /// </summary>
    private static async Task RunServiceForAsync(HeartbeatMonitoringService service, TimeSpan duration)
    {
        await service.StartAsync(CancellationToken.None);
        await Task.Delay(duration);
        await service.StopAsync(CancellationToken.None);
    }
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Copyright 2025 FlinkDotNet
2+
// Licensed under the Apache License, Version 2.0.
3+
// See LICENSE file in the project root for full license information.
4+
5+
using FlinkDotNet.JobManager.Implementation;
6+
using FluentAssertions;
7+
using Microsoft.Extensions.Logging;
8+
using Moq;
9+
10+
namespace FlinkDotNet.JobManager.Tests;
11+
12+
/// <summary>
/// Tests for the heartbeat bookkeeping of <see cref="ResourceManager"/>: registration
/// initializes the last-heartbeat timestamp, recording a heartbeat advances it, and
/// unknown TaskManager IDs yield no timestamp.
/// </summary>
public class HeartbeatTests
{
    private readonly Mock<ILogger<ResourceManager>> _mockLogger;
    private readonly ResourceManager _resourceManager;

    public HeartbeatTests()
    {
        _mockLogger = new Mock<ILogger<ResourceManager>>();
        _resourceManager = new ResourceManager(_mockLogger.Object);
    }

    [Fact]
    public async Task RecordHeartbeatAsync_UpdatesLastHeartbeatTimestamp()
    {
        // Arrange
        var taskManagerId = "tm-heartbeat-1";
        var numberOfSlots = 4;

        await _resourceManager.RegisterTaskManagerAsync(taskManagerId, numberOfSlots);
        DateTime? initialHeartbeat = _resourceManager.GetLastHeartbeat(taskManagerId);

        // Wait a small amount to ensure timestamp difference
        await Task.Delay(10);

        // Act
        await _resourceManager.RecordHeartbeatAsync(taskManagerId);

        // Assert
        DateTime? updatedHeartbeat = _resourceManager.GetLastHeartbeat(taskManagerId);

        updatedHeartbeat.Should().NotBeNull();
        initialHeartbeat.Should().NotBeNull();
        updatedHeartbeat.Should().BeAfter(initialHeartbeat.Value);
    }

    [Fact]
    public async Task RecordHeartbeatAsync_ForUnregisteredTaskManager_LogsWarning()
    {
        // Arrange
        var unregisteredTaskManagerId = "tm-unregistered";

        // Act
        await _resourceManager.RecordHeartbeatAsync(unregisteredTaskManagerId);

        // Assert
        // The heartbeat must not create an entry for an unknown TaskManager...
        DateTime? heartbeat = _resourceManager.GetLastHeartbeat(unregisteredTaskManagerId);
        heartbeat.Should().BeNull();

        // ...and the test name promises a warning, so check the logger as well.
        // ILogger's LogWarning extension funnels into ILogger.Log<TState>, which is what
        // Moq can observe; It.IsAnyType stands in for the internal formatted-state type.
        _mockLogger.Verify(
            logger => logger.Log(
                LogLevel.Warning,
                It.IsAny<EventId>(),
                It.IsAny<It.IsAnyType>(),
                It.IsAny<Exception>(),
                It.IsAny<Func<It.IsAnyType, Exception, string>>()),
            Times.AtLeastOnce());
    }

    [Fact]
    public async Task GetLastHeartbeat_ForRegisteredTaskManager_ReturnsTimestamp()
    {
        // Arrange
        var taskManagerId = "tm-heartbeat-2";
        var numberOfSlots = 4;

        await _resourceManager.RegisterTaskManagerAsync(taskManagerId, numberOfSlots);

        // Act
        DateTime? heartbeat = _resourceManager.GetLastHeartbeat(taskManagerId);

        // Assert
        heartbeat.Should().NotBeNull();
        heartbeat.Should().BeCloseTo(DateTime.UtcNow, TimeSpan.FromSeconds(5));
    }

    [Fact]
    public void GetLastHeartbeat_ForUnregisteredTaskManager_ReturnsNull()
    {
        // Arrange
        var unregisteredTaskManagerId = "tm-not-registered";

        // Act
        DateTime? heartbeat = _resourceManager.GetLastHeartbeat(unregisteredTaskManagerId);

        // Assert
        heartbeat.Should().BeNull();
    }

    [Fact]
    public async Task RegisterTaskManagerAsync_InitializesLastHeartbeat()
    {
        // Arrange
        var taskManagerId = "tm-heartbeat-3";
        var numberOfSlots = 4;

        // Act
        await _resourceManager.RegisterTaskManagerAsync(taskManagerId, numberOfSlots);

        // Assert
        DateTime? heartbeat = _resourceManager.GetLastHeartbeat(taskManagerId);
        heartbeat.Should().NotBeNull();
        heartbeat.Should().BeCloseTo(DateTime.UtcNow, TimeSpan.FromSeconds(5));
    }

    [Fact]
    public async Task MultipleHeartbeats_UpdateTimestampSequentially()
    {
        // Arrange
        var taskManagerId = "tm-heartbeat-4";
        var numberOfSlots = 4;

        await _resourceManager.RegisterTaskManagerAsync(taskManagerId, numberOfSlots);

        // Act & Assert
        DateTime? heartbeat1 = _resourceManager.GetLastHeartbeat(taskManagerId);
        heartbeat1.Should().NotBeNull();

        await Task.Delay(10);
        await _resourceManager.RecordHeartbeatAsync(taskManagerId);
        DateTime? heartbeat2 = _resourceManager.GetLastHeartbeat(taskManagerId);
        heartbeat2.Should().NotBeNull();
        heartbeat2.Should().BeAfter(heartbeat1.Value);

        await Task.Delay(10);
        await _resourceManager.RecordHeartbeatAsync(taskManagerId);
        DateTime? heartbeat3 = _resourceManager.GetLastHeartbeat(taskManagerId);
        heartbeat3.Should().NotBeNull();
        heartbeat3.Should().BeAfter(heartbeat2.Value);
    }

    [Fact]
    public async Task ConcurrentHeartbeats_AreThreadSafe()
    {
        // Arrange
        var taskManagerId = "tm-concurrent";
        var numberOfSlots = 4;

        await _resourceManager.RegisterTaskManagerAsync(taskManagerId, numberOfSlots);

        // Act - Send concurrent heartbeats.
        // ToArray starts every task up front; a deferred LINQ sequence would only start
        // them as Task.WhenAll enumerates it.
        var tasks = Enumerable.Range(0, 10)
            .Select(_ => Task.Run(async () => await _resourceManager.RecordHeartbeatAsync(taskManagerId)))
            .ToArray();

        await Task.WhenAll(tasks);

        // Assert - Should not throw and should have a valid timestamp
        DateTime? heartbeat = _resourceManager.GetLastHeartbeat(taskManagerId);
        heartbeat.Should().NotBeNull();
    }

    [Fact]
    public void SynchronousRegisterTaskManager_InitializesLastHeartbeat()
    {
        // Arrange
        var taskManagerId = "tm-sync-heartbeat";
        var numberOfSlots = 4;

        // Act
        _resourceManager.RegisterTaskManager(taskManagerId, numberOfSlots);

        // Assert
        DateTime? heartbeat = _resourceManager.GetLastHeartbeat(taskManagerId);
        heartbeat.Should().NotBeNull();
        heartbeat.Should().BeCloseTo(DateTime.UtcNow, TimeSpan.FromSeconds(5));
    }
}

FlinkDotNet/FlinkDotNet.JobManager.Tests/JobsControllerTests.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// Licensed under the Apache License, Version 2.0.
33
// See LICENSE file in the project root for full license information.
44

5+
#nullable enable
6+
57
using FlinkDotNet.JobManager.Controllers;
68
using FlinkDotNet.JobManager.Interfaces;
79
using FlinkDotNet.JobManager.Models;
@@ -127,7 +129,7 @@ public async Task GetJobStatus_WithNonExistentJob_ReturnsNotFound()
127129

128130
_mockDispatcher
129131
.Setup(d => d.GetJobStatusAsync(jobId, It.IsAny<CancellationToken>()))
130-
.ReturnsAsync((JobStatus?) null);
132+
.ReturnsAsync((JobStatus?)null);
131133

132134
// Act
133135
var result = await _controller.GetJobStatus(jobId);

FlinkDotNet/FlinkDotNet.JobManager/Controllers/ClusterController.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,29 @@ public IActionResult UnregisterTaskManager(string taskManagerId)
154154
message = $"TaskManager {taskManagerId} unregistered successfully"
155155
});
156156
}
157+
158+
/// <summary>
/// Record heartbeat from a TaskManager.
/// TaskManagers should call this endpoint periodically to indicate they are alive.
/// </summary>
/// <param name="taskManagerId">ID of the TaskManager sending the heartbeat.</param>
/// <returns>
/// 200 with a heartbeat acknowledgement, or 404 when no heartbeat is on record for
/// <paramref name="taskManagerId"/> after the attempt (i.e. the TaskManager is not registered).
/// </returns>
[HttpPost("taskmanagers/{taskManagerId}/heartbeat")]
[ProducesResponseType(typeof(object), StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status404NotFound)]
public async Task<IActionResult> RecordHeartbeat(string taskManagerId)
{
    this._logger.LogDebug("Received heartbeat from TaskManager: {TaskManagerId}", taskManagerId);

    await this._resourceManager.RecordHeartbeatAsync(taskManagerId);

    // Check after recording rather than before: a TaskManager that was never registered
    // (or was removed in the meantime) still has no heartbeat on record, and the caller
    // must learn that instead of receiving a misleading acknowledgement.
    if (this._resourceManager.GetLastHeartbeat(taskManagerId) is null)
    {
        return NotFound(new
        {
            message = $"TaskManager {taskManagerId} not found"
        });
    }

    return Ok(new
    {
        message = "Heartbeat recorded",
        taskManagerId,
        timestamp = DateTime.UtcNow
    });
}
157180
}
158181

159182
/// <summary>

0 commit comments

Comments
 (0)