Skip to content

Conversation

@akarnokd
Copy link
Collaborator

This PR reimplements the Timeout.Relative operator with a lock-free set of methods. The atomic index change by one ensures that the main source won; when the index atomically changes to long.MaxValue the timeout won. Emissions after these transitions are guaranteed to be thread-safe thus no lock is needed.

The PR also inlined the two SingleAssignmentDisposables and the StableCompositeDisposable return. However, to avoid a cyclic Dispose due to the cancel input saved in Sink, I have exposed just the clearing of the downstream observable and otherwise the Dispose(bool) disposes the inlined IDisposables.

If this style is acceptable, the other variants will be updated to lock-free as well in a separate PR.

@danielcweber
Copy link
Collaborator

Can we first have these Disposable-Helper-methods with SetSingle, SetSerial, SetMultiple, GetIsDisposed, Dispose taking those ref-parameters? I am currently working on this, it will reduce a lot of repetitive code, especially those volatile fields everywhere and lots of Interlocked stuff. This will be a lot cleaner afterwards and more intention revealing wrt. the disposable fields.

@akarnokd
Copy link
Collaborator Author

Yes, I was planning on doing that. We can have just the helper merged, then a separate PR can go over all classes and change to them.

@danielcweber
Copy link
Collaborator

Ok, I am working on the helper currently.

@danielcweber
Copy link
Collaborator

Helper was merged.

public override void OnNext(TSource value)
{
var idx = Volatile.Read(ref _index);
if (idx != long.MaxValue && Interlocked.CompareExchange(ref _index, idx + 1, idx) == idx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How many messages would it have to produce per second for how many years to actually break this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you could produce 1 message per nanosecond, this would overflow in 500+ years.

Interlocked.Exchange(ref _cancel, null)?.Dispose();
}

protected void ClearObserver()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why divide this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because _cancel holds a SingleAssignmentDisposable which holds this in itself and would call Dispose again on the implementing instance. In fact, any newer Run implementation returning this is prone to this wasteful call.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an issue I have been trying to wrap my head around for some time now. Can we defer this change for later, going with the wasteful call for now, because it would add more complexity to the Sink and I am thinking around ways to lower complexity of Sink.

@danielcweber danielcweber merged commit 795a430 into dotnet:master Jun 1, 2018
@akarnokd akarnokd deleted the LockFreeTimeoutRelative branch June 26, 2018 21:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants