From 0bf6700d5eea4463db13e2bf2c2313af453eb030 Mon Sep 17 00:00:00 2001 From: Lachlan Sneff Date: Sun, 23 Aug 2020 16:28:47 -0400 Subject: [PATCH 01/15] Add bevy_tasks crate to replace rayon. Move the github.com/lachlansneff/small-tasks prototype into bevy/crates/bevy_tasks. This is intended to replace rayon to provide better idling behavior. Initial commit Make scope return a vector of results Add par_chunk_map(_mut) and par_splat_map(_mut) apis onto slices Make calling thread do work until complete Remove current-thread doing work Ignore clion/intellij project files Remove dependency on futures-util Set up a builder for constructing the TaskPool with customizable thread count, stack size, and thread name. Add a couple examples to demonstrate busy and idle workloads. More detail on thread name doc comments and static build() function on TaskPool Update to README (add description, status, license) Reflect rayon's ThreadPoolBuilder api in TaskPoolBuilder Co-authored-by: Philip Degarmo --- crates/bevy_tasks/.gitignore | 3 + crates/bevy_tasks/Cargo.toml | 13 + crates/bevy_tasks/LICENSE-APACHE | 201 +++++++++++++++ crates/bevy_tasks/LICENSE-MIT | 23 ++ crates/bevy_tasks/README.md | 52 ++++ crates/bevy_tasks/examples/busy_behavior.rs | 33 +++ crates/bevy_tasks/examples/idle_behavior.rs | 31 +++ crates/bevy_tasks/src/lib.rs | 261 ++++++++++++++++++++ crates/bevy_tasks/src/slice.rs | 106 ++++++++ 9 files changed, 723 insertions(+) create mode 100644 crates/bevy_tasks/.gitignore create mode 100644 crates/bevy_tasks/Cargo.toml create mode 100644 crates/bevy_tasks/LICENSE-APACHE create mode 100644 crates/bevy_tasks/LICENSE-MIT create mode 100644 crates/bevy_tasks/README.md create mode 100644 crates/bevy_tasks/examples/busy_behavior.rs create mode 100644 crates/bevy_tasks/examples/idle_behavior.rs create mode 100644 crates/bevy_tasks/src/lib.rs create mode 100644 crates/bevy_tasks/src/slice.rs diff --git a/crates/bevy_tasks/.gitignore b/crates/bevy_tasks/.gitignore new file mode 100644 index 0000000000000..6a59f558ae40d --- /dev/null +++ b/crates/bevy_tasks/.gitignore @@ -0,0 +1,3 @@ +/target +Cargo.lock +/.idea \ No newline at end of file diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml new file mode 100644 index 0000000000000..786928a0d020f --- /dev/null +++ b/crates/bevy_tasks/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "smalltask" +version = "0.1.0" +authors = ["Lachlan Sneff "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +multitask = "0.2" +num_cpus = "1" +parking = "1" +pollster = "0.2" diff --git a/crates/bevy_tasks/LICENSE-APACHE b/crates/bevy_tasks/LICENSE-APACHE new file mode 100644 index 0000000000000..139dce95d473c --- /dev/null +++ b/crates/bevy_tasks/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. \ No newline at end of file diff --git a/crates/bevy_tasks/LICENSE-MIT b/crates/bevy_tasks/LICENSE-MIT new file mode 100644 index 0000000000000..468cd79a8f6e5 --- /dev/null +++ b/crates/bevy_tasks/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/crates/bevy_tasks/README.md b/crates/bevy_tasks/README.md new file mode 100644 index 0000000000000..5c2787957146e --- /dev/null +++ b/crates/bevy_tasks/README.md @@ -0,0 +1,52 @@ +# small-task + +This is a simple threadpool with minimal dependencies. The main usecase is a scoped fork-join, i.e. spawning tasks from +a single thread and having that thread await the completion of those tasks. This is intended specifically for +[`bevy`][bevy] as a lighter alternative to [`rayon`][rayon] for this specific usecase. There are also utilities for +generating the tasks from a slice of data. This library is intended for games and makes no attempt to ensure fairness +or ordering of spawned tasks. + +It is based on [`multitask`][multitask], a lightweight executor that allows the end user to manage their own threads. +`multitask` is based on async-task, a core piece of async-std. + +[bevy]: https://bevyengine.org +[rayon]: https://github.com/rayon-rs/rayon +[multitask]: https://github.com/stjepang/multitask + +## Dependencies + +A very small dependency list is a key feature of this library + +``` +├── multitask +│ ├── async-task +│ ├── concurrent-queue +│ │ └── cache-padded +│ └── fastrand +├── num_cpus +│ └── libc +├── parking +└── pollster +``` + +## Status + +This is an unpublished prototype intended for eventual inclusion with `bevy`. + +## License + +Licensed under either of + +* Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) +* MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally +submitted for inclusion in the work by you, as defined in the Apache-2.0 +license, shall be dual licensed as above, without any additional terms or +conditions. + +See [LICENSE-APACHE](LICENSE-APACHE) and [LICENSE-MIT](LICENSE-MIT). diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs new file mode 100644 index 0000000000000..a596b1b6ac7f7 --- /dev/null +++ b/crates/bevy_tasks/examples/busy_behavior.rs @@ -0,0 +1,33 @@ +use smalltask::TaskPoolBuilder; + +// This sample demonstrates creating a thread pool with 4 tasks and spawning 40 tasks that spin +// for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical +// cores) + +fn main() { + let pool = TaskPoolBuilder::new() + .thread_name("Busy Behavior ThreadPool".to_string()) + .num_threads(4) + .build(); + + let t0 = std::time::Instant::now(); + pool.scope(|s| { + for i in 0..40 { + s.spawn(async move { + let now = std::time::Instant::now(); + while std::time::Instant::now() - now < std::time::Duration::from_millis(100) { + // spin, simulating work being done + } + + println!( + "Thread {:?} index {} finished", + std::thread::current().id(), + i + ); + }) + } + }); + + let t1 = std::time::Instant::now(); + println!("all tasks finished in {} secs", (t1 - t0).as_secs_f32()); +} diff --git a/crates/bevy_tasks/examples/idle_behavior.rs b/crates/bevy_tasks/examples/idle_behavior.rs new file mode 100644 index 0000000000000..867fe5cfd6ff3 --- /dev/null +++ b/crates/bevy_tasks/examples/idle_behavior.rs @@ -0,0 +1,31 @@ +use smalltask::TaskPoolBuilder; + +// This sample demonstrates a thread pool with one thread per logical core and only one task +// spinning. Other than the one thread, the system should remain idle, demonstrating good behavior +// for small workloads. + +fn main() { + let pool = TaskPoolBuilder::new() + .thread_name("Idle Behavior ThreadPool".to_string()) + .build(); + + pool.scope(|s| { + for i in 0..1 { + s.spawn(async move { + println!("Blocking for 10 seconds"); + let now = std::time::Instant::now(); + while std::time::Instant::now() - now < std::time::Duration::from_millis(10000) { + // spin, simulating work being done + } + + println!( + "Thread {:?} index {} finished", + std::thread::current().id(), + i + ); + }) + } + }); + + println!("all tasks finished"); +} diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs new file mode 100644 index 0000000000000..a83fd32537ae0 --- /dev/null +++ b/crates/bevy_tasks/src/lib.rs @@ -0,0 +1,261 @@ +use multitask::{Executor, Task}; +use parking::Unparker; +use std::{ + fmt::{self, Debug}, + future::Future, + mem, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, JoinHandle}, +}; + +mod slice; + +macro_rules! pin_mut { + ($($x:ident),*) => { $( + // Move the value to ensure that it is owned + let mut $x = $x; + // Shadow the original binding so that it can't be directly accessed + // ever again. + #[allow(unused_mut)] + let mut $x = unsafe { + Pin::new_unchecked(&mut $x) + }; + )* } +} + +/// Used to create a TaskPool +#[derive(Debug, Default, Clone)] +pub struct TaskPoolBuilder { + /// If set, we'll set up the thread pool to use at most n threads. Otherwise use + /// the logical core count of the system + num_threads: Option, + /// If set, we'll use the given stack size rather than the system default + stack_size: Option, + /// Allows customizing the name of the threads - helpful for debugging. If set, threads will + /// be named (), i.e. "MyThreadPool (2)" + thread_name: Option, +} + +impl TaskPoolBuilder { + /// Creates a new TaskPoolBuilder instance + pub fn new() -> Self { + Self::default() + } + + /// Override the number of threads created for the pool. If unset, we default to the number + /// of logical cores of the system + pub fn num_threads(mut self, num_threads: usize) -> Self { + self.num_threads = Some(num_threads); + self + } + + /// Override the stack size of the threads created for the pool + pub fn stack_size(mut self, stack_size: usize) -> Self { + self.stack_size = Some(stack_size); + self + } + + /// Override the name of the threads created for the pool. If set, threads will + /// be named (), i.e. "MyThreadPool (2)" + pub fn thread_name(mut self, thread_name: String) -> Self { + self.thread_name = Some(thread_name); + self + } + + /// Creates a new ThreadPoolBuilder based on the current options. + pub fn build(&self) -> TaskPool { + TaskPool::new_internal( + self.num_threads, + self.stack_size, + self.thread_name.as_deref(), + ) + } +} + +pub struct TaskPool { + executor: Arc, + threads: Vec<(JoinHandle<()>, Arc)>, + shutdown_flag: Arc, +} + +impl TaskPool { + // Create a `TaskPool` with the default configuration. + pub fn new() -> TaskPool { + TaskPoolBuilder::new().build() + } + + fn new_internal( + num_threads: Option, + stack_size: Option, + thread_name: Option<&str>, + ) -> Self { + let executor = Arc::new(Executor::new()); + let shutdown_flag = Arc::new(AtomicBool::new(false)); + + let num_threads = num_threads.unwrap_or_else(num_cpus::get); + + let threads = (0..num_threads) + .map(|i| { + let ex = Arc::clone(&executor); + let flag = Arc::clone(&shutdown_flag); + let (p, u) = parking::pair(); + let unparker = Arc::new(u); + let u = Arc::clone(&unparker); + // Run an executor thread. + + let thread_name = if let Some(thread_name) = thread_name { + format!("{} ({})", thread_name, i) + } else { + format!("TaskPool ({})", i) + }; + + let mut thread_builder = thread::Builder::new().name(thread_name); + + if let Some(stack_size) = stack_size { + thread_builder = thread_builder.stack_size(stack_size); + } + + let handle = thread_builder + .spawn(move || { + let ticker = ex.ticker(move || u.unpark()); + loop { + if flag.load(Ordering::Acquire) { + break; + } + + if !ticker.tick() { + p.park(); + } + } + }) + .expect("failed to spawn thread"); + + (handle, unparker) + }) + .collect(); + + Self { + executor, + threads, + shutdown_flag, + } + } + + pub fn thread_num(&self) -> usize { + self.threads.len() + } + + pub fn scope<'scope, F, T>(&self, f: F) -> Vec + where + F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, + T: Send + 'static, + { + // let ex = Arc::clone(&self.executor); + let executor: &'scope Executor = unsafe { mem::transmute(&*self.executor) }; + + let fut = async move { + let mut scope = Scope { + executor, + spawned: Vec::new(), + }; + + f(&mut scope); + + let mut results = Vec::with_capacity(scope.spawned.len()); + for task in scope.spawned { + results.push(task.await); + } + + results + }; + + pin_mut!(fut); + + // let fut: Pin<&mut (dyn Future + Send)> = fut; + let fut: Pin<&'static mut (dyn Future> + Send + 'static)> = + unsafe { mem::transmute(fut as Pin<&mut (dyn Future> + Send)>) }; + + pollster::block_on(self.executor.spawn(fut)) + } + + pub fn shutdown(self) -> Result<(), ThreadPanicked> { + let mut this = self; + this.shutdown_internal() + } + + fn shutdown_internal(&mut self) -> Result<(), ThreadPanicked> { + self.shutdown_flag.store(true, Ordering::Release); + + for (_, unparker) in &self.threads { + unparker.unpark(); + } + for (join_handle, _) in self.threads.drain(..) { + join_handle + .join() + .expect("task thread panicked while executing"); + } + Ok(()) + } +} + +impl Drop for TaskPool { + fn drop(&mut self) { + self.shutdown_internal().unwrap(); + } +} + +#[derive(Copy, Clone, Eq, PartialEq)] +pub struct ThreadPanicked(()); + +impl Debug for ThreadPanicked { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "a task thread panicked during execution") + } +} + +pub struct Scope<'scope, T> { + executor: &'scope Executor, + spawned: Vec>, +} + +impl<'scope, T: Send + 'static> Scope<'scope, T> { + pub fn spawn + 'scope + Send>(&mut self, f: Fut) { + let fut: Pin + 'scope + Send>> = Box::pin(f); + let fut: Pin + 'static + Send>> = unsafe { mem::transmute(fut) }; + + let task = self.executor.spawn(fut); + self.spawned.push(task); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + pub fn test_spawn() { + let pool = TaskPool::new(); + + let foo = Box::new(42); + + let outputs = pool.scope(|scope| { + for _ in 0..1000 { + scope.spawn(async { + if *foo != 42 { + panic!("not 42!?!?") + } else { + *foo + } + }); + } + }); + + for output in outputs { + assert_eq!(output, 42); + } + } +} diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs new file mode 100644 index 0000000000000..b0a3eb00f5305 --- /dev/null +++ b/crates/bevy_tasks/src/slice.rs @@ -0,0 +1,106 @@ +use super::TaskPool; + +pub trait ParallelSlice: AsRef<[T]> { + fn par_chunk_map(&self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec + where + F: Fn(&[T]) -> R + Send + Sync, + R: Send + 'static, + { + let slice = self.as_ref(); + let f = &f; + task_pool.scope(|scope| { + for chunk in slice.chunks(chunk_size) { + scope.spawn(async move { f(chunk) }); + } + }) + } + + fn par_splat_map(&self, task_pool: &TaskPool, max_tasks: Option, f: F) -> Vec + where + F: Fn(&[T]) -> R + Send + Sync, + R: Send + 'static, + { + let slice = self.as_ref(); + let chunk_size = std::cmp::max( + 1, + std::cmp::max( + slice.len() / task_pool.thread_num(), + slice.len() / max_tasks.unwrap_or(usize::MAX), + ), + ); + + slice.par_chunk_map(task_pool, chunk_size, f) + } +} + +impl ParallelSlice for S where S: AsRef<[T]> {} + +pub trait ParallelSliceMut: AsMut<[T]> { + fn par_chunk_map_mut(&mut self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec + where + F: Fn(&mut [T]) -> R + Send + Sync, + R: Send + 'static, + { + let slice = self.as_mut(); + let f = &f; + task_pool.scope(|scope| { + for chunk in slice.chunks_mut(chunk_size) { + scope.spawn(async move { f(chunk) }); + } + }) + } + + fn par_splat_map_mut( + &mut self, + task_pool: &TaskPool, + max_tasks: Option, + f: F, + ) -> Vec + where + F: Fn(&mut [T]) -> R + Send + Sync, + R: Send + 'static, + { + let mut slice = self.as_mut(); + let chunk_size = std::cmp::max( + 1, + std::cmp::max( + slice.len() / task_pool.thread_num(), + slice.len() / max_tasks.unwrap_or(usize::MAX), + ), + ); + + slice.par_chunk_map_mut(task_pool, chunk_size, f) + } +} + +impl ParallelSliceMut for S where S: AsMut<[T]> {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_par_chunks_map() { + let v = vec![42; 1000]; + let task_pool = TaskPool::new(); + + let outputs = v.par_splat_map(&task_pool, None, |numbers| -> i32 { numbers.iter().sum() }); + + println!("outputs: {:?}", outputs); + } + + #[test] + fn test_par_chunks_map_mut() { + let mut v = vec![42; 1000]; + let task_pool = TaskPool::new(); + + let outputs = v.par_splat_map_mut(&task_pool, None, |numbers| -> i32 { + for number in numbers.iter_mut() { + *number *= 2; + } + numbers.iter().sum() + }); + + println!("outputs: {:?}", outputs); + } +} From fe6cf0d6474c445de373e275d3d859e2d87db012 Mon Sep 17 00:00:00 2001 From: Lachlan Sneff Date: Sun, 23 Aug 2020 17:22:29 -0400 Subject: [PATCH 02/15] More tuning of bevy_tasks api --- crates/bevy_tasks/Cargo.toml | 7 +++++-- crates/bevy_tasks/examples/busy_behavior.rs | 2 +- crates/bevy_tasks/examples/idle_behavior.rs | 2 +- crates/bevy_tasks/src/lib.rs | 13 +++++++++++-- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 786928a0d020f..e648b0c759ce9 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -1,7 +1,10 @@ [package] -name = "smalltask" +name = "bevy_tasks" version = "0.1.0" -authors = ["Lachlan Sneff "] +authors = [ + "Lachlan Sneff ", + "Philip Degarmo " +] edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs index a596b1b6ac7f7..26dfaeac285d9 100644 --- a/crates/bevy_tasks/examples/busy_behavior.rs +++ b/crates/bevy_tasks/examples/busy_behavior.rs @@ -1,4 +1,4 @@ -use smalltask::TaskPoolBuilder; +use bevy_tasks::TaskPoolBuilder; // This sample demonstrates creating a thread pool with 4 tasks and spawning 40 tasks that spin // for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical diff --git a/crates/bevy_tasks/examples/idle_behavior.rs b/crates/bevy_tasks/examples/idle_behavior.rs index 867fe5cfd6ff3..4a392cb2e6da9 100644 --- a/crates/bevy_tasks/examples/idle_behavior.rs +++ b/crates/bevy_tasks/examples/idle_behavior.rs @@ -1,4 +1,4 @@ -use smalltask::TaskPoolBuilder; +use bevy_tasks::TaskPoolBuilder; // This sample demonstrates a thread pool with one thread per logical core and only one task // spinning. Other than the one thread, the system should remain idle, demonstrating good behavior diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index a83fd32537ae0..9047a520e99e0 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -13,6 +13,7 @@ use std::{ }; mod slice; +pub use slice::{ParallelSlice, ParallelSliceMut}; macro_rules! pin_mut { ($($x:ident),*) => { $( @@ -202,6 +203,12 @@ impl TaskPool { } } +impl Default for TaskPool { + fn default() -> Self { + Self::new() + } +} + impl Drop for TaskPool { fn drop(&mut self) { self.shutdown_internal().unwrap(); @@ -241,10 +248,12 @@ mod tests { let pool = TaskPool::new(); let foo = Box::new(42); + let foo = &*foo; let outputs = pool.scope(|scope| { - for _ in 0..1000 { - scope.spawn(async { + for i in 0..100 { + scope.spawn(async move { + println!("task {}", i); if *foo != 42 { panic!("not 42!?!?") } else { From 058e26c7162acca2c53f83cd85f4ee8088795cad Mon Sep 17 00:00:00 2001 From: Lachlan Sneff Date: Sun, 23 Aug 2020 18:11:51 -0400 Subject: [PATCH 03/15] Add global task pool to bevy_tasks Add test of global pool Formatting --- crates/bevy_tasks/src/lib.rs | 71 +++++++++++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 9047a520e99e0..6cf128000513a 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -5,8 +5,9 @@ use std::{ future::Future, mem, pin::Pin, + ptr, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicPtr, Ordering}, Arc, }, thread::{self, JoinHandle}, @@ -28,6 +29,8 @@ macro_rules! pin_mut { )* } } +static GLOBAL_TASK_POOL: AtomicPtr = AtomicPtr::new(ptr::null_mut()); + /// Used to create a TaskPool #[derive(Debug, Default, Clone)] pub struct TaskPoolBuilder { @@ -68,13 +71,23 @@ impl TaskPoolBuilder { } /// Creates a new ThreadPoolBuilder based on the current options. - pub fn build(&self) -> TaskPool { + pub fn build(self) -> TaskPool { TaskPool::new_internal( self.num_threads, self.stack_size, self.thread_name.as_deref(), ) } + + pub fn install(self) { + let pool = Box::leak(Box::new(self.build())); + if !GLOBAL_TASK_POOL + .compare_and_swap(ptr::null_mut(), pool, Ordering::SeqCst) + .is_null() + { + panic!("GLOBAL_TASK_POLL can only be set once"); + } + } } pub struct TaskPool { @@ -89,6 +102,23 @@ impl TaskPool { TaskPoolBuilder::new().build() } + #[inline(always)] + pub fn global() -> &'static TaskPool { + #[inline(never)] + #[cold] + fn do_panic() { + panic!( + "A global task pool must be installed before `TaskPool::global()` can be called." + ); + } + + let ptr = GLOBAL_TASK_POOL.load(Ordering::Acquire); + if ptr.is_null() { + do_panic(); + } + unsafe { &*ptr } + } + fn new_internal( num_threads: Option, stack_size: Option, @@ -239,6 +269,18 @@ impl<'scope, T: Send + 'static> Scope<'scope, T> { } } +pub fn scope<'scope, F, T>(f: F) -> Vec +where + F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, + T: Send + 'static, +{ + TaskPool::global().scope(f) +} + +pub fn global_thread_num() -> usize { + TaskPool::global().thread_num() +} + #[cfg(test)] mod tests { use super::*; @@ -267,4 +309,29 @@ mod tests { assert_eq!(output, 42); } } + + #[test] + pub fn test_global_spawn() { + TaskPoolBuilder::new().install(); + + let foo = Box::new(42); + let foo = &*foo; + + let outputs = scope(|scope| { + for i in 0..100 { + scope.spawn(async move { + println!("task {}", i); + if *foo != 42 { + panic!("not 42!?!?") + } else { + *foo + } + }); + } + }); + + for output in outputs { + assert_eq!(output, 42); + } + } } From d6cbe591508205107811463f31180d6aabfd6fe6 Mon Sep 17 00:00:00 2001 From: Lachlan Sneff Date: Sun, 23 Aug 2020 18:48:48 -0400 Subject: [PATCH 04/15] Replace uses of rayon with bevy_tasks --- crates/bevy_app/src/app.rs | 2 + crates/bevy_ecs/Cargo.toml | 2 +- .../src/schedule/parallel_executor.rs | 40 ++++++++++++++----- crates/bevy_ecs/src/schedule/schedule.rs | 10 ----- crates/bevy_tasks/README.md | 2 +- 5 files changed, 33 insertions(+), 23 deletions(-) diff --git a/crates/bevy_app/src/app.rs b/crates/bevy_app/src/app.rs index d5d1cfbc3c466..84e0bce2d9290 100644 --- a/crates/bevy_app/src/app.rs +++ b/crates/bevy_app/src/app.rs @@ -63,6 +63,8 @@ impl App { } pub fn run(mut self) { + ParallelExecutor::initialize_pools(&self.resources); + self.startup_schedule.initialize(&mut self.resources); self.startup_executor.run( &mut self.startup_schedule, diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index e524272b74b82..5e48b492645f4 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -15,8 +15,8 @@ profiler = [] [dependencies] bevy_hecs = { path = "hecs", features = ["macros", "serialize"], version = "0.1" } +bevy_tasks = { path = "../bevy_tasks" } rand = "0.7.2" -rayon = "1.3" crossbeam-channel = "0.4.2" fixedbitset = "0.3.0" downcast-rs = "1.1.1" diff --git a/crates/bevy_ecs/src/schedule/parallel_executor.rs b/crates/bevy_ecs/src/schedule/parallel_executor.rs index a51ea4cff0f2d..808cef47022f6 100644 --- a/crates/bevy_ecs/src/schedule/parallel_executor.rs +++ b/crates/bevy_ecs/src/schedule/parallel_executor.rs @@ -7,7 +7,6 @@ use bevy_hecs::{ArchetypesGeneration, World}; use crossbeam_channel::{Receiver, Sender}; use fixedbitset::FixedBitSet; use parking_lot::Mutex; -use rayon::ScopeFifo; use std::{ops::Range, sync::Arc}; /// Executes each schedule stage in parallel by analyzing system dependencies. @@ -36,6 +35,17 @@ impl Default for ParallelExecutor { } impl ParallelExecutor { + pub fn initialize_pools(resources: &Resources) { + let task_pool_builder = resources + .get::() + .map(|options| (*options).clone()) + .unwrap_or_else(ParallelExecutorOptions::default) + .create_builder(); + + // For now, bevy_ecs only uses the global task pool so it is sufficient to configure it once here. + task_pool_builder.install(); + } + pub fn without_tracker_clears() -> Self { Self { clear_trackers: false, @@ -66,13 +76,13 @@ impl ParallelExecutor { } } -/// This can be added as an app resource to control the global `rayon::ThreadPool` used by ecs. +/// This can be added as an app resource to control the global `bevy_tasks::TaskPool` used by ecs. // Dev internal note: We cannot directly expose a ThreadPoolBuilder here as it does not implement Send and Sync. #[derive(Debug, Default, Clone)] pub struct ParallelExecutorOptions { - /// If some value, we'll set up the thread pool to use at most n threads. See `rayon::ThreadPoolBuilder::num_threads`. + /// If some value, we'll set up the thread pool to use at most n threads. See `bevy_tasks::TaskPoolBuilder::num_threads`. num_threads: Option, - /// If some value, we'll set up the thread pool's' workers to the given stack size. See `rayon::ThreadPoolBuilder::stack_size`. + /// If some value, we'll set up the thread pool's' workers to the given stack size. See `bevy_tasks::TaskPoolBuilder::stack_size`. stack_size: Option, // TODO: Do we also need/want to expose other features (*_handler, etc.) } @@ -97,8 +107,8 @@ impl ParallelExecutorOptions { } /// Creates a new ThreadPoolBuilder based on the current options. - pub(crate) fn create_builder(&self) -> rayon::ThreadPoolBuilder { - let mut builder = rayon::ThreadPoolBuilder::new(); + pub(crate) fn create_builder(&self) -> bevy_tasks::TaskPoolBuilder { + let mut builder = bevy_tasks::TaskPoolBuilder::new(); if let Some(num_threads) = self.num_threads { builder = builder.num_threads(num_threads); @@ -262,7 +272,7 @@ impl ExecutorStage { &mut self, systems: &[Arc>>], run_ready_type: RunReadyType, - scope: &ScopeFifo<'run>, + scope: &mut bevy_tasks::Scope<'run, ()>, world: &'run World, resources: &'run Resources, ) -> RunReadyResult { @@ -308,11 +318,17 @@ impl ExecutorStage { // handle multi-threaded system let sender = self.sender.clone(); self.running_systems.insert(system_index); - scope.spawn_fifo(move |_| { + + scope.spawn(async move { let mut system = system.lock(); system.run(world, resources); sender.send(system_index).unwrap(); }); + // scope.spawn_fifo(move |_| { + // let mut system = system.lock(); + // system.run(world, resources); + // sender.send(system_index).unwrap(); + // }); systems_currently_running = true; } @@ -364,7 +380,8 @@ impl ExecutorStage { // if there are no upcoming thread local systems, run everything right now 0..systems.len() }; - rayon::scope_fifo(|scope| { + + bevy_tasks::scope(|scope| { run_ready_result = self.run_ready_systems( systems, RunReadyType::Range(run_ready_system_index_range), @@ -373,6 +390,7 @@ impl ExecutorStage { resources, ); }); + loop { // if all systems in the stage are finished, break out of the loop if self.finished_systems.count_ones(..) == systems.len() { @@ -393,7 +411,7 @@ impl ExecutorStage { run_ready_result = RunReadyResult::Ok; } else { // wait for a system to finish, then run its dependents - rayon::scope_fifo(|scope| { + bevy_tasks::scope(|scope| { loop { // if all systems in the stage are finished, break out of the loop if self.finished_systems.count_ones(..) == systems.len() { @@ -410,7 +428,7 @@ impl ExecutorStage { resources, ); - // if the next ready system is thread local, break out of this loop/rayon scope so it can be run + // if the next ready system is thread local, break out of this loop/bevy_tasks scope so it can be run if let RunReadyResult::ThreadLocalReady(_) = run_ready_result { break; } diff --git a/crates/bevy_ecs/src/schedule/schedule.rs b/crates/bevy_ecs/src/schedule/schedule.rs index 83c0c6f8ceb1f..5a48cd79344ef 100644 --- a/crates/bevy_ecs/src/schedule/schedule.rs +++ b/crates/bevy_ecs/src/schedule/schedule.rs @@ -1,6 +1,5 @@ use crate::{ resource::Resources, - schedule::ParallelExecutorOptions, system::{System, SystemId, ThreadLocalExecution}, }; use bevy_hecs::World; @@ -171,15 +170,6 @@ impl Schedule { return; } - let thread_pool_builder = resources - .get::() - .map(|options| (*options).clone()) - .unwrap_or_else(ParallelExecutorOptions::default) - .create_builder(); - // For now, bevy_ecs only uses the global thread pool so it is sufficient to configure it once here. - // Dont call .unwrap() as the function is called twice.. - let _ = thread_pool_builder.build_global(); - for stage in self.stages.values_mut() { for system in stage.iter_mut() { let mut system = system.lock(); diff --git a/crates/bevy_tasks/README.md b/crates/bevy_tasks/README.md index 5c2787957146e..35f6ad619e97f 100644 --- a/crates/bevy_tasks/README.md +++ b/crates/bevy_tasks/README.md @@ -1,4 +1,4 @@ -# small-task +# bevy_tasks This is a simple threadpool with minimal dependencies. The main usecase is a scoped fork-join, i.e. spawning tasks from a single thread and having that thread await the completion of those tasks. This is intended specifically for From 1a55abe683e8b585e7ee7197ae922e77e3be21d9 Mon Sep 17 00:00:00 2001 From: Lachlan Sneff Date: Sun, 23 Aug 2020 18:51:46 -0400 Subject: [PATCH 05/15] Fix leaked task_pool when attempting to install while a task pool was already installed globally --- crates/bevy_tasks/src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 6cf128000513a..0822bd900c019 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -80,13 +80,14 @@ impl TaskPoolBuilder { } pub fn install(self) { - let pool = Box::leak(Box::new(self.build())); + let mut pool = Box::new(self.build()); if !GLOBAL_TASK_POOL - .compare_and_swap(ptr::null_mut(), pool, Ordering::SeqCst) + .compare_and_swap(ptr::null_mut(), &mut*pool, Ordering::SeqCst) .is_null() { panic!("GLOBAL_TASK_POLL can only be set once"); } + mem::forget(pool); } } From c76cb5f8500d48940bbc120ecb3aa7be64362091 Mon Sep 17 00:00:00 2001 From: Lachlan Sneff Date: Sun, 23 Aug 2020 19:55:00 -0400 Subject: [PATCH 06/15] Fix typo --- crates/bevy_tasks/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 0822bd900c019..f096eff53de96 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -85,7 +85,7 @@ impl TaskPoolBuilder { .compare_and_swap(ptr::null_mut(), &mut*pool, Ordering::SeqCst) .is_null() { - panic!("GLOBAL_TASK_POLL can only be set once"); + panic!("GLOBAL_TASK_POOL can only be set once"); } mem::forget(pool); } From cd7ce62b447ed1ad142747987b0233bede45a381 Mon Sep 17 00:00:00 2001 From: Philip Degarmo Date: Sun, 23 Aug 2020 19:05:57 -0700 Subject: [PATCH 07/15] Add spawn() public function to pass raw futures through to the executor (#1) Add a task newtype so that we don't expose multitask::Task Remove some of the README/license info that was present in the prototype repo Add some doc comments --- crates/bevy_tasks/.gitignore | 3 - crates/bevy_tasks/Cargo.toml | 1 + crates/bevy_tasks/LICENSE-APACHE | 201 ------------------------------- crates/bevy_tasks/LICENSE-MIT | 23 ---- crates/bevy_tasks/README.md | 26 +--- crates/bevy_tasks/src/lib.rs | 39 ++++-- crates/bevy_tasks/src/task.rs | 43 +++++++ 7 files changed, 78 insertions(+), 258 deletions(-) delete mode 100644 crates/bevy_tasks/.gitignore delete mode 100644 crates/bevy_tasks/LICENSE-APACHE delete mode 100644 crates/bevy_tasks/LICENSE-MIT create mode 100644 crates/bevy_tasks/src/task.rs diff --git a/crates/bevy_tasks/.gitignore b/crates/bevy_tasks/.gitignore deleted file mode 100644 index 6a59f558ae40d..0000000000000 --- a/crates/bevy_tasks/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -/target -Cargo.lock -/.idea \ No newline at end of file diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index e648b0c759ce9..fef7a462b587a 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -2,6 +2,7 @@ name = "bevy_tasks" version = "0.1.0" authors = [ + "Bevy Contributors ", "Lachlan Sneff ", "Philip Degarmo " ] diff --git a/crates/bevy_tasks/LICENSE-APACHE b/crates/bevy_tasks/LICENSE-APACHE deleted file mode 100644 index 139dce95d473c..0000000000000 --- a/crates/bevy_tasks/LICENSE-APACHE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - -TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - -1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - -2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - -3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - -4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - -5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - -6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - -7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - -8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - -9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - -END OF TERMS AND CONDITIONS - -APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - -Copyright [yyyy] [name of copyright owner] - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. \ No newline at end of file diff --git a/crates/bevy_tasks/LICENSE-MIT b/crates/bevy_tasks/LICENSE-MIT deleted file mode 100644 index 468cd79a8f6e5..0000000000000 --- a/crates/bevy_tasks/LICENSE-MIT +++ /dev/null @@ -1,23 +0,0 @@ -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/crates/bevy_tasks/README.md b/crates/bevy_tasks/README.md index 35f6ad619e97f..67aebf87c4bea 100644 --- a/crates/bevy_tasks/README.md +++ b/crates/bevy_tasks/README.md @@ -1,5 +1,7 @@ # bevy_tasks +A refreshingly simple task executor for bevy. :) + This is a simple threadpool with minimal dependencies. The main usecase is a scoped fork-join, i.e. spawning tasks from a single thread and having that thread await the completion of those tasks. This is intended specifically for [`bevy`][bevy] as a lighter alternative to [`rayon`][rayon] for this specific usecase. There are also utilities for @@ -15,7 +17,7 @@ It is based on [`multitask`][multitask], a lightweight executor that allows the ## Dependencies -A very small dependency list is a key feature of this library +A very small dependency list is a key feature of this module ``` ├── multitask @@ -28,25 +30,3 @@ A very small dependency list is a key feature of this library ├── parking └── pollster ``` - -## Status - -This is an unpublished prototype intended for eventual inclusion with `bevy`. - -## License - -Licensed under either of - -* Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) -* MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) - -at your option. - -### Contribution - -Unless you explicitly state otherwise, any contribution intentionally -submitted for inclusion in the work by you, as defined in the Apache-2.0 -license, shall be dual licensed as above, without any additional terms or -conditions. - -See [LICENSE-APACHE](LICENSE-APACHE) and [LICENSE-MIT](LICENSE-MIT). diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index f096eff53de96..1fe85ca3dc10e 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -1,4 +1,3 @@ -use multitask::{Executor, Task}; use parking::Unparker; use std::{ fmt::{self, Debug}, @@ -16,6 +15,9 @@ use std::{ mod slice; pub use slice::{ParallelSlice, ParallelSliceMut}; +mod task; +pub use task::Task; + macro_rules! pin_mut { ($($x:ident),*) => { $( // Move the value to ensure that it is owned @@ -91,14 +93,16 @@ impl TaskPoolBuilder { } } +/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by +/// the pool on threads owned by the pool. pub struct TaskPool { - executor: Arc, + executor: Arc, threads: Vec<(JoinHandle<()>, Arc)>, shutdown_flag: Arc, } impl TaskPool { - // Create a `TaskPool` with the default configuration. + /// Create a `TaskPool` with the default configuration. pub fn new() -> TaskPool { TaskPoolBuilder::new().build() } @@ -125,7 +129,7 @@ impl TaskPool { stack_size: Option, thread_name: Option<&str>, ) -> Self { - let executor = Arc::new(Executor::new()); + let executor = Arc::new(multitask::Executor::new()); let shutdown_flag = Arc::new(AtomicBool::new(false)); let num_threads = num_threads.unwrap_or_else(num_cpus::get); @@ -177,17 +181,22 @@ impl TaskPool { } } + /// Return the number of threads owned by the task pool pub fn thread_num(&self) -> usize { self.threads.len() } + /// Allows spawning non-`static futures on the thread pool. The function takes a callback, + /// passing a scope object into it. The scope object provided to the callback can be used + /// to spawn tasks. This function will await the completion of all tasks before returning. + /// + /// This is similar to `rayon::scope` and `crossbeam::scope` pub fn scope<'scope, F, T>(&self, f: F) -> Vec where F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, T: Send + 'static, { - // let ex = Arc::clone(&self.executor); - let executor: &'scope Executor = unsafe { mem::transmute(&*self.executor) }; + let executor: &'scope multitask::Executor = unsafe { mem::transmute(&*self.executor) }; let fut = async move { let mut scope = Scope { @@ -214,6 +223,20 @@ impl TaskPool { pollster::block_on(self.executor.spawn(fut)) } + /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be + /// cancelled and "detached" allowing it to continue running without having to be polled by the + /// end-user. + pub fn spawn( + &self, + future: impl Future + Send + 'static, + ) -> impl Future + Send + where + T: Send + 'static, + { + self.executor.spawn(future) + } + + /// Joins all the threads in the thread pool. pub fn shutdown(self) -> Result<(), ThreadPanicked> { let mut this = self; this.shutdown_internal() @@ -256,8 +279,8 @@ impl Debug for ThreadPanicked { } pub struct Scope<'scope, T> { - executor: &'scope Executor, - spawned: Vec>, + executor: &'scope multitask::Executor, + spawned: Vec>, } impl<'scope, T: Send + 'static> Scope<'scope, T> { diff --git a/crates/bevy_tasks/src/task.rs b/crates/bevy_tasks/src/task.rs new file mode 100644 index 0000000000000..041e7587b7c4e --- /dev/null +++ b/crates/bevy_tasks/src/task.rs @@ -0,0 +1,43 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Wraps `multitask::Task`, a spawned future. +/// +/// Tasks are also futures themselves and yield the output of the spawned future. +/// +/// When a task is dropped, its gets canceled and won't be polled again. To cancel a task a bit +/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method. +/// +/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic. +/// Wraps multitask::Task +pub struct Task(multitask::Task); + +impl Task { + /// Detaches the task to let it keep running in the background. See `multitask::Task::detach` + pub fn detach(self) { + self.0.detach(); + } + + /// Cancels the task and waits for it to stop running. + /// + /// Returns the task's output if it was completed just before it got canceled, or [`None`] if + /// it didn't complete. + /// + /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of + /// canceling because it also waits for the task to stop running. + /// + /// See `multitask::Task::cancel` + pub async fn cancel(self) -> Option { + self.0.cancel().await + } +} + +impl Future for Task { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Safe because Task is pinned and contains multitask::Task by value + unsafe { self.map_unchecked_mut(|x| &mut x.0).poll(cx) } + } +} From 4232e751aaf6455bd54acda48738d4479664952b Mon Sep 17 00:00:00 2001 From: Lachlan Sneff Date: Sun, 23 Aug 2020 23:19:13 -0400 Subject: [PATCH 08/15] Update bevy_tasks with new Usage type parameter and make the task pool a resource instead of an internal static --- crates/bevy_app/src/app.rs | 2 +- crates/bevy_ecs/src/resource/resources.rs | 4 + .../src/schedule/parallel_executor.rs | 15 +- crates/bevy_tasks/examples/busy_behavior.rs | 4 +- crates/bevy_tasks/examples/idle_behavior.rs | 4 +- crates/bevy_tasks/src/lib.rs | 155 +++++------------- crates/bevy_tasks/src/slice.rs | 31 +++- crates/bevy_tasks/src/task.rs | 8 +- crates/bevy_tasks/src/usages.rs | 4 + 9 files changed, 95 insertions(+), 132 deletions(-) create mode 100644 crates/bevy_tasks/src/usages.rs diff --git a/crates/bevy_app/src/app.rs b/crates/bevy_app/src/app.rs index 84e0bce2d9290..c5a9fa59acf76 100644 --- a/crates/bevy_app/src/app.rs +++ b/crates/bevy_app/src/app.rs @@ -63,7 +63,7 @@ impl App { } pub fn run(mut self) { - ParallelExecutor::initialize_pools(&self.resources); + ParallelExecutor::initialize_pools(&mut self.resources); self.startup_schedule.initialize(&mut self.resources); self.startup_executor.run( diff --git a/crates/bevy_ecs/src/resource/resources.rs b/crates/bevy_ecs/src/resource/resources.rs index f92a716c609b2..0d9d5afba0a0f 100644 --- a/crates/bevy_ecs/src/resource/resources.rs +++ b/crates/bevy_ecs/src/resource/resources.rs @@ -42,6 +42,10 @@ impl Resources { self.get_resource_mut(ResourceIndex::Global) } + pub fn get_cloned(&self) -> Option { + self.get::().map(|r| (*r).clone()) + } + #[allow(clippy::needless_lifetimes)] pub fn get_local<'a, T: Resource>(&'a self, id: SystemId) -> Option> { self.get_resource(ResourceIndex::System(id)) diff --git a/crates/bevy_ecs/src/schedule/parallel_executor.rs b/crates/bevy_ecs/src/schedule/parallel_executor.rs index 808cef47022f6..630fadb83f100 100644 --- a/crates/bevy_ecs/src/schedule/parallel_executor.rs +++ b/crates/bevy_ecs/src/schedule/parallel_executor.rs @@ -35,15 +35,16 @@ impl Default for ParallelExecutor { } impl ParallelExecutor { - pub fn initialize_pools(resources: &Resources) { - let task_pool_builder = resources + pub fn initialize_pools(resources: &mut Resources) { + let compute_pool: bevy_tasks::ComputePool = resources .get::() .map(|options| (*options).clone()) .unwrap_or_else(ParallelExecutorOptions::default) - .create_builder(); + .create_builder() + .build(); // For now, bevy_ecs only uses the global task pool so it is sufficient to configure it once here. - task_pool_builder.install(); + resources.insert(compute_pool); } pub fn without_tracker_clears() -> Self { @@ -344,6 +345,8 @@ impl ExecutorStage { systems: &[Arc>>], schedule_changed: bool, ) { + let compute_pool = resources.get_cloned::().unwrap(); + // if the schedule has changed, clear executor state / fill it with new defaults if schedule_changed { self.system_dependencies.clear(); @@ -381,7 +384,7 @@ impl ExecutorStage { 0..systems.len() }; - bevy_tasks::scope(|scope| { + compute_pool.scope(|scope| { run_ready_result = self.run_ready_systems( systems, RunReadyType::Range(run_ready_system_index_range), @@ -411,7 +414,7 @@ impl ExecutorStage { run_ready_result = RunReadyResult::Ok; } else { // wait for a system to finish, then run its dependents - bevy_tasks::scope(|scope| { + compute_pool.scope(|scope| { loop { // if all systems in the stage are finished, break out of the loop if self.finished_systems.count_ones(..) == systems.len() { diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs index 26dfaeac285d9..6c10175abcfb8 100644 --- a/crates/bevy_tasks/examples/busy_behavior.rs +++ b/crates/bevy_tasks/examples/busy_behavior.rs @@ -1,4 +1,4 @@ -use bevy_tasks::TaskPoolBuilder; +use bevy_tasks::{Compute, TaskPoolBuilder}; // This sample demonstrates creating a thread pool with 4 tasks and spawning 40 tasks that spin // for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical @@ -8,7 +8,7 @@ fn main() { let pool = TaskPoolBuilder::new() .thread_name("Busy Behavior ThreadPool".to_string()) .num_threads(4) - .build(); + .build::(); let t0 = std::time::Instant::now(); pool.scope(|s| { diff --git a/crates/bevy_tasks/examples/idle_behavior.rs b/crates/bevy_tasks/examples/idle_behavior.rs index 4a392cb2e6da9..2fb370901a410 100644 --- a/crates/bevy_tasks/examples/idle_behavior.rs +++ b/crates/bevy_tasks/examples/idle_behavior.rs @@ -1,4 +1,4 @@ -use bevy_tasks::TaskPoolBuilder; +use bevy_tasks::{Compute, TaskPoolBuilder}; // This sample demonstrates a thread pool with one thread per logical core and only one task // spinning. Other than the one thread, the system should remain idle, demonstrating good behavior @@ -7,7 +7,7 @@ use bevy_tasks::TaskPoolBuilder; fn main() { let pool = TaskPoolBuilder::new() .thread_name("Idle Behavior ThreadPool".to_string()) - .build(); + .build::(); pool.scope(|s| { for i in 0..1 { diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 1fe85ca3dc10e..5075f5fc55e54 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -1,12 +1,11 @@ use parking::Unparker; use std::{ - fmt::{self, Debug}, future::Future, + marker::PhantomData, mem, pin::Pin, - ptr, sync::{ - atomic::{AtomicBool, AtomicPtr, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, }, thread::{self, JoinHandle}, @@ -18,6 +17,11 @@ pub use slice::{ParallelSlice, ParallelSliceMut}; mod task; pub use task::Task; +mod usages; +pub use usages::Compute; + +pub type ComputePool = TaskPool; + macro_rules! pin_mut { ($($x:ident),*) => { $( // Move the value to ensure that it is owned @@ -31,8 +35,6 @@ macro_rules! pin_mut { )* } } -static GLOBAL_TASK_POOL: AtomicPtr = AtomicPtr::new(ptr::null_mut()); - /// Used to create a TaskPool #[derive(Debug, Default, Clone)] pub struct TaskPoolBuilder { @@ -73,57 +75,49 @@ impl TaskPoolBuilder { } /// Creates a new ThreadPoolBuilder based on the current options. - pub fn build(self) -> TaskPool { + pub fn build(self) -> TaskPool { TaskPool::new_internal( self.num_threads, self.stack_size, self.thread_name.as_deref(), ) } +} + +struct TaskPoolInternal { + threads: Vec<(JoinHandle<()>, Arc)>, + shutdown_flag: Arc, +} - pub fn install(self) { - let mut pool = Box::new(self.build()); - if !GLOBAL_TASK_POOL - .compare_and_swap(ptr::null_mut(), &mut*pool, Ordering::SeqCst) - .is_null() - { - panic!("GLOBAL_TASK_POOL can only be set once"); +impl Drop for TaskPoolInternal { + fn drop(&mut self) { + self.shutdown_flag.store(true, Ordering::Release); + + for (_, unparker) in &self.threads { + unparker.unpark(); + } + for (join_handle, _) in self.threads.drain(..) { + join_handle + .join() + .expect("task thread panicked while executing"); } - mem::forget(pool); } } /// A thread pool for executing tasks. Tasks are futures that are being automatically driven by /// the pool on threads owned by the pool. -pub struct TaskPool { +pub struct TaskPool { executor: Arc, - threads: Vec<(JoinHandle<()>, Arc)>, - shutdown_flag: Arc, + internal: Arc, + _marker: PhantomData, } -impl TaskPool { +impl TaskPool { /// Create a `TaskPool` with the default configuration. - pub fn new() -> TaskPool { + pub fn new() -> Self { TaskPoolBuilder::new().build() } - #[inline(always)] - pub fn global() -> &'static TaskPool { - #[inline(never)] - #[cold] - fn do_panic() { - panic!( - "A global task pool must be installed before `TaskPool::global()` can be called." - ); - } - - let ptr = GLOBAL_TASK_POOL.load(Ordering::Acquire); - if ptr.is_null() { - do_panic(); - } - unsafe { &*ptr } - } - fn new_internal( num_threads: Option, stack_size: Option, @@ -176,14 +170,17 @@ impl TaskPool { Self { executor, - threads, - shutdown_flag, + internal: Arc::new(TaskPoolInternal { + threads, + shutdown_flag, + }), + _marker: PhantomData, } } /// Return the number of threads owned by the task pool pub fn thread_num(&self) -> usize { - self.threads.len() + self.internal.threads.len() } /// Allows spawning non-`static futures on the thread pool. The function takes a callback, @@ -235,46 +232,21 @@ impl TaskPool { { self.executor.spawn(future) } - - /// Joins all the threads in the thread pool. - pub fn shutdown(self) -> Result<(), ThreadPanicked> { - let mut this = self; - this.shutdown_internal() - } - - fn shutdown_internal(&mut self) -> Result<(), ThreadPanicked> { - self.shutdown_flag.store(true, Ordering::Release); - - for (_, unparker) in &self.threads { - unparker.unpark(); - } - for (join_handle, _) in self.threads.drain(..) { - join_handle - .join() - .expect("task thread panicked while executing"); - } - Ok(()) - } } -impl Default for TaskPool { +impl Default for TaskPool { fn default() -> Self { Self::new() } } -impl Drop for TaskPool { - fn drop(&mut self) { - self.shutdown_internal().unwrap(); - } -} - -#[derive(Copy, Clone, Eq, PartialEq)] -pub struct ThreadPanicked(()); - -impl Debug for ThreadPanicked { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "a task thread panicked during execution") +impl Clone for TaskPool { + fn clone(&self) -> Self { + Self { + executor: Arc::clone(&self.executor), + internal: Arc::clone(&self.internal), + _marker: PhantomData, + } } } @@ -293,25 +265,13 @@ impl<'scope, T: Send + 'static> Scope<'scope, T> { } } -pub fn scope<'scope, F, T>(f: F) -> Vec -where - F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, - T: Send + 'static, -{ - TaskPool::global().scope(f) -} - -pub fn global_thread_num() -> usize { - TaskPool::global().thread_num() -} - #[cfg(test)] mod tests { use super::*; #[test] pub fn test_spawn() { - let pool = TaskPool::new(); + let pool = TaskPool::::new(); let foo = Box::new(42); let foo = &*foo; @@ -333,29 +293,4 @@ mod tests { assert_eq!(output, 42); } } - - #[test] - pub fn test_global_spawn() { - TaskPoolBuilder::new().install(); - - let foo = Box::new(42); - let foo = &*foo; - - let outputs = scope(|scope| { - for i in 0..100 { - scope.spawn(async move { - println!("task {}", i); - if *foo != 42 { - panic!("not 42!?!?") - } else { - *foo - } - }); - } - }); - - for output in outputs { - assert_eq!(output, 42); - } - } } diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs index b0a3eb00f5305..bc2465a77a9fa 100644 --- a/crates/bevy_tasks/src/slice.rs +++ b/crates/bevy_tasks/src/slice.rs @@ -1,7 +1,12 @@ use super::TaskPool; pub trait ParallelSlice: AsRef<[T]> { - fn par_chunk_map(&self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec + fn par_chunk_map( + &self, + task_pool: &TaskPool, + chunk_size: usize, + f: F, + ) -> Vec where F: Fn(&[T]) -> R + Send + Sync, R: Send + 'static, @@ -15,7 +20,12 @@ pub trait ParallelSlice: AsRef<[T]> { }) } - fn par_splat_map(&self, task_pool: &TaskPool, max_tasks: Option, f: F) -> Vec + fn par_splat_map( + &self, + task_pool: &TaskPool, + max_tasks: Option, + f: F, + ) -> Vec where F: Fn(&[T]) -> R + Send + Sync, R: Send + 'static, @@ -36,7 +46,12 @@ pub trait ParallelSlice: AsRef<[T]> { impl ParallelSlice for S where S: AsRef<[T]> {} pub trait ParallelSliceMut: AsMut<[T]> { - fn par_chunk_map_mut(&mut self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec + fn par_chunk_map_mut( + &mut self, + task_pool: &TaskPool, + chunk_size: usize, + f: F, + ) -> Vec where F: Fn(&mut [T]) -> R + Send + Sync, R: Send + 'static, @@ -50,9 +65,9 @@ pub trait ParallelSliceMut: AsMut<[T]> { }) } - fn par_splat_map_mut( + fn par_splat_map_mut( &mut self, - task_pool: &TaskPool, + task_pool: &TaskPool, max_tasks: Option, f: F, ) -> Vec @@ -77,12 +92,12 @@ impl ParallelSliceMut for S where S: AsMut<[T]> {} #[cfg(test)] mod tests { - use super::*; + use crate::*; #[test] fn test_par_chunks_map() { let v = vec![42; 1000]; - let task_pool = TaskPool::new(); + let task_pool = TaskPool::::new(); let outputs = v.par_splat_map(&task_pool, None, |numbers| -> i32 { numbers.iter().sum() }); @@ -92,7 +107,7 @@ mod tests { #[test] fn test_par_chunks_map_mut() { let mut v = vec![42; 1000]; - let task_pool = TaskPool::new(); + let task_pool = TaskPool::::new(); let outputs = v.par_splat_map_mut(&task_pool, None, |numbers| -> i32 { for number in numbers.iter_mut() { diff --git a/crates/bevy_tasks/src/task.rs b/crates/bevy_tasks/src/task.rs index 041e7587b7c4e..27d8fc4e38021 100644 --- a/crates/bevy_tasks/src/task.rs +++ b/crates/bevy_tasks/src/task.rs @@ -1,6 +1,8 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; /// Wraps `multitask::Task`, a spawned future. /// diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs new file mode 100644 index 0000000000000..7cd15df5e44e5 --- /dev/null +++ b/crates/bevy_tasks/src/usages.rs @@ -0,0 +1,4 @@ +//! Several premade usage profiles +//! Just `Compute` for now. + +pub struct Compute(()); From 93fcd7dc26308fbf10a38fa365e496a31ccaef77 Mon Sep 17 00:00:00 2001 From: Lachlan Sneff Date: Sun, 23 Aug 2020 23:23:18 -0400 Subject: [PATCH 09/15] Add to root Cargo.toml and lib.rs --- Cargo.toml | 1 + src/lib.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index dd7e8d1d72108..7f667eaea66b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ bevy_transform = { path = "crates/bevy_transform", version = "0.1" } bevy_text = { path = "crates/bevy_text", version = "0.1" } bevy_ui = { path = "crates/bevy_ui", version = "0.1" } bevy_window = { path = "crates/bevy_window", version = "0.1" } +bevy_tasks = { path = "crates/bevy_tasks" } # bevy (optional) bevy_audio = { path = "crates/bevy_audio", optional = true, version = "0.1" } diff --git a/src/lib.rs b/src/lib.rs index ae29f1b089ca6..b08856e41e7fd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,6 +58,7 @@ pub use bevy_transform as transform; pub use bevy_type_registry as type_registry; pub use bevy_ui as ui; pub use bevy_window as window; +pub use bevy_tasks as tasks; #[cfg(feature = "bevy_audio")] pub use bevy_audio as audio; From e406b073acc4d19a86353622f60da655d1a5c6cc Mon Sep 17 00:00:00 2001 From: Philip Degarmo Date: Thu, 27 Aug 2020 20:57:50 -0700 Subject: [PATCH 10/15] Add default task pools and method for configuring them (#2) * Remove generics from TaskPool newtypes and some of the task API Add IO, AsyncCompute and Compute TaskPools Move TaskPool setup from bevy_ecs to bevy_app ParallelExecutorOptions is essentially replaced by DefaultTaskPoolOptions * Pull TaskPool and related types out of bevy_tasks/lib.rs into a separate module Add a prelude to bevy_tasks Update the version of bevy_tasks to match other crates * Assert percent of cores >= 0 --- crates/bevy_app/Cargo.toml | 2 + crates/bevy_app/src/app.rs | 7 +- crates/bevy_app/src/lib.rs | 2 + crates/bevy_app/src/task_pool_options.rs | 157 ++++++++++ crates/bevy_ecs/src/resource/resources.rs | 2 + .../src/schedule/parallel_executor.rs | 67 +--- crates/bevy_tasks/Cargo.toml | 2 +- crates/bevy_tasks/src/lib.rs | 296 +----------------- crates/bevy_tasks/src/slice.rs | 25 +- crates/bevy_tasks/src/task_pool.rs | 280 +++++++++++++++++ crates/bevy_tasks/src/usages.rs | 54 +++- examples/app/thread_pool_resources.rs | 3 +- src/lib.rs | 2 +- 13 files changed, 521 insertions(+), 378 deletions(-) create mode 100644 crates/bevy_app/src/task_pool_options.rs create mode 100644 crates/bevy_tasks/src/task_pool.rs diff --git a/crates/bevy_app/Cargo.toml b/crates/bevy_app/Cargo.toml index 912b747953947..cde8462828aa1 100644 --- a/crates/bevy_app/Cargo.toml +++ b/crates/bevy_app/Cargo.toml @@ -13,6 +13,8 @@ keywords = ["bevy"] # bevy bevy_derive = { path = "../bevy_derive", version = "0.1" } bevy_ecs = { path = "../bevy_ecs", version = "0.1" } +bevy_tasks = { path = "../bevy_tasks" } +num_cpus = "1" # other libloading = "0.6" diff --git a/crates/bevy_app/src/app.rs b/crates/bevy_app/src/app.rs index c5a9fa59acf76..046f69b3a24b9 100644 --- a/crates/bevy_app/src/app.rs +++ b/crates/bevy_app/src/app.rs @@ -1,4 +1,5 @@ use crate::app_builder::AppBuilder; +use crate::DefaultTaskPoolOptions; use bevy_ecs::{ParallelExecutor, Resources, Schedule, World}; #[allow(clippy::needless_doctest_main)] @@ -63,7 +64,11 @@ impl App { } pub fn run(mut self) { - ParallelExecutor::initialize_pools(&mut self.resources); + // Setup the default bevy task pools + self.resources + .get_cloned::() + .unwrap_or_else(DefaultTaskPoolOptions::default) + .create_default_pools(&mut self.resources); self.startup_schedule.initialize(&mut self.resources); self.startup_executor.run( diff --git a/crates/bevy_app/src/lib.rs b/crates/bevy_app/src/lib.rs index d2e241616b55d..a409edbfafae3 100644 --- a/crates/bevy_app/src/lib.rs +++ b/crates/bevy_app/src/lib.rs @@ -8,6 +8,7 @@ mod app_builder; mod event; mod plugin; mod schedule_runner; +mod task_pool_options; pub use app::*; pub use app_builder::*; @@ -15,6 +16,7 @@ pub use bevy_derive::DynamicPlugin; pub use event::*; pub use plugin::*; pub use schedule_runner::*; +pub use task_pool_options::*; pub mod prelude { pub use crate::{ diff --git a/crates/bevy_app/src/task_pool_options.rs b/crates/bevy_app/src/task_pool_options.rs new file mode 100644 index 0000000000000..f5cdc9d6680c8 --- /dev/null +++ b/crates/bevy_app/src/task_pool_options.rs @@ -0,0 +1,157 @@ +use bevy_ecs::Resources; +use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool, TaskPoolBuilder}; + +fn clamp_usize(value: usize, min: usize, max: usize) -> usize { + if value > max { + max + } else if value < min { + min + } else { + value + } +} + +/// Defines a simple way to determine how many threads to use given the number of remaining cores +/// and number of total cores +#[derive(Clone)] +pub struct TaskPoolThreadAssignmentPolicy { + /// Force using at least this many threads + pub min_threads: usize, + /// Under no circumstance use more than this many threads for this pool + pub max_threads: usize, + /// Target using this percentage of total cores, clamped by min_threads and max_threads. It is + /// permitted to use 1.0 to try to use all remaining threads + pub percent: f32, +} + +impl TaskPoolThreadAssignmentPolicy { + /// Determine the number of threads to use for this task pool + fn get_number_of_threads(&self, remaining_threads: usize, total_threads: usize) -> usize { + assert!(self.percent >= 0.0); + let mut desired = (total_threads as f32 * self.percent).round() as usize; + + // Limit ourselves to the number of cores available + desired = desired.min(remaining_threads); + + // Clamp by min_threads, max_threads. (This may result in us using more threads than are + // available, this is intended. An example case where this might happen is a device with + // <= 2 threads. + clamp_usize(desired, self.min_threads, self.max_threads) + } +} + +/// Helper for configuring and creating the default task pools. For end-users who want full control, +/// insert the default task pools into the resource map manually. If the pools are already inserted, +/// this helper will do nothing. +#[derive(Clone)] +pub struct DefaultTaskPoolOptions { + /// If the number of physical cores is less than min_total_threads, force using min_total_threads + pub min_total_threads: usize, + /// If the number of physical cores is grater than max_total_threads, force using max_total_threads + pub max_total_threads: usize, + + /// Used to determine number of IO threads to allocate + pub io: TaskPoolThreadAssignmentPolicy, + /// Used to determine number of async compute threads to allocate + pub async_compute: TaskPoolThreadAssignmentPolicy, + /// Used to determine number of compute threads to allocate + pub compute: TaskPoolThreadAssignmentPolicy, +} + +impl Default for DefaultTaskPoolOptions { + fn default() -> Self { + DefaultTaskPoolOptions { + // By default, use however many cores are available on the system + min_total_threads: 1, + max_total_threads: std::usize::MAX, + + // Use 25% of cores for IO, at least 1, no more than 4 + io: TaskPoolThreadAssignmentPolicy { + min_threads: 1, + max_threads: 4, + percent: 0.25, + }, + + // Use 25% of cores for async compute, at least 1, no more than 4 + async_compute: TaskPoolThreadAssignmentPolicy { + min_threads: 1, + max_threads: 4, + percent: 0.25, + }, + + // Use all remaining cores for compute (at least 1) + compute: TaskPoolThreadAssignmentPolicy { + min_threads: 1, + max_threads: std::usize::MAX, + percent: 1.0, // This 1.0 here means "whatever is left over" + }, + } + } +} + +impl DefaultTaskPoolOptions { + /// Create a configuration that forces using the given number of threads. + pub fn with_num_threads(thread_count: usize) -> Self { + let mut options = Self::default(); + options.min_total_threads = thread_count; + options.max_total_threads = thread_count; + + options + } + + /// Inserts the default thread pools into the given resource map based on the configured values + pub fn create_default_pools(&self, resources: &mut Resources) { + let total_threads = clamp_usize( + num_cpus::get(), + self.min_total_threads, + self.max_total_threads, + ); + + let mut remaining_threads = total_threads; + + if !resources.contains::() { + // Determine the number of IO threads we will use + let io_threads = self + .io + .get_number_of_threads(remaining_threads, total_threads); + remaining_threads -= io_threads; + + resources.insert(IOTaskPool( + TaskPoolBuilder::default() + .num_threads(io_threads) + .thread_name("IO Task Pool".to_string()) + .build(), + )); + } + + if !resources.contains::() { + // Determine the number of async compute threads we will use + let async_compute_threads = self + .async_compute + .get_number_of_threads(remaining_threads, total_threads); + remaining_threads -= async_compute_threads; + + resources.insert(AsyncComputeTaskPool( + TaskPoolBuilder::default() + .num_threads(async_compute_threads) + .thread_name("Async Compute Task Pool".to_string()) + .build(), + )); + } + + if !resources.contains::() { + // Determine the number of compute threads we will use + // This is intentionally last so that an end user can specify 1.0 as the percent + let compute_threads = self + .compute + .get_number_of_threads(remaining_threads, total_threads); + + resources.insert(ComputeTaskPool( + TaskPoolBuilder::default() + .num_threads(compute_threads) + .thread_name("Compute Task Pool".to_string()) + .build(), + )); + } + } +} diff --git a/crates/bevy_ecs/src/resource/resources.rs b/crates/bevy_ecs/src/resource/resources.rs index 0d9d5afba0a0f..86f324d269a1c 100644 --- a/crates/bevy_ecs/src/resource/resources.rs +++ b/crates/bevy_ecs/src/resource/resources.rs @@ -42,6 +42,8 @@ impl Resources { self.get_resource_mut(ResourceIndex::Global) } + /// Returns a clone of the underlying resource, this is helpful when borrowing something + /// cloneable (like a task pool) without taking a borrow on the resource map pub fn get_cloned(&self) -> Option { self.get::().map(|r| (*r).clone()) } diff --git a/crates/bevy_ecs/src/schedule/parallel_executor.rs b/crates/bevy_ecs/src/schedule/parallel_executor.rs index 630fadb83f100..c8b6585fd2b74 100644 --- a/crates/bevy_ecs/src/schedule/parallel_executor.rs +++ b/crates/bevy_ecs/src/schedule/parallel_executor.rs @@ -35,18 +35,6 @@ impl Default for ParallelExecutor { } impl ParallelExecutor { - pub fn initialize_pools(resources: &mut Resources) { - let compute_pool: bevy_tasks::ComputePool = resources - .get::() - .map(|options| (*options).clone()) - .unwrap_or_else(ParallelExecutorOptions::default) - .create_builder() - .build(); - - // For now, bevy_ecs only uses the global task pool so it is sufficient to configure it once here. - resources.insert(compute_pool); - } - pub fn without_tracker_clears() -> Self { Self { clear_trackers: false, @@ -77,52 +65,6 @@ impl ParallelExecutor { } } -/// This can be added as an app resource to control the global `bevy_tasks::TaskPool` used by ecs. -// Dev internal note: We cannot directly expose a ThreadPoolBuilder here as it does not implement Send and Sync. -#[derive(Debug, Default, Clone)] -pub struct ParallelExecutorOptions { - /// If some value, we'll set up the thread pool to use at most n threads. See `bevy_tasks::TaskPoolBuilder::num_threads`. - num_threads: Option, - /// If some value, we'll set up the thread pool's' workers to the given stack size. See `bevy_tasks::TaskPoolBuilder::stack_size`. - stack_size: Option, - // TODO: Do we also need/want to expose other features (*_handler, etc.) -} - -impl ParallelExecutorOptions { - /// Creates a new ParallelExecutorOptions instance - pub fn new() -> Self { - Self::default() - } - - /// Sets the num_threads option, using the builder pattern - pub fn with_num_threads(mut self, num_threads: Option) -> Self { - self.num_threads = num_threads; - self - } - - /// Sets the stack_size option, using the builder pattern. WARNING: Only use this if you know what you're doing, - /// otherwise your application may run into stability and performance issues. - pub fn with_stack_size(mut self, stack_size: Option) -> Self { - self.stack_size = stack_size; - self - } - - /// Creates a new ThreadPoolBuilder based on the current options. - pub(crate) fn create_builder(&self) -> bevy_tasks::TaskPoolBuilder { - let mut builder = bevy_tasks::TaskPoolBuilder::new(); - - if let Some(num_threads) = self.num_threads { - builder = builder.num_threads(num_threads); - } - - if let Some(stack_size) = self.stack_size { - builder = builder.stack_size(stack_size); - } - - builder - } -} - #[derive(Debug, Clone)] pub struct ExecutorStage { /// each system's set of dependencies @@ -325,11 +267,6 @@ impl ExecutorStage { system.run(world, resources); sender.send(system_index).unwrap(); }); - // scope.spawn_fifo(move |_| { - // let mut system = system.lock(); - // system.run(world, resources); - // sender.send(system_index).unwrap(); - // }); systems_currently_running = true; } @@ -345,7 +282,9 @@ impl ExecutorStage { systems: &[Arc>>], schedule_changed: bool, ) { - let compute_pool = resources.get_cloned::().unwrap(); + let compute_pool = resources + .get_cloned::() + .unwrap(); // if the schedule has changed, clear executor state / fill it with new defaults if schedule_changed { diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index fef7a462b587a..38ee408ee2682 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bevy_tasks" -version = "0.1.0" +version = "0.1.3" authors = [ "Bevy Contributors ", "Lachlan Sneff ", diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 5075f5fc55e54..1757591740e86 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -1,296 +1,18 @@ -use parking::Unparker; -use std::{ - future::Future, - marker::PhantomData, - mem, - pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread::{self, JoinHandle}, -}; - mod slice; pub use slice::{ParallelSlice, ParallelSliceMut}; mod task; pub use task::Task; -mod usages; -pub use usages::Compute; - -pub type ComputePool = TaskPool; - -macro_rules! pin_mut { - ($($x:ident),*) => { $( - // Move the value to ensure that it is owned - let mut $x = $x; - // Shadow the original binding so that it can't be directly accessed - // ever again. - #[allow(unused_mut)] - let mut $x = unsafe { - Pin::new_unchecked(&mut $x) - }; - )* } -} - -/// Used to create a TaskPool -#[derive(Debug, Default, Clone)] -pub struct TaskPoolBuilder { - /// If set, we'll set up the thread pool to use at most n threads. Otherwise use - /// the logical core count of the system - num_threads: Option, - /// If set, we'll use the given stack size rather than the system default - stack_size: Option, - /// Allows customizing the name of the threads - helpful for debugging. If set, threads will - /// be named (), i.e. "MyThreadPool (2)" - thread_name: Option, -} - -impl TaskPoolBuilder { - /// Creates a new TaskPoolBuilder instance - pub fn new() -> Self { - Self::default() - } - - /// Override the number of threads created for the pool. If unset, we default to the number - /// of logical cores of the system - pub fn num_threads(mut self, num_threads: usize) -> Self { - self.num_threads = Some(num_threads); - self - } - - /// Override the stack size of the threads created for the pool - pub fn stack_size(mut self, stack_size: usize) -> Self { - self.stack_size = Some(stack_size); - self - } - - /// Override the name of the threads created for the pool. If set, threads will - /// be named (), i.e. "MyThreadPool (2)" - pub fn thread_name(mut self, thread_name: String) -> Self { - self.thread_name = Some(thread_name); - self - } - - /// Creates a new ThreadPoolBuilder based on the current options. - pub fn build(self) -> TaskPool { - TaskPool::new_internal( - self.num_threads, - self.stack_size, - self.thread_name.as_deref(), - ) - } -} - -struct TaskPoolInternal { - threads: Vec<(JoinHandle<()>, Arc)>, - shutdown_flag: Arc, -} - -impl Drop for TaskPoolInternal { - fn drop(&mut self) { - self.shutdown_flag.store(true, Ordering::Release); - - for (_, unparker) in &self.threads { - unparker.unpark(); - } - for (join_handle, _) in self.threads.drain(..) { - join_handle - .join() - .expect("task thread panicked while executing"); - } - } -} - -/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by -/// the pool on threads owned by the pool. -pub struct TaskPool { - executor: Arc, - internal: Arc, - _marker: PhantomData, -} - -impl TaskPool { - /// Create a `TaskPool` with the default configuration. - pub fn new() -> Self { - TaskPoolBuilder::new().build() - } - - fn new_internal( - num_threads: Option, - stack_size: Option, - thread_name: Option<&str>, - ) -> Self { - let executor = Arc::new(multitask::Executor::new()); - let shutdown_flag = Arc::new(AtomicBool::new(false)); - - let num_threads = num_threads.unwrap_or_else(num_cpus::get); - - let threads = (0..num_threads) - .map(|i| { - let ex = Arc::clone(&executor); - let flag = Arc::clone(&shutdown_flag); - let (p, u) = parking::pair(); - let unparker = Arc::new(u); - let u = Arc::clone(&unparker); - // Run an executor thread. - - let thread_name = if let Some(thread_name) = thread_name { - format!("{} ({})", thread_name, i) - } else { - format!("TaskPool ({})", i) - }; - - let mut thread_builder = thread::Builder::new().name(thread_name); - - if let Some(stack_size) = stack_size { - thread_builder = thread_builder.stack_size(stack_size); - } - - let handle = thread_builder - .spawn(move || { - let ticker = ex.ticker(move || u.unpark()); - loop { - if flag.load(Ordering::Acquire) { - break; - } +mod task_pool; +pub use task_pool::{Scope, TaskPool, TaskPoolBuilder}; - if !ticker.tick() { - p.park(); - } - } - }) - .expect("failed to spawn thread"); - - (handle, unparker) - }) - .collect(); - - Self { - executor, - internal: Arc::new(TaskPoolInternal { - threads, - shutdown_flag, - }), - _marker: PhantomData, - } - } - - /// Return the number of threads owned by the task pool - pub fn thread_num(&self) -> usize { - self.internal.threads.len() - } - - /// Allows spawning non-`static futures on the thread pool. The function takes a callback, - /// passing a scope object into it. The scope object provided to the callback can be used - /// to spawn tasks. This function will await the completion of all tasks before returning. - /// - /// This is similar to `rayon::scope` and `crossbeam::scope` - pub fn scope<'scope, F, T>(&self, f: F) -> Vec - where - F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, - T: Send + 'static, - { - let executor: &'scope multitask::Executor = unsafe { mem::transmute(&*self.executor) }; - - let fut = async move { - let mut scope = Scope { - executor, - spawned: Vec::new(), - }; - - f(&mut scope); - - let mut results = Vec::with_capacity(scope.spawned.len()); - for task in scope.spawned { - results.push(task.await); - } - - results - }; - - pin_mut!(fut); - - // let fut: Pin<&mut (dyn Future + Send)> = fut; - let fut: Pin<&'static mut (dyn Future> + Send + 'static)> = - unsafe { mem::transmute(fut as Pin<&mut (dyn Future> + Send)>) }; - - pollster::block_on(self.executor.spawn(fut)) - } - - /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be - /// cancelled and "detached" allowing it to continue running without having to be polled by the - /// end-user. - pub fn spawn( - &self, - future: impl Future + Send + 'static, - ) -> impl Future + Send - where - T: Send + 'static, - { - self.executor.spawn(future) - } -} - -impl Default for TaskPool { - fn default() -> Self { - Self::new() - } -} - -impl Clone for TaskPool { - fn clone(&self) -> Self { - Self { - executor: Arc::clone(&self.executor), - internal: Arc::clone(&self.internal), - _marker: PhantomData, - } - } -} - -pub struct Scope<'scope, T> { - executor: &'scope multitask::Executor, - spawned: Vec>, -} - -impl<'scope, T: Send + 'static> Scope<'scope, T> { - pub fn spawn + 'scope + Send>(&mut self, f: Fut) { - let fut: Pin + 'scope + Send>> = Box::pin(f); - let fut: Pin + 'static + Send>> = unsafe { mem::transmute(fut) }; - - let task = self.executor.spawn(fut); - self.spawned.push(task); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - pub fn test_spawn() { - let pool = TaskPool::::new(); - - let foo = Box::new(42); - let foo = &*foo; - - let outputs = pool.scope(|scope| { - for i in 0..100 { - scope.spawn(async move { - println!("task {}", i); - if *foo != 42 { - panic!("not 42!?!?") - } else { - *foo - } - }); - } - }); +mod usages; +pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool}; - for output in outputs { - assert_eq!(output, 42); - } - } +pub mod prelude { + pub use crate::{ + slice::{ParallelSlice, ParallelSliceMut}, + usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool}, + }; } diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs index bc2465a77a9fa..3461b0bc20756 100644 --- a/crates/bevy_tasks/src/slice.rs +++ b/crates/bevy_tasks/src/slice.rs @@ -1,12 +1,7 @@ use super::TaskPool; pub trait ParallelSlice: AsRef<[T]> { - fn par_chunk_map( - &self, - task_pool: &TaskPool, - chunk_size: usize, - f: F, - ) -> Vec + fn par_chunk_map(&self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec where F: Fn(&[T]) -> R + Send + Sync, R: Send + 'static, @@ -20,12 +15,7 @@ pub trait ParallelSlice: AsRef<[T]> { }) } - fn par_splat_map( - &self, - task_pool: &TaskPool, - max_tasks: Option, - f: F, - ) -> Vec + fn par_splat_map(&self, task_pool: &TaskPool, max_tasks: Option, f: F) -> Vec where F: Fn(&[T]) -> R + Send + Sync, R: Send + 'static, @@ -46,12 +36,7 @@ pub trait ParallelSlice: AsRef<[T]> { impl ParallelSlice for S where S: AsRef<[T]> {} pub trait ParallelSliceMut: AsMut<[T]> { - fn par_chunk_map_mut( - &mut self, - task_pool: &TaskPool, - chunk_size: usize, - f: F, - ) -> Vec + fn par_chunk_map_mut(&mut self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec where F: Fn(&mut [T]) -> R + Send + Sync, R: Send + 'static, @@ -65,9 +50,9 @@ pub trait ParallelSliceMut: AsMut<[T]> { }) } - fn par_splat_map_mut( + fn par_splat_map_mut( &mut self, - task_pool: &TaskPool, + task_pool: &TaskPool, max_tasks: Option, f: F, ) -> Vec diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs new file mode 100644 index 0000000000000..e719f1e40e269 --- /dev/null +++ b/crates/bevy_tasks/src/task_pool.rs @@ -0,0 +1,280 @@ +use parking::Unparker; +use std::{ + future::Future, + mem, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, JoinHandle}, +}; + +macro_rules! pin_mut { + ($($x:ident),*) => { $( + // Move the value to ensure that it is owned + let mut $x = $x; + // Shadow the original binding so that it can't be directly accessed + // ever again. + #[allow(unused_mut)] + let mut $x = unsafe { + Pin::new_unchecked(&mut $x) + }; + )* } +} + +/// Used to create a TaskPool +#[derive(Debug, Default, Clone)] +pub struct TaskPoolBuilder { + /// If set, we'll set up the thread pool to use at most n threads. Otherwise use + /// the logical core count of the system + num_threads: Option, + /// If set, we'll use the given stack size rather than the system default + stack_size: Option, + /// Allows customizing the name of the threads - helpful for debugging. If set, threads will + /// be named (), i.e. "MyThreadPool (2)" + thread_name: Option, +} + +impl TaskPoolBuilder { + /// Creates a new TaskPoolBuilder instance + pub fn new() -> Self { + Self::default() + } + + /// Override the number of threads created for the pool. If unset, we default to the number + /// of logical cores of the system + pub fn num_threads(mut self, num_threads: usize) -> Self { + self.num_threads = Some(num_threads); + self + } + + /// Override the stack size of the threads created for the pool + pub fn stack_size(mut self, stack_size: usize) -> Self { + self.stack_size = Some(stack_size); + self + } + + /// Override the name of the threads created for the pool. If set, threads will + /// be named (), i.e. "MyThreadPool (2)" + pub fn thread_name(mut self, thread_name: String) -> Self { + self.thread_name = Some(thread_name); + self + } + + /// Creates a new ThreadPoolBuilder based on the current options. + pub fn build(self) -> TaskPool { + TaskPool::new_internal( + self.num_threads, + self.stack_size, + self.thread_name.as_deref(), + ) + } +} + +struct TaskPoolInner { + threads: Vec<(JoinHandle<()>, Arc)>, + shutdown_flag: Arc, +} + +impl Drop for TaskPoolInner { + fn drop(&mut self) { + self.shutdown_flag.store(true, Ordering::Release); + + for (_, unparker) in &self.threads { + unparker.unpark(); + } + for (join_handle, _) in self.threads.drain(..) { + join_handle + .join() + .expect("task thread panicked while executing"); + } + } +} + +/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by +/// the pool on threads owned by the pool. +#[derive(Clone)] +pub struct TaskPool { + /// The executor for the pool + /// + /// This has to be separate from TaskPoolInner because we have to create an Arc to + /// pass into the worker threads, and we must create the worker threads before we can create the + /// Vec> contained within TaskPoolInner + executor: Arc, + + /// + inner: Arc, +} + +impl TaskPool { + /// Create a `TaskPool` with the default configuration. + pub fn new() -> Self { + TaskPoolBuilder::new().build() + } + + fn new_internal( + num_threads: Option, + stack_size: Option, + thread_name: Option<&str>, + ) -> Self { + let executor = Arc::new(multitask::Executor::new()); + let shutdown_flag = Arc::new(AtomicBool::new(false)); + + let num_threads = num_threads.unwrap_or_else(num_cpus::get); + + let threads = (0..num_threads) + .map(|i| { + let ex = Arc::clone(&executor); + let flag = Arc::clone(&shutdown_flag); + let (p, u) = parking::pair(); + let unparker = Arc::new(u); + let u = Arc::clone(&unparker); + // Run an executor thread. + + let thread_name = if let Some(thread_name) = thread_name { + format!("{} ({})", thread_name, i) + } else { + format!("TaskPool ({})", i) + }; + + let mut thread_builder = thread::Builder::new().name(thread_name); + + if let Some(stack_size) = stack_size { + thread_builder = thread_builder.stack_size(stack_size); + } + + let handle = thread_builder + .spawn(move || { + let ticker = ex.ticker(move || u.unpark()); + loop { + if flag.load(Ordering::Acquire) { + break; + } + + if !ticker.tick() { + p.park(); + } + } + }) + .expect("failed to spawn thread"); + + (handle, unparker) + }) + .collect(); + + Self { + executor, + inner: Arc::new(TaskPoolInner { + threads, + shutdown_flag, + }), + } + } + + /// Return the number of threads owned by the task pool + pub fn thread_num(&self) -> usize { + self.inner.threads.len() + } + + /// Allows spawning non-`static futures on the thread pool. The function takes a callback, + /// passing a scope object into it. The scope object provided to the callback can be used + /// to spawn tasks. This function will await the completion of all tasks before returning. + /// + /// This is similar to `rayon::scope` and `crossbeam::scope` + pub fn scope<'scope, F, T>(&self, f: F) -> Vec + where + F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, + T: Send + 'static, + { + let executor: &'scope multitask::Executor = unsafe { mem::transmute(&*self.executor) }; + + let fut = async move { + let mut scope = Scope { + executor, + spawned: Vec::new(), + }; + + f(&mut scope); + + let mut results = Vec::with_capacity(scope.spawned.len()); + for task in scope.spawned { + results.push(task.await); + } + + results + }; + + pin_mut!(fut); + + // let fut: Pin<&mut (dyn Future + Send)> = fut; + let fut: Pin<&'static mut (dyn Future> + Send + 'static)> = + unsafe { mem::transmute(fut as Pin<&mut (dyn Future> + Send)>) }; + + pollster::block_on(self.executor.spawn(fut)) + } + + /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be + /// cancelled and "detached" allowing it to continue running without having to be polled by the + /// end-user. + pub fn spawn( + &self, + future: impl Future + Send + 'static, + ) -> impl Future + Send + where + T: Send + 'static, + { + self.executor.spawn(future) + } +} + +impl Default for TaskPool { + fn default() -> Self { + Self::new() + } +} + +pub struct Scope<'scope, T> { + executor: &'scope multitask::Executor, + spawned: Vec>, +} + +impl<'scope, T: Send + 'static> Scope<'scope, T> { + pub fn spawn + 'scope + Send>(&mut self, f: Fut) { + let fut: Pin + 'scope + Send>> = Box::pin(f); + let fut: Pin + 'static + Send>> = unsafe { mem::transmute(fut) }; + + let task = self.executor.spawn(fut); + self.spawned.push(task); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + pub fn test_spawn() { + let pool = TaskPool::::new(); + + let foo = Box::new(42); + let foo = &*foo; + + let outputs = pool.scope(|scope| { + for i in 0..100 { + scope.spawn(async move { + println!("task {}", i); + if *foo != 42 { + panic!("not 42!?!?") + } else { + *foo + } + }); + } + }); + + for output in outputs { + assert_eq!(output, 42); + } + } +} diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index 7cd15df5e44e5..f604aae70a73f 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -1,4 +1,52 @@ -//! Several premade usage profiles -//! Just `Compute` for now. +//! Definitions for a few common task pools that we want. Generally the determining factor for what +//! kind of work should go in each pool is latency requirements. +//! +//! For CPU-intensive work (tasks that generally spin until completion) we have a standard Compute +//! pool and an AsyncCompute pool. Work that does not need to be completed to present the next +//! frame should go to the AsyncCompute pool +//! +//! For IO-intensive work (tasks that spend very little time in a "woken" state) we have an IO +//! task pool. The tasks here are expected to complete very quickly. Generally they should just +//! await receiving data from somewhere (i.e. disk) and signal other systems when the data is ready +//! for consumption. (likely via channels) -pub struct Compute(()); +use super::TaskPool; +use std::ops::Deref; + +/// A newtype for a task pool for CPU-intensive work that must be completed to deliver the next +/// frame +#[derive(Clone)] +pub struct ComputeTaskPool(pub TaskPool); + +impl Deref for ComputeTaskPool { + type Target = TaskPool; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// A newtype for a task pool for CPU-intensive work that may span across multiple frames +#[derive(Clone)] +pub struct AsyncComputeTaskPool(pub TaskPool); + +impl Deref for AsyncComputeTaskPool { + type Target = TaskPool; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a +/// "woken" state) +#[derive(Clone)] +pub struct IOTaskPool(pub TaskPool); + +impl Deref for IOTaskPool { + type Target = TaskPool; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/examples/app/thread_pool_resources.rs b/examples/app/thread_pool_resources.rs index 99f3016236748..1da1b867c686d 100644 --- a/examples/app/thread_pool_resources.rs +++ b/examples/app/thread_pool_resources.rs @@ -1,10 +1,11 @@ use bevy::{ecs::ParallelExecutorOptions, prelude::*}; +use bevy_ecs::DefaultTaskPoolOptions; /// This example illustrates how to customize the thread pool used internally (e.g. to only use a /// certain number of threads). fn main() { App::build() - .add_resource(ParallelExecutorOptions::new().with_num_threads(Some(4))) + .add_resource(DefaultTaskPoolOptions::with_num_threads(4)) .add_default_plugins() .run(); } diff --git a/src/lib.rs b/src/lib.rs index b08856e41e7fd..b0a00b83fabf3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,12 +53,12 @@ pub use bevy_property as property; pub use bevy_render as render; pub use bevy_scene as scene; pub use bevy_sprite as sprite; +pub use bevy_tasks as tasks; pub use bevy_text as text; pub use bevy_transform as transform; pub use bevy_type_registry as type_registry; pub use bevy_ui as ui; pub use bevy_window as window; -pub use bevy_tasks as tasks; #[cfg(feature = "bevy_audio")] pub use bevy_audio as audio; From 5a59d62a5fa6c2024237bebe14e2e49537ecdcc7 Mon Sep 17 00:00:00 2001 From: Lachlan Sneff Date: Fri, 28 Aug 2020 12:27:37 -0400 Subject: [PATCH 11/15] Fix tests --- crates/bevy_tasks/examples/busy_behavior.rs | 4 ++-- crates/bevy_tasks/examples/idle_behavior.rs | 4 ++-- crates/bevy_tasks/src/slice.rs | 5 ++--- crates/bevy_tasks/src/task_pool.rs | 2 +- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs index 6c10175abcfb8..26dfaeac285d9 100644 --- a/crates/bevy_tasks/examples/busy_behavior.rs +++ b/crates/bevy_tasks/examples/busy_behavior.rs @@ -1,4 +1,4 @@ -use bevy_tasks::{Compute, TaskPoolBuilder}; +use bevy_tasks::TaskPoolBuilder; // This sample demonstrates creating a thread pool with 4 tasks and spawning 40 tasks that spin // for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical @@ -8,7 +8,7 @@ fn main() { let pool = TaskPoolBuilder::new() .thread_name("Busy Behavior ThreadPool".to_string()) .num_threads(4) - .build::(); + .build(); let t0 = std::time::Instant::now(); pool.scope(|s| { diff --git a/crates/bevy_tasks/examples/idle_behavior.rs b/crates/bevy_tasks/examples/idle_behavior.rs index 2fb370901a410..4a392cb2e6da9 100644 --- a/crates/bevy_tasks/examples/idle_behavior.rs +++ b/crates/bevy_tasks/examples/idle_behavior.rs @@ -1,4 +1,4 @@ -use bevy_tasks::{Compute, TaskPoolBuilder}; +use bevy_tasks::TaskPoolBuilder; // This sample demonstrates a thread pool with one thread per logical core and only one task // spinning. Other than the one thread, the system should remain idle, demonstrating good behavior @@ -7,7 +7,7 @@ use bevy_tasks::{Compute, TaskPoolBuilder}; fn main() { let pool = TaskPoolBuilder::new() .thread_name("Idle Behavior ThreadPool".to_string()) - .build::(); + .build(); pool.scope(|s| { for i in 0..1 { diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs index 3461b0bc20756..fdd56ebccd720 100644 --- a/crates/bevy_tasks/src/slice.rs +++ b/crates/bevy_tasks/src/slice.rs @@ -82,8 +82,7 @@ mod tests { #[test] fn test_par_chunks_map() { let v = vec![42; 1000]; - let task_pool = TaskPool::::new(); - + let task_pool = TaskPool::new(); let outputs = v.par_splat_map(&task_pool, None, |numbers| -> i32 { numbers.iter().sum() }); println!("outputs: {:?}", outputs); @@ -92,7 +91,7 @@ mod tests { #[test] fn test_par_chunks_map_mut() { let mut v = vec![42; 1000]; - let task_pool = TaskPool::::new(); + let task_pool = TaskPool::new(); let outputs = v.par_splat_map_mut(&task_pool, None, |numbers| -> i32 { for number in numbers.iter_mut() { diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index e719f1e40e269..31d266bcf4a4d 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -255,7 +255,7 @@ mod tests { #[test] pub fn test_spawn() { - let pool = TaskPool::::new(); + let pool = TaskPool::new(); let foo = Box::new(42); let foo = &*foo; From d092add38476cfd9d3b4ac0ae394a351d81cac0d Mon Sep 17 00:00:00 2001 From: Lachlan Sneff Date: Fri, 28 Aug 2020 12:30:15 -0400 Subject: [PATCH 12/15] Fix thread_pool_resources example --- crates/bevy_app/src/app.rs | 3 +-- examples/app/thread_pool_resources.rs | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/bevy_app/src/app.rs b/crates/bevy_app/src/app.rs index 046f69b3a24b9..c62cdea8e7460 100644 --- a/crates/bevy_app/src/app.rs +++ b/crates/bevy_app/src/app.rs @@ -1,5 +1,4 @@ -use crate::app_builder::AppBuilder; -use crate::DefaultTaskPoolOptions; +use crate::{app_builder::AppBuilder, DefaultTaskPoolOptions}; use bevy_ecs::{ParallelExecutor, Resources, Schedule, World}; #[allow(clippy::needless_doctest_main)] diff --git a/examples/app/thread_pool_resources.rs b/examples/app/thread_pool_resources.rs index 1da1b867c686d..27ce33e2670c4 100644 --- a/examples/app/thread_pool_resources.rs +++ b/examples/app/thread_pool_resources.rs @@ -1,5 +1,5 @@ -use bevy::{ecs::ParallelExecutorOptions, prelude::*}; -use bevy_ecs::DefaultTaskPoolOptions; +use bevy::prelude::*; +use bevy_app::DefaultTaskPoolOptions; /// This example illustrates how to customize the thread pool used internally (e.g. to only use a /// certain number of threads). From 8996109c2e3793fa79fe1df57675fc3bdc11b9f3 Mon Sep 17 00:00:00 2001 From: Philip Degarmo Date: Sat, 29 Aug 2020 08:34:32 -0700 Subject: [PATCH 13/15] Bevy tasks (#3) * Address some feedback for the bevy_tasks PR - Add a general clamp fn to bevy_math - Expose num_cpus in bevy_tasks - Fill out empty doc comment * Add comments and clean up pinning in bevy_tasks, fix a couple tests --- crates/bevy_app/Cargo.toml | 2 +- crates/bevy_app/src/task_pool_options.rs | 16 ++------- crates/bevy_math/src/clamp.rs | 19 +++++++++++ crates/bevy_math/src/lib.rs | 2 ++ crates/bevy_tasks/src/lib.rs | 8 +++++ crates/bevy_tasks/src/slice.rs | 15 +++++++-- crates/bevy_tasks/src/task_pool.rs | 41 +++++++++++++----------- 7 files changed, 69 insertions(+), 34 deletions(-) create mode 100644 crates/bevy_math/src/clamp.rs diff --git a/crates/bevy_app/Cargo.toml b/crates/bevy_app/Cargo.toml index cde8462828aa1..2e30093eaa037 100644 --- a/crates/bevy_app/Cargo.toml +++ b/crates/bevy_app/Cargo.toml @@ -14,7 +14,7 @@ keywords = ["bevy"] bevy_derive = { path = "../bevy_derive", version = "0.1" } bevy_ecs = { path = "../bevy_ecs", version = "0.1" } bevy_tasks = { path = "../bevy_tasks" } -num_cpus = "1" +bevy_math = { path = "../bevy_math", version = "0.1" } # other libloading = "0.6" diff --git a/crates/bevy_app/src/task_pool_options.rs b/crates/bevy_app/src/task_pool_options.rs index f5cdc9d6680c8..fc5edbfc0e36f 100644 --- a/crates/bevy_app/src/task_pool_options.rs +++ b/crates/bevy_app/src/task_pool_options.rs @@ -1,16 +1,6 @@ use bevy_ecs::Resources; use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool, TaskPoolBuilder}; -fn clamp_usize(value: usize, min: usize, max: usize) -> usize { - if value > max { - max - } else if value < min { - min - } else { - value - } -} - /// Defines a simple way to determine how many threads to use given the number of remaining cores /// and number of total cores #[derive(Clone)] @@ -36,7 +26,7 @@ impl TaskPoolThreadAssignmentPolicy { // Clamp by min_threads, max_threads. (This may result in us using more threads than are // available, this is intended. An example case where this might happen is a device with // <= 2 threads. - clamp_usize(desired, self.min_threads, self.max_threads) + bevy_math::clamp(desired, self.min_threads, self.max_threads) } } @@ -101,8 +91,8 @@ impl DefaultTaskPoolOptions { /// Inserts the default thread pools into the given resource map based on the configured values pub fn create_default_pools(&self, resources: &mut Resources) { - let total_threads = clamp_usize( - num_cpus::get(), + let total_threads = bevy_math::clamp( + bevy_tasks::logical_core_count(), self.min_total_threads, self.max_total_threads, ); diff --git a/crates/bevy_math/src/clamp.rs b/crates/bevy_math/src/clamp.rs new file mode 100644 index 0000000000000..92807e54f179e --- /dev/null +++ b/crates/bevy_math/src/clamp.rs @@ -0,0 +1,19 @@ +/// A value bounded by a minimum and a maximum +/// +/// If input is less than min then this returns min. +/// If input is greater than max then this returns max. +/// Otherwise this returns input. +/// +/// **Panics** in debug mode if `!(min <= max)`. +/// +/// Original implementation from num-traits licensed as MIT +pub fn clamp(input: T, min: T, max: T) -> T { + debug_assert!(min <= max, "min must be less than or equal to max"); + if input < min { + min + } else if input > max { + max + } else { + input + } +} diff --git a/crates/bevy_math/src/lib.rs b/crates/bevy_math/src/lib.rs index 3dcb0614dc565..121932725b725 100644 --- a/crates/bevy_math/src/lib.rs +++ b/crates/bevy_math/src/lib.rs @@ -1,6 +1,8 @@ +mod clamp; mod face_toward; mod geometry; +pub use clamp::*; pub use face_toward::*; pub use geometry::*; pub use glam::*; diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 1757591740e86..8ac79b3e1cf37 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -16,3 +16,11 @@ pub mod prelude { usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool}, }; } + +pub fn logical_core_count() -> usize { + num_cpus::get() +} + +pub fn physical_core_count() -> usize { + num_cpus::get_physical() +} diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs index fdd56ebccd720..a04c9af10a28f 100644 --- a/crates/bevy_tasks/src/slice.rs +++ b/crates/bevy_tasks/src/slice.rs @@ -85,7 +85,12 @@ mod tests { let task_pool = TaskPool::new(); let outputs = v.par_splat_map(&task_pool, None, |numbers| -> i32 { numbers.iter().sum() }); - println!("outputs: {:?}", outputs); + let mut sum = 0; + for output in outputs { + sum += output; + } + + assert_eq!(sum, 1000 * 42); } #[test] @@ -100,6 +105,12 @@ mod tests { numbers.iter().sum() }); - println!("outputs: {:?}", outputs); + let mut sum = 0; + for output in outputs { + sum += output; + } + + assert_eq!(sum, 1000 * 42 * 2); + assert_eq!(v[0], 84); } } diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 31d266bcf4a4d..546a8c1a081a2 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -10,19 +10,6 @@ use std::{ thread::{self, JoinHandle}, }; -macro_rules! pin_mut { - ($($x:ident),*) => { $( - // Move the value to ensure that it is owned - let mut $x = $x; - // Shadow the original binding so that it can't be directly accessed - // ever again. - #[allow(unused_mut)] - let mut $x = unsafe { - Pin::new_unchecked(&mut $x) - }; - )* } -} - /// Used to create a TaskPool #[derive(Debug, Default, Clone)] pub struct TaskPoolBuilder { @@ -103,7 +90,7 @@ pub struct TaskPool { /// Vec> contained within TaskPoolInner executor: Arc, - /// + /// Inner state of the pool inner: Arc, } @@ -187,7 +174,12 @@ impl TaskPool { F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, T: Send + 'static, { - let executor: &'scope multitask::Executor = unsafe { mem::transmute(&*self.executor) }; + // SAFETY: This function blocks until all futures complete, so this future must return + // before this function returns. However, rust has no way of knowing + // this so we must convert to 'static here to appease the compiler as it is unable to + // validate safety. + let executor: &multitask::Executor = &*self.executor as &multitask::Executor; + let executor: &'scope multitask::Executor = unsafe { mem::transmute(executor) }; let fut = async move { let mut scope = Scope { @@ -205,11 +197,20 @@ impl TaskPool { results }; - pin_mut!(fut); + // Move the value to ensure that it is owned + let mut fut = fut; + + // Shadow the original binding so that it can't be directly accessed + // ever again. + let fut = unsafe { Pin::new_unchecked(&mut fut) }; - // let fut: Pin<&mut (dyn Future + Send)> = fut; + // SAFETY: This function blocks until all futures complete, so we do not read/write the + // data from futures outside of the 'scope lifetime. However, rust has no way of knowing + // this so we must convert to 'static here to appease the compiler as it is unable to + // validate safety. + let fut: Pin<&mut (dyn Future> + Send)> = fut; let fut: Pin<&'static mut (dyn Future> + Send + 'static)> = - unsafe { mem::transmute(fut as Pin<&mut (dyn Future> + Send)>) }; + unsafe { mem::transmute(fut) }; pollster::block_on(self.executor.spawn(fut)) } @@ -241,6 +242,10 @@ pub struct Scope<'scope, T> { impl<'scope, T: Send + 'static> Scope<'scope, T> { pub fn spawn + 'scope + Send>(&mut self, f: Fut) { + // SAFETY: This function blocks until all futures complete, so we do not read/write the + // data from futures outside of the 'scope lifetime. However, rust has no way of knowing + // this so we must convert to 'static here to appease the compiler as it is unable to + // validate safety. let fut: Pin + 'scope + Send>> = Box::pin(f); let fut: Pin + 'static + Send>> = unsafe { mem::transmute(fut) }; From 879afa900d3cbb1554491de7143adb81caff4aaf Mon Sep 17 00:00:00 2001 From: Philip Degarmo Date: Sat, 29 Aug 2020 11:00:45 -0700 Subject: [PATCH 14/15] Fix tests --- crates/bevy_ecs/src/schedule/parallel_executor.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/bevy_ecs/src/schedule/parallel_executor.rs b/crates/bevy_ecs/src/schedule/parallel_executor.rs index c8b6585fd2b74..3b434db503b6f 100644 --- a/crates/bevy_ecs/src/schedule/parallel_executor.rs +++ b/crates/bevy_ecs/src/schedule/parallel_executor.rs @@ -402,6 +402,7 @@ mod tests { Commands, }; use bevy_hecs::{Entity, World}; + use bevy_tasks::{ComputeTaskPool, TaskPool}; use fixedbitset::FixedBitSet; use parking_lot::Mutex; use std::sync::Arc; @@ -415,6 +416,8 @@ mod tests { fn cross_stage_archetype_change_prepare() { let mut world = World::new(); let mut resources = Resources::default(); + resources.insert(ComputeTaskPool(TaskPool::default())); + let mut schedule = Schedule::default(); schedule.add_stage("PreArchetypeChange"); schedule.add_stage("PostArchetypeChange"); @@ -444,6 +447,8 @@ mod tests { fn intra_stage_archetype_change_prepare() { let mut world = World::new(); let mut resources = Resources::default(); + resources.insert(ComputeTaskPool(TaskPool::default())); + let mut schedule = Schedule::default(); schedule.add_stage("update"); @@ -472,6 +477,7 @@ mod tests { fn schedule() { let mut world = World::new(); let mut resources = Resources::default(); + resources.insert(ComputeTaskPool(TaskPool::default())); resources.insert(Counter::default()); resources.insert(1.0f64); resources.insert(2isize); From 77b126de052eaa781a9e0b22c6776450c368fe14 Mon Sep 17 00:00:00 2001 From: Philip Degarmo Date: Sat, 29 Aug 2020 11:14:38 -0700 Subject: [PATCH 15/15] Add version to bevy_tasks references in cargo.toml files Co-authored-by: stefee --- Cargo.toml | 2 +- crates/bevy_app/Cargo.toml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index aa5521cedb095..e18cdb2c78f5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,7 +59,7 @@ bevy_text = { path = "crates/bevy_text", version = "0.1" } bevy_ui = { path = "crates/bevy_ui", version = "0.1" } bevy_utils = { path = "crates/bevy_utils", version = "0.1" } bevy_window = { path = "crates/bevy_window", version = "0.1" } -bevy_tasks = { path = "crates/bevy_tasks" } +bevy_tasks = { path = "crates/bevy_tasks", version = "0.1" } # bevy (optional) bevy_audio = { path = "crates/bevy_audio", optional = true, version = "0.1" } diff --git a/crates/bevy_app/Cargo.toml b/crates/bevy_app/Cargo.toml index 2e30093eaa037..713560ba553fd 100644 --- a/crates/bevy_app/Cargo.toml +++ b/crates/bevy_app/Cargo.toml @@ -13,10 +13,10 @@ keywords = ["bevy"] # bevy bevy_derive = { path = "../bevy_derive", version = "0.1" } bevy_ecs = { path = "../bevy_ecs", version = "0.1" } -bevy_tasks = { path = "../bevy_tasks" } +bevy_tasks = { path = "../bevy_tasks", version = "0.1" } bevy_math = { path = "../bevy_math", version = "0.1" } # other libloading = "0.6" log = { version = "0.4", features = ["release_max_level_info"] } -serde = { version = "1.0", features = ["derive"]} \ No newline at end of file +serde = { version = "1.0", features = ["derive"]}