Provide a blocking pool for all runtimes #588


Closed
carllerche opened this issue Aug 28, 2018 · 7 comments
Labels
C-enhancement Category: A PR with an enhancement or bugfix.
Comments

@carllerche
Member

carllerche commented Aug 28, 2018

Currently, tokio-threadpool provides a blocking API (#317) that enables tasks to perform operations that would block the current thread.

This API only works with the concurrent Runtime, which means that the current_thread::Runtime is unable to perform blocking operations and use tokio-fs.

There should be a blocking pool strategy that is usable from all runtimes.

This strategy should avoid unbounded buffering problems that a traditional threadpool + queue strategy would have.

At a high level, there would most likely be some new functions added to the Executor trait in tokio_executor. One could take a Box<FnOnce()> that contains a blocking operation:

fn dispatch_blocking(&self, op: Box<FnOnce()>);

The closure would be sent to a threadpool to run. This function itself would be unbounded; a Semaphore could then be used to limit concurrent blocking ops. I'm not sure what the best way to expose this would be yet... but we could start by encapsulating it in each library. For example, there would be a tokio-fs limit, and that lib would be responsible for storing the semaphore somewhere.
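The trait addition described above could be sketched as follows. All names here (`Executor::dispatch_blocking`, `ToyPool`) are hypothetical illustrations of the proposal, not a shipped tokio API; a single worker thread stands in for the blocking pool:

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

type BlockingOp = Box<dyn FnOnce() + Send>;

// Proposed trait addition (hypothetical sketch).
trait Executor {
    fn dispatch_blocking(&self, op: BlockingOp);
}

// Toy blocking pool: one worker thread draining a queue of boxed closures.
struct ToyPool {
    tx: Sender<BlockingOp>,
}

impl ToyPool {
    fn new() -> ToyPool {
        let (tx, rx) = mpsc::channel::<BlockingOp>();
        // The queue in front of this worker is unbounded, which is why the
        // issue suggests limiting concurrency with a Semaphore at a higher
        // level.
        thread::spawn(move || {
            for op in rx {
                op();
            }
        });
        ToyPool { tx }
    }
}

impl Executor for ToyPool {
    fn dispatch_blocking(&self, op: BlockingOp) {
        self.tx.send(op).expect("blocking pool worker gone");
    }
}
```

To return a value, a caller would have the closure capture the sending half of a one-shot channel, as discussed further down in this thread.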

There still needs to be a way to perform the "fast block" strategy that tokio-threadpool uses. This may need another function on Executor:

fn enter_blocking_section(&self) -> Result<(), ???>;

fn exit_blocking_section(&self);

The logic for blocking would be: first, try to enter a blocking section; if that fails, dispatch the op via dispatch_blocking.
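That two-step logic could look like the following sketch. The trait and method names mirror this issue's proposal (they are not a released API), and a toy executor is included only to make the example self-contained:

```rust
// Error indicating the executor refused in-place blocking (hypothetical).
struct BlockingRefused;

trait Executor {
    fn enter_blocking_section(&self) -> Result<(), BlockingRefused>;
    fn exit_blocking_section(&self);
    fn dispatch_blocking(&self, op: Box<dyn FnOnce() + Send>);
}

// First try the "fast block" in place; if the executor refuses, hand the
// op to the blocking pool instead.
fn block<E: Executor>(executor: &E, op: Box<dyn FnOnce() + Send>) {
    match executor.enter_blocking_section() {
        Ok(()) => {
            op();
            executor.exit_blocking_section();
        }
        Err(BlockingRefused) => executor.dispatch_blocking(op),
    }
}

// Toy executor that always permits in-place blocking, for illustration.
struct Inline;

impl Executor for Inline {
    fn enter_blocking_section(&self) -> Result<(), BlockingRefused> {
        Ok(())
    }
    fn exit_blocking_section(&self) {}
    fn dispatch_blocking(&self, op: Box<dyn FnOnce() + Send>) {
        op();
    }
}
```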

Rel: #432

@dekellum
Contributor

dekellum commented Aug 8, 2019

Some observations on dispatch_blocking (for the current thread runtime):

There needs to be some way to retrieve a return value for the blocking operation closure when it completes. For example in tokio-fs, the async fn's need to return things like the number of bytes actually written or read, etc. The current task should also be woken up on that completion, so it seems to me that dispatch_blocking should return a light-weight future type. Here is a prototyped adjusted signature:

/// Dispatch a blocking operation in the closure to a non-reactor thread, and
/// return a future representing its return value.
pub fn dispatch_blocking<T>(f: Box<dyn FnOnce() -> T + Send>)
    -> DispatchBlocking<T>
    where T: Send + 'static;

In an async fn, one would typically await the returned DispatchBlocking future. In a manual Future implementation (or Stream or Sink), the DispatchBlocking future would be stored internally and polled when the outer implementation is polled, until completion. So per implementation instance, I think calls to dispatch_blocking are practically bounded to 1 at a time. Said another way, if for example you are implementing async write operations (or a Sink), then you can't just call dispatch_blocking in an unbounded fashion without waiting, because:

  • this is unbounded buffering
  • if a thread pool is servicing these operations from a shared queue, then writes could take place out of order.
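Under the assumptions of the signature above, a `DispatchBlocking<T>` future could be sketched with std threads and a channel. The waker handling here is deliberately simplified: a real implementation would register the task's waker instead of busy-polling, and the `block_on` helper exists only to make the sketch self-contained:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{self, Receiver};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::thread;

// Hypothetical DispatchBlocking<T>: resolves once the blocking closure,
// run on a separate thread, sends back its return value.
struct DispatchBlocking<T> {
    rx: Receiver<T>,
}

// Sketch of the proposed free function: run the closure off-thread and
// return a future representing its result.
fn dispatch_blocking<T: Send + 'static>(
    f: Box<dyn FnOnce() -> T + Send>,
) -> DispatchBlocking<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(f());
    });
    DispatchBlocking { rx }
}

impl<T> Future for DispatchBlocking<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
        // A real implementation would store and wake `_cx.waker()`; this
        // sketch just checks the channel.
        match self.rx.try_recv() {
            Ok(v) => Poll::Ready(v),
            Err(_) => Poll::Pending,
        }
    }
}

// Minimal busy-polling executor, only so the example runs on its own.
fn block_on<T>(mut fut: DispatchBlocking<T>) -> T {
    fn raw_clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn raw_noop(_: *const ()) {}
    static VTABLE: RawWakerVTable =
        RawWakerVTable::new(raw_clone, raw_noop, raw_noop, raw_noop);
    let waker =
        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) };
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = Pin::new(&mut fut).poll(&mut cx) {
            return v;
        }
        thread::yield_now();
    }
}
```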

Thus, while I do think the Semaphore concept is useful for the concurrent (tokio-threadpool) runtime, I am not sure what role it should play with dispatch_blocking. Wouldn't just configuring a maximum number of blocking threads for a pool associated with each (or all) current-thread runtime(s) be sufficient?

I have some other thoughts on interface with Semaphore and enter/exit_blocking_section with the concurrent (tokio-threadpool) runtime, which I will share later.

@carllerche
Member Author

@dekellum The dispatch_blocking fn needs to be trait object compatible, so it cannot have a generic T. My thought is that the FnOnce passed in would contain a oneshot to return the value.

re: the semaphore, my thought is just that limiting concurrency would be left to a higher level API and not be a concern of dispatch_blocking.
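For illustration, the Semaphore being discussed could behave like the minimal counting semaphore below. This sketch uses `Mutex`/`Condvar` and therefore blocks the calling thread; a real async semaphore would park tasks, not threads:

```rust
use std::sync::{Condvar, Mutex};

// Minimal counting semaphore (illustrative stand-in; a real async
// implementation would integrate with task wakeups).
struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Semaphore {
        Semaphore {
            permits: Mutex::new(n),
            cv: Condvar::new(),
        }
    }

    // Wait until a permit is available, then take it.
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap();
        }
        *p -= 1;
    }

    // Return a permit and wake one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}
```

A higher-level API would acquire a permit before calling dispatch_blocking and release it when the closure completes, bounding the number of in-flight blocking ops without dispatch_blocking itself knowing about the limit.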

@dekellum
Contributor

dekellum commented Aug 10, 2019

I have a proof of concept implementation focused on the api/safety/ergonomics, so far out of tree, here:

https://github.com/dekellum/blocking-permit/blob/master/src/lib.rs

Please take a look. To summarize briefly, this culminates, not surprisingly, in a permit_or_dispatch! macro:

permit_or_dispatch!(|| { /*.. blocking code..*/ });
permit_or_dispatch!(&semaphore, || { /*.. blocking code..*/ });

A helper macro for use in the context of an async block or function, running the same code block on the current thread if blocking_permit_future (or blocking_permit) succeeds, or Box'ed in a call to dispatch_blocking if IsReactorThread is returned.

If the first argument is a Semaphore reference, uses blocking_permit_future with that semaphore, otherwise uses blocking_permit (unlimited).


If all usage is via this macro, then by arranging things internal to the macro, the oneshot for the return value from dispatch_blocking is not so bad. However, I maintain Streams and Sinks that will need to use some form of this facility, necessitating manual implementations instead of await blocks, so the generic free-function dispatch_blocking signature proposed above is preferable, to avoid the boilerplate. Couldn't this be supported as a free function like spawn is, using a thread local to route to the appropriate executor? Alternatively, if that is too burdensome, then perhaps there is a minimal, composable macro that can be usable in the manual case as well.
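The thread-local routing suggested here could be sketched as follows. Names are hypothetical, and a plain fn pointer stands in for the executor hook that a runtime would install when entering its context:

```rust
use std::cell::RefCell;

type BlockingOp = Box<dyn FnOnce() + Send>;

thread_local! {
    // A runtime entering its context would install a dispatcher here, the
    // same way `tokio::spawn` finds the current executor.
    static DISPATCHER: RefCell<Option<fn(BlockingOp)>> = RefCell::new(None);
}

fn set_dispatcher(d: fn(BlockingOp)) {
    DISPATCHER.with(|cell| *cell.borrow_mut() = Some(d));
}

// Free-function routing: look up the thread-local dispatcher and hand it
// the boxed operation.
fn dispatch_blocking(op: BlockingOp) {
    DISPATCHER.with(|cell| match *cell.borrow() {
        Some(d) => d(op),
        None => panic!("no blocking dispatcher set for this thread"),
    });
}

// Example dispatcher that simply runs the op inline; a real one would
// forward it to a blocking pool.
fn run_inline(op: BlockingOp) {
    op();
}
```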

Any parts of this you think might work as PR(s) to Tokio? Also, I don't have a great sense of how to organize this with regard to current/future tokio crate boundaries, so could use some suggestions there.

@dekellum
Contributor

dekellum commented Aug 13, 2019

I made several improvements on my PoC master branch.

carllerche added a commit that referenced this issue Aug 26, 2019
Provides a thread pool dedicated to running blocking operations (#588)
and update `tokio-fs` to use this pool.

In an effort to make incremental progress, this is an initial step
towards a final solution. First, it provides a very basic pool
implementation with the intent that the pool will be
replaced before the final release. Second, it updates `tokio-fs` to
always use this blocking pool instead of conditionally using
`threadpool::blocking`. Issue #588 contains additional discussion around
potential improvements to the "blocking for all" strategy.

The implementation provided here builds on work started in #954 and
continued in #1045. The general idea is the same as #1045, but the PR
improves on some of the details:

* The number of explicit operations tracked by `File` is reduced only to
  the ones that could interact. All other ops are spawned on the
  blocking pool without being tracked by the `File` instance.

* The `seek` implementation is not backed by a trait and `poll_seek`
  function. This avoids the question of how to model non-blocking seeks
  on top of a blocking file. In this patch, `seek` is represented as an
  `async fn`. If the associated future is dropped before the caller
  observes the return value, we make no effort to define the state in
  which the file ends up.
carllerche added a commit that referenced this issue Aug 27, 2019
(same commit message as above)
@dekellum
Contributor

dekellum commented Sep 3, 2019

In a similar incremental approach as your #1495, @carllerche, I'd like to make progress on the "blocking for all" goal (as in all runtimes and all types of "blocking" operations):

The ThreadPool enter_ and exit_blocking_section functions can be factored out of the current blocking function, in the form:

pub fn enter_blocking_section() -> Result<(), BlockingError> {}
pub fn exit_blocking_section() -> Result<(), BlockingError> {}

…with the existing BlockingError type extended as needed. Initially it can cover the max_blocking limit exceeded case (which blocking currently relays as Poll::Pending), until that limit is eventually removed and the error case goes away. I'll submit a PR for this shortly.
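One possible refinement on the free-function pair, sketched here with stubbed implementations, is an RAII guard so that exit_blocking_section cannot be forgotten on an early return. The guard type and the stub bodies are hypothetical; real versions would consult the current thread-pool worker:

```rust
#[derive(Debug)]
struct BlockingError;

// Stubs for the proposed free functions (always succeed in this sketch).
fn enter_blocking_section() -> Result<(), BlockingError> {
    Ok(())
}
fn exit_blocking_section() -> Result<(), BlockingError> {
    Ok(())
}

// Guard: entering returns a value whose Drop exits the blocking section.
struct BlockingSection;

impl BlockingSection {
    fn enter() -> Result<BlockingSection, BlockingError> {
        enter_blocking_section()?;
        Ok(BlockingSection)
    }
}

impl Drop for BlockingSection {
    fn drop(&mut self) {
        // Best-effort exit; errors on the way out are ignored here.
        let _ = exit_blocking_section();
    }
}
```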

@carllerche
Member Author

On master.

@dekellum
Contributor

Continuation of this topic, here: http://gravitext.com/2020/01/13/blocking-permit.html

3 participants