Skip to content

Commit 320ee41

Browse files
authored
Merge pull request #83 from UiPath/pool_tcs
Pool TaskCompletionSource-s
2 parents 3a3b729 + 0254f24 commit 320ee41

File tree

7 files changed

+74
-32
lines changed

7 files changed

+74
-32
lines changed

src/UiPath.CoreIpc.Tests/SystemTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ public virtual async Task UploadNoRead()
175175
(await _systemClient.UploadNoRead(new MemoryStream(Encoding.UTF8.GetBytes("Hello world")))).ShouldBeEmpty();
176176
}
177177
catch (IOException) { }
178+
catch (ObjectDisposedException) { }
178179
await Guid();
179180
}
180181

src/UiPath.CoreIpc.Tests/WebSocketTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ protected override ServiceHostBuilder Configure(ServiceHostBuilder serviceHostBu
1111
}
1212
public override void Dispose()
1313
{
14-
_listener?.Dispose();
1514
base.Dispose();
15+
_listener?.Dispose();
1616
}
1717
protected override WebSocketClientBuilder<ISystemService> CreateSystemClientBuilder() => new(new("ws"+GetEndPoint()));
1818
[Fact]
@@ -43,7 +43,7 @@ protected override ServiceHostBuilder Configure(ServiceHostBuilder serviceHostBu
4343
}
4444
public override void Dispose()
4545
{
46-
_listener?.Dispose();
4746
base.Dispose();
47+
_listener?.Dispose();
4848
}
4949
}

src/UiPath.CoreIpc/CancellationTokenSourcePool.cs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,6 @@ public static PooledCancellationTokenSource Rent()
1818
}
1919
private static bool Return(PooledCancellationTokenSource cts)
2020
{
21-
if (!cts.TryReset())
22-
{
23-
return false;
24-
}
2521
if (Interlocked.Increment(ref Count) > MaxQueueSize)
2622
{
2723
Interlocked.Decrement(ref Count);
@@ -32,16 +28,15 @@ private static bool Return(PooledCancellationTokenSource cts)
3228
}
3329
public sealed class PooledCancellationTokenSource : CancellationTokenSource
3430
{
35-
protected override void Dispose(bool disposing)
31+
public void Return()
3632
{
3733
// If we failed to return to the pool then dispose
38-
if (disposing && !Return(this))
34+
#if !NET461
35+
if (!TryReset() || !CancellationTokenSourcePool.Return(this))
36+
#endif
3937
{
40-
base.Dispose(disposing);
38+
Dispose();
4139
}
4240
}
43-
#if NET461
44-
public bool TryReset() => false;
45-
#endif
4641
}
4742
}

src/UiPath.CoreIpc/Connection.cs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
namespace UiPath.CoreIpc;
2-
3-
using RequestCompletionSource = TaskCompletionSource<Response>;
2+
using static TaskCompletionPool<Response>;
43
using static IOHelpers;
54
public sealed class Connection : IDisposable
65
{
7-
private readonly ConcurrentDictionary<string, RequestCompletionSource> _requests = new();
6+
private static readonly IOException ClosedException = new("Connection closed.");
7+
private readonly ConcurrentDictionary<string, ManualResetValueTaskSource> _requests = new();
88
private long _requestCounter = -1;
99
private readonly int _maxMessageSize;
1010
private readonly Lazy<Task> _receiveLoop;
@@ -47,19 +47,20 @@ public Connection(Stream network, ISerializer serializer, ILogger logger, string
4747
#endif
4848
internal async ValueTask<Response> RemoteCall(Request request, CancellationToken token)
4949
{
50-
var requestCompletion = new RequestCompletionSource();
50+
var requestCompletion = Rent();
5151
_requests[request.Id] = requestCompletion;
5252
CancellationTokenRegistration tokenRegistration = default;
5353
try
5454
{
5555
await Send(request, token);
5656
tokenRegistration = token.UnsafeRegister(request.UploadStream == null ? _cancelRequest : _cancelUploadRequest, request.Id);
57-
return await requestCompletion.Task;
57+
return await requestCompletion.ValueTask();
5858
}
5959
finally
6060
{
61-
tokenRegistration.Dispose();
6261
_requests.TryRemove(request.Id, out _);
62+
tokenRegistration.Dispose();
63+
requestCompletion.Return();
6364
}
6465
}
6566
internal ValueTask Send(Request request, CancellationToken token)
@@ -85,9 +86,9 @@ void CancelUploadRequest(string requestId)
8586
}
8687
private void TryCancelRequest(string requestId)
8788
{
88-
if (_requests.TryGetValue(requestId, out var requestCompletion))
89+
if (_requests.TryRemove(requestId, out var requestCompletion))
8990
{
90-
requestCompletion.TrySetCanceled();
91+
requestCompletion.SetCanceled();
9192
}
9293
}
9394
internal ValueTask Send(Response response, CancellationToken cancellationToken)
@@ -165,9 +166,12 @@ public void Dispose()
165166
}
166167
private void CompleteRequests()
167168
{
168-
foreach (var completionSource in _requests.Values)
169+
foreach (var requestId in _requests.Keys)
169170
{
170-
completionSource.TrySetException(new IOException("Connection closed."));
171+
if (_requests.TryRemove(requestId, out var requestCompletion))
172+
{
173+
requestCompletion.SetException(ClosedException);
174+
}
171175
}
172176
}
173177
#if !NET461
@@ -287,7 +291,7 @@ private async Task EnterStreamMode()
287291
{
288292
if (!await ReadBuffer(sizeof(long)))
289293
{
290-
throw new IOException("Connection closed.");
294+
throw ClosedException;
291295
}
292296
var userStreamLength = BitConverter.ToInt64(_buffer, startIndex: 0);
293297
_nestedStream.Reset(userStreamLength);
@@ -340,9 +344,9 @@ private void OnResponseReceived(Response response)
340344
{
341345
Log($"Received response for request {response.RequestId} {Name}.");
342346
}
343-
if (_requests.TryGetValue(response.RequestId, out var completionSource))
347+
if (_requests.TryRemove(response.RequestId, out var completionSource))
344348
{
345-
completionSource.TrySetResult(response);
349+
completionSource.SetResult(response);
346350
}
347351
}
348352
catch (Exception ex)

src/UiPath.CoreIpc/Helpers.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ public void ThrowTimeout(Exception exception, string message)
291291
public void Dispose()
292292
{
293293
_linkedRegistration.Dispose();
294-
_timeoutCancellationSource.Dispose();
294+
_timeoutCancellationSource.Return();
295295
}
296296
public CancellationToken Token => _timeoutCancellationSource.Token;
297297
}

src/UiPath.CoreIpc/Server/Server.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
using System.Linq.Expressions;
2-
32
namespace UiPath.CoreIpc;
4-
53
using GetTaskResultFunc = Func<Task, object>;
64
using MethodExecutor = Func<object, object[], CancellationToken, Task>;
75
using static Expression;
6+
using static CancellationTokenSourcePool;
87
class Server
98
{
109
private static readonly MethodInfo GetResultMethod = typeof(Server).GetStaticMethod(nameof(GetTaskResultImpl));
1110
private static readonly ConcurrentDictionary<(Type,string), Method> Methods = new();
1211
private static readonly ConcurrentDictionary<Type, GetTaskResultFunc> GetTaskResultByType = new();
1312
private readonly Connection _connection;
1413
private readonly IClient _client;
15-
private readonly ConcurrentDictionary<string, CancellationTokenSource> _requests = new();
14+
private readonly ConcurrentDictionary<string, PooledCancellationTokenSource> _requests = new();
1615
public Server(ListenerSettings settings, Connection connection, IClient client = null)
1716
{
1817
Settings = settings;
@@ -44,7 +43,7 @@ void CancelRequest(string requestId)
4443
if (_requests.TryRemove(requestId, out var cancellation))
4544
{
4645
cancellation.Cancel();
47-
cancellation.Dispose();
46+
cancellation.Return();
4847
}
4948
}
5049
#if !NET461
@@ -70,7 +69,7 @@ async ValueTask OnRequestReceived(Request request)
7069
return;
7170
}
7271
Response response = null;
73-
var requestCancellation = CancellationTokenSourcePool.Rent();
72+
var requestCancellation = Rent();
7473
_requests[request.Id] = requestCancellation;
7574
var timeout = request.GetTimeout(Settings.RequestTimeout);
7675
var timeoutHelper = new TimeoutHelper(timeout, requestCancellation.Token);
@@ -99,7 +98,7 @@ async ValueTask OnRequestReceived(Request request)
9998
}
10099
if (_requests.TryRemove(request.Id, out var cancellation))
101100
{
102-
cancellation.Dispose();
101+
cancellation.Return();
103102
}
104103
}
105104
ValueTask OnError(Request request, Exception ex)
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System.Threading.Tasks.Sources;
2+
namespace UiPath.CoreIpc;
3+
internal static class TaskCompletionPool<T>
4+
{
5+
private const int MaxQueueSize = 1024;
6+
private static readonly ConcurrentQueue<ManualResetValueTaskSource> Cache = new();
7+
private static int Count;
8+
public static ManualResetValueTaskSource Rent()
9+
{
10+
if (Cache.TryDequeue(out var source))
11+
{
12+
Interlocked.Decrement(ref Count);
13+
return source;
14+
}
15+
return new();
16+
}
17+
private static void Return(ManualResetValueTaskSource source)
18+
{
19+
if (Interlocked.Increment(ref Count) > MaxQueueSize)
20+
{
21+
Interlocked.Decrement(ref Count);
22+
return;
23+
}
24+
source.Reset();
25+
Cache.Enqueue(source);
26+
}
27+
public sealed class ManualResetValueTaskSource : IValueTaskSource<T>, IValueTaskSource
28+
{
29+
private ManualResetValueTaskSourceCore<T> _core; // mutable struct; do not make this readonly
30+
public bool RunContinuationsAsynchronously { get => _core.RunContinuationsAsynchronously; set => _core.RunContinuationsAsynchronously = value; }
31+
public short Version => _core.Version;
32+
public ValueTask<T> ValueTask() => new(this, Version);
33+
public void Reset() => _core.Reset();
34+
public void SetResult(T result) => _core.SetResult(result);
35+
public void SetException(Exception error) => _core.SetException(error);
36+
public void SetCanceled() => _core.SetException(new TaskCanceledException());
37+
public T GetResult(short token) => _core.GetResult(token);
38+
void IValueTaskSource.GetResult(short token) => _core.GetResult(token);
39+
public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);
40+
public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _core.OnCompleted(continuation, state, token, flags);
41+
public void Return() => TaskCompletionPool<T>.Return(this);
42+
}
43+
}

0 commit comments

Comments
 (0)