-
Notifications
You must be signed in to change notification settings - Fork 782
Change Ix.Async Amb() to cancel the losers, add unit tests #914
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
bartdesmet
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good improvement to Amb.
| // The incoming cancellationToken should still be able to cancel both | ||
| // | ||
|
|
||
| var bothRegistry = cancellationToken.Register(() => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CancellationTokenSource.CreateLinkedTokenSource could be used to do away with some of the Register complexity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator) | ||
| }; | ||
|
|
||
| bothRegistry.Dispose(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should both cancellation token sources be Cancel'ed here as to unblock the non-failing MoveNextAsync?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We this in the N-ary case, but not here it seems.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I'll update the code and add tests for this case.
| { | ||
| individualTokenSources[i] = new CancellationTokenSource(); | ||
| } | ||
| var allIndividualDispose = cancellationToken.Register(() => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See remark above; linked cancellation token sources would be easier. They represent an OR between their own cancellation state and the cancellation state of the CancellationToken they wrap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. This OR relation was not apparent to me from the documentation.
| { | ||
| var cleanup = new Task[n]; | ||
|
|
||
| for (var i = 0; i < n; i++) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We actually clean up in opposite order for the binary variant. We may want to reverse this loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Updating.
| { | ||
| await moveNextAsync.ConfigureAwait(false); | ||
| } | ||
| catch (TaskCanceledException) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's worth considering passing the CancellationToken down to this method and changing this exception handler using a when filter to only handle the exception if the cancellation is due to the given token.
catch (TaskCanceledException tce) when (tce.CancellationToken == token)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't make this work. Upon cancellation, the tce.CancellationToken is not equal to token and tce.CancellationToken.IsCancellationRequested is false whereas token.IsCancellationRequested is true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the problem is in Never:
var task = new TaskCompletionSource<bool>();
_registration = _token.Register(state =>
((TaskCompletionSource<bool>)state).SetCanceled(), task);
return new ValueTask<bool>(task.Task);When the TaskCompletionSource is unblocked by SetCanceled, there is no link to the _token there and the task produces an unrelated TaskCanceledException. I don't know how to get the token into that exception. Would a SetException work with a well prepared TaskCancellationException instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe with TrySetCancel(_token).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, that works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As for Never, that should be changed to carry the correct token on the exception for sure. TrySetCanceled with a token is the right thing to use there, as @quinmars pointed out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bartdesmet Okay, are you saying let this particular catch not be predicated on the token, just catch all TaskCanceledException and ignore them.
As for not losing errors, RxJava uses a globar error handler callback that can be hooked and the undeliverable or suppressed exceptions be consumed/logged that way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, giving this is AwaitMoveNextAsyncAndDispose, I think it would be fair to catch all cancellations. However...
As I was looking over the code again, I noticed something interesting, namely that we await the outcome of Task.WhenAll for all the losers from the finally block, where we may be propagating an exception from enumerating over the winner. If our awaiting of the losers throws an exception, it will supersede the original exception, which would be counterintuitive. As such, it would be better to let exceptions for losers escape and always give priority to the winner's enumeration outcome (successful or exceptional) and not bother propagating losers' exceptions from finally.
The design point I'm looking for is to be analogous to Task.WhenAny's behavior which is Amb for tasks. There, too, exceptions from losing tasks are not propagated. This said, it's a bit of a contrived analogy, given that waiting for the losers' outcomes for Task.WhenAny would amount to doing a Task.WhenAll, which would render it useless. There's no second rendez-vous with a task, unlike on async enumerators where all subsequent async operations have another chance of observing outcomes of losers. I still think though that the Task.WhenAny analogy is valid, and maybe the design point for Amb should be to be equivalent to await Task.WhenAny(enumerators.Select(e => e.MoveNextAsync().ToTask())) for the first call to MoveNextAsync, thus letting the losing tasks have their exceptions unhandled. We would also no longer block in the finally handler for any losing tasks.
Maybe the following equivalence should hold as well? Given:
static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this Task<T> t)
{
yield return await t;
}the following should hold:
static Task<bool> AssertAsync<T>(params Func<Task<T>> taskFactories)
{
var tasks = taskFactories.Select(tf => tf()).ToArray();
var taskAny = await ((Task<T>)await Task.WhenAny(tasks));
var enums = taskFactories.Select(tf => tf().ToAsyncEnumerable()).ToArray();
var enumAny = await AsyncEnumerableEx.Amb(enums).SingleAsync();
return taskAny == enumAny;
}both in terms of returned value (assuming the tasks are completing deterministically across both sides of the assert), exception propagation behavior, and timing behavior in the face of non-terminating tasks (i.e. Task.WhenAny would complete if any task terminates, while our current Amb would block indefinitely).
With such a design, it'd be easier to explain IAsyncEnumerator<T> as a multi-shot continuation equivalent of Task<T>, which effectively is how async iterators are implemented and designed as well (using a single IValueTaskSource shared instance being used for the returned ValueTask<bool> values returns from MoveNetxtAsync).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the when from catch. I don't fully understand what you mean by the WhenAll/WhenAny part. Perhaps it could be resolved in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. Will give it another thought and figure out what makes most sense to do here.
This PR changes the
Amb()implementation to cancel the individual losers when there is a winner, which should unblock sources such asNever. I also added the missing unit tests to verifyAmb.