Skip to content

Commit 86f924d

Browse files
authored
Refactor Confirmation Pipe and Heatbeat start (#141)
* Move the confirmation pipe after the Producer initialization to avoid pendig thread in case the producer init fails * Move the heartbeat start after the login to avoid pending threads in case the client fails to connect for some reason. Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 87ac5f4 commit 86f924d

File tree

4 files changed

+18
-4
lines changed

4 files changed

+18
-4
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public int Write(Span<byte> span)
8585

8686
public class Client : IClient
8787
{
88-
private bool isClosed = false;
88+
private bool isClosed = true;
8989

9090
private readonly TimeSpan defaultTimeout = TimeSpan.FromSeconds(10);
9191

@@ -165,6 +165,11 @@ private Client(ClientParameters parameters)
165165
IsClosed = false;
166166
}
167167

168+
private void StartHeartBeat()
169+
{
170+
_heartBeatHandler.Start();
171+
}
172+
168173
public delegate Task ConnectionCloseHandler(string reason);
169174

170175
public event ConnectionCloseHandler ConnectionClosed;
@@ -228,6 +233,8 @@ await client.Publish(new TuneRequest(0,
228233
client.ConnectionProperties = open.ConnectionProperties;
229234

230235
client.correlationId = 100;
236+
// start heart beat only when the client is connected
237+
client.StartHeartBeat();
231238
return client;
232239
}
233240

RabbitMQ.Stream.Client/HeartBeatHandler.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public HeartBeatHandler(Func<ValueTask<bool>> sendHeartbeatFunc,
3030
// }
3131
if (heartbeat > 0)
3232
{
33-
_timer.Enabled = true;
33+
_timer.Enabled = false;
3434
_timer.Interval = heartbeat * 1000;
3535
_timer.Elapsed += (_, _) =>
3636
{
@@ -72,6 +72,11 @@ internal void Close()
7272
_timer.Close();
7373
}
7474

75+
internal void Start()
76+
{
77+
_timer.Enabled = true;
78+
}
79+
7580
internal bool IsActive()
7681
{
7782
return _timer.Enabled;

RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ private ReliableProducer(ReliableProducerConfig reliableProducerConfig)
6161
reliableProducerConfig.ConfirmationHandler,
6262
reliableProducerConfig.TimeoutMessageAfter
6363
);
64-
_confirmationPipe.Start();
6564
}
6665

6766
public static async Task<ReliableProducer> CreateReliableProducer(ReliableProducerConfig reliableProducerConfig)
@@ -114,6 +113,9 @@ protected override async Task GetNewReliable(bool boot)
114113
// Init the publishing id
115114
Interlocked.Exchange(ref _publishingId,
116115
await _producer.GetLastPublishingId());
116+
117+
// confirmation Pipe can start only if the producer is ready
118+
_confirmationPipe.Start();
117119
}
118120
}
119121
catch (Exception e)

Tests/UnitTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ public void HeartBeatRaiseClose()
322322
return null;
323323
},
324324
1);
325-
325+
hBeatHandler.Start();
326326
var r = testPassed.Task.Wait(6_000);
327327
Assert.True(r);
328328
Assert.False(hBeatHandler.IsActive());

0 commit comments

Comments
 (0)