Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ private CurrentThreadScheduler()
[ThreadStatic]
private static IStopwatch s_clock;

[ThreadStatic]
private static bool running;

private static SchedulerQueue<TimeSpan> GetQueue() => s_threadLocalQueue;

private static void SetQueue(SchedulerQueue<TimeSpan> newQueue)
Expand Down Expand Up @@ -61,7 +64,79 @@ private static TimeSpan Time
/// Gets a value that indicates whether the caller must call a Schedule method.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Advanced)]
public static bool IsScheduleRequired => GetQueue() == null;
public static bool IsScheduleRequired => !running;

/// <summary>
/// Schedules an action to be executed immediately.
/// </summary>
/// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
/// <param name="state">State passed to the action to be executed.</param>
/// <param name="action">Action to be executed.</param>
/// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you pack it into the timed overload for the dueTime == 0 case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe. I'm trying to avoid the time arithmetic overhead.

{
if (action == null)
throw new ArgumentNullException(nameof(action));

var queue = default(SchedulerQueue<TimeSpan>);

// There is no timed task and no task is currently running
if (!running)
{
running = true;

// execute directly without queueing
IDisposable d;
try
{
d = action(this, state);
}
catch
{
SetQueue(null);
running = false;
throw;
}

// did recursive tasks arrive?
queue = GetQueue();

// yes, run those in the queue as well
if (queue != null)
{
try
{
Trampoline.Run(queue);
}
finally
{
SetQueue(null);
running = false;
}
}
else
{
running = false;
}

return d;
}

queue = GetQueue();

// if there is a task running or there is a queue
if (queue == null)
{
queue = new SchedulerQueue<TimeSpan>(4);
SetQueue(queue);
}

// queue up more work
var si = new ScheduledItem<TimeSpan, TState>(this, state, action, Time);
queue.Enqueue(si);
return si;
}

/// <summary>
/// Schedules an action to be executed after dueTime.
Expand All @@ -86,22 +161,23 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
if (queue == null)
{
queue = new SchedulerQueue<TimeSpan>(4);
queue.Enqueue(si);

SetQueue(queue);
}
queue.Enqueue(si);

if (!running)
{
running = true;
try
{
Trampoline.Run(queue);
}
finally
{
SetQueue(null);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the queue is sufficiently small (maybe < 16 or so), we could think about just clearing the queue instead of recreating one next time.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if the current capacity is actually available on this type of queue

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, it doesn't work. There is no current capacity to ScheduledQueue and the PriorityQueue shrinks itself when the number of items is 25% of its internal capacity upon remove/dequeue.

running = false;
}
}
else
{
queue.Enqueue(si);
}

return si;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ namespace System.Reactive.Concurrency
[System.ObsoleteAttribute("This instance property is no longer supported. Use CurrentThreadScheduler.IsSched" +
"uleRequired instead.")]
public bool ScheduleRequired { get; }
public override System.IDisposable Schedule<TState>(TState state, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action) { }
public override System.IDisposable Schedule<TState>(TState state, System.TimeSpan dueTime, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action) { }
}
public sealed class DefaultScheduler : System.Reactive.Concurrency.LocalScheduler, System.Reactive.Concurrency.ISchedulerPeriodic
Expand Down