Skip to content

Commit fcd197c

Browse files
authored
Support bounded channel with bound of 0 (rendezvous) (#116097)
* Support bounded channel with bound of 0 (rendezvous) This PR enables Channel.CreateBounded(0), whereas currently a bound of < 1 is exceptional. A bound is the number of items the channel can buffer, so a bound of 0 means it can't buffer anything, which makes it into a rendezvous, where the reader and writer must be at the channel at the same time in order to directly hand off from the writer to the reader. This is the same meaning as in other languages/libraries, e.g. if in go you don't specify a bound or you specify a bound of 0, you similarly get an unbuffered rendezvous channel. * Address PR feedback
1 parent 5edf06e commit fcd197c

15 files changed

+1046
-161
lines changed

src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ System.Threading.Channel&lt;T&gt;</PackageDescription>
2525
<Compile Include="System\Threading\Channels\Channel_1.cs" />
2626
<Compile Include="System\Threading\Channels\Channel_2.cs" />
2727
<Compile Include="System\Threading\Channels\IDebugEnumerator.cs" />
28+
<Compile Include="System\Threading\Channels\RendezvousChannel.cs" />
2829
<Compile Include="System\Threading\Channels\SingleConsumerUnboundedChannel.cs" />
2930
<Compile Include="System\Threading\Channels\UnboundedChannel.cs" />
3031
<Compile Include="$(CommonPath)Internal\Padding.cs" Link="Common\Internal\Padding.cs" />

src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannel.cs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -388,13 +388,7 @@ public override bool TryWrite(T item)
388388
// There are no items in the channel, which means we may have blocked/waiting readers.
389389

390390
// Try to get a blocked reader that we can transfer the item to.
391-
while (ChannelUtilities.TryDequeue(ref parent._blockedReadersHead, out blockedReader))
392-
{
393-
if (blockedReader.TryReserveCompletionIfCancelable())
394-
{
395-
break;
396-
}
397-
}
391+
blockedReader = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedReadersHead);
398392

399393
// If we weren't able to get a reader, instead queue the item and get any waiters that need to be notified.
400394
if (blockedReader is null)
@@ -551,13 +545,7 @@ public override ValueTask WriteAsync(T item, CancellationToken cancellationToken
551545
// There are no items in the channel, which means we may have blocked/waiting readers.
552546

553547
// Try to get a blocked reader that we can transfer the item to.
554-
while (ChannelUtilities.TryDequeue(ref parent._blockedReadersHead, out blockedReader))
555-
{
556-
if (blockedReader.TryReserveCompletionIfCancelable())
557-
{
558-
break;
559-
}
560-
}
548+
blockedReader = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedReadersHead);
561549

562550
// If we weren't able to get a reader, instead queue the item and get any waiters that need to be notified.
563551
if (blockedReader is null)

src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public static Channel<T> CreateUnbounded<T>() =>
1515
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
1616
/// <param name="options">Options that guide the behavior of the channel.</param>
1717
/// <returns>The created channel.</returns>
18+
/// <exception cref="ArgumentNullException"><paramref name="options"/> is <see langword="null"/>.</exception>
1819
public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options)
1920
{
2021
ArgumentNullException.ThrowIfNull(options);
@@ -35,35 +36,33 @@ public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options)
3536
/// Channels created with this method apply the <see cref="BoundedChannelFullMode.Wait"/>
3637
/// behavior and prohibit continuations from running synchronously.
3738
/// </remarks>
38-
public static Channel<T> CreateBounded<T>(int capacity)
39-
{
40-
if (capacity < 1)
41-
{
42-
throw new ArgumentOutOfRangeException(nameof(capacity));
43-
}
44-
45-
return new BoundedChannel<T>(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true, itemDropped: null);
46-
}
39+
/// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
40+
public static Channel<T> CreateBounded<T>(int capacity) =>
41+
capacity > 0 ? new BoundedChannel<T>(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true, itemDropped: null) :
42+
capacity == 0 ? new RendezvousChannel<T>(BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true, itemDropped: null) :
43+
throw new ArgumentOutOfRangeException(nameof(capacity));
4744

4845
/// <summary>Creates a channel subject to the provided options.</summary>
4946
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
5047
/// <param name="options">Options that guide the behavior of the channel.</param>
5148
/// <returns>The created channel.</returns>
52-
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options)
53-
{
54-
return CreateBounded<T>(options, itemDropped: null);
55-
}
49+
/// <exception cref="ArgumentNullException"><paramref name="options"/> is <see langword="null"/>.</exception>
50+
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options) =>
51+
CreateBounded<T>(options, itemDropped: null);
5652

5753
/// <summary>Creates a channel subject to the provided options.</summary>
5854
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
5955
/// <param name="options">Options that guide the behavior of the channel.</param>
6056
/// <param name="itemDropped">Delegate that will be called when item is being dropped from channel. See <see cref="BoundedChannelFullMode"/>.</param>
6157
/// <returns>The created channel.</returns>
58+
/// <exception cref="ArgumentNullException"><paramref name="options"/> is <see langword="null"/>.</exception>
6259
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options, Action<T>? itemDropped)
6360
{
6461
ArgumentNullException.ThrowIfNull(options);
6562

66-
return new BoundedChannel<T>(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations, itemDropped);
63+
return
64+
options.Capacity > 0 ? new BoundedChannel<T>(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations, itemDropped) :
65+
new RendezvousChannel<T>(options.FullMode, !options.AllowSynchronousContinuations, itemDropped);
6766
}
6867
}
6968
}

src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,10 @@ public sealed class BoundedChannelOptions : ChannelOptions
4949

5050
/// <summary>Initializes the options.</summary>
5151
/// <param name="capacity">The maximum number of items the bounded channel may store.</param>
52+
/// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
5253
public BoundedChannelOptions(int capacity)
5354
{
54-
if (capacity < 1)
55+
if (capacity < 0)
5556
{
5657
throw new ArgumentOutOfRangeException(nameof(capacity));
5758
}
@@ -60,12 +61,13 @@ public BoundedChannelOptions(int capacity)
6061
}
6162

6263
/// <summary>Gets or sets the maximum number of items the bounded channel may store.</summary>
64+
/// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is negative.</exception>
6365
public int Capacity
6466
{
6567
get => _capacity;
6668
set
6769
{
68-
if (value < 1)
70+
if (value < 0)
6971
{
7072
throw new ArgumentOutOfRangeException(nameof(value));
7173
}
@@ -74,6 +76,7 @@ public int Capacity
7476
}
7577

7678
/// <summary>Gets or sets the behavior incurred by write operations when the channel is full.</summary>
79+
/// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is an invalid enum value.</exception>
7780
public BoundedChannelFullMode FullMode
7881
{
7982
get => _mode;

src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelUtilities.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,23 @@ internal static ValueTask<T> GetInvalidCompletionValueTask<T>(Exception error)
6666
return new ValueTask<T>(t);
6767
}
6868

69+
/// <summary>Dequeues from <paramref name="head"/> until an element is dequeued that can have completion reserved.</summary>
70+
/// <param name="head">The head of the list, with items dequeued up through the returned element, or entirely if <see langword="null"/> is returned.</param>
71+
/// <returns>The operation on which completion has been reserved, or null if none can be found.</returns>
72+
internal static TAsyncOp? TryDequeueAndReserveCompletionIfCancelable<TAsyncOp>(ref TAsyncOp? head)
73+
where TAsyncOp : AsyncOperation<TAsyncOp>
74+
{
75+
while (ChannelUtilities.TryDequeue(ref head, out var op))
76+
{
77+
if (op.TryReserveCompletionIfCancelable())
78+
{
79+
return op;
80+
}
81+
}
82+
83+
return null;
84+
}
85+
6986
/// <summary>Dequeues an operation from the circular doubly-linked list referenced by <paramref name="head"/>.</summary>
7087
/// <param name="head">The head of the list.</param>
7188
/// <param name="op">The dequeued operation.</param>
@@ -317,6 +334,29 @@ internal static void AssertAll<TAsyncOp>(TAsyncOp? head, Func<TAsyncOp, bool> co
317334
}
318335
}
319336

337+
/// <summary>Counts the number of operations in the list.</summary>
338+
/// <param name="head">The head of the queue of operations to count.</param>
339+
internal static long CountOperations<TAsyncOp>(TAsyncOp? head)
340+
where TAsyncOp : AsyncOperation<TAsyncOp>
341+
{
342+
TAsyncOp? current = head;
343+
long count = 0;
344+
345+
if (current is not null)
346+
{
347+
do
348+
{
349+
count++;
350+
351+
Debug.Assert(current is not null);
352+
current = current.Next;
353+
}
354+
while (current != head);
355+
}
356+
357+
return count;
358+
}
359+
320360
/// <summary>Creates and returns an exception object to indicate that a channel has been closed.</summary>
321361
internal static Exception CreateInvalidCompletionException(Exception? inner = null) =>
322362
inner is OperationCanceledException ? inner :

0 commit comments

Comments
 (0)