From 307ea61de61b7404ff93a1d3b1725181f47df5ed Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Thu, 9 Mar 2023 22:50:22 +0000 Subject: [PATCH 01/10] beginning of an Async section --- src/SUMMARY.md | 15 +++++++++++ src/async.md | 17 ++++++++++++ src/async/async-await.md | 52 ++++++++++++++++++++++++++++++++++++ src/async/async-blocks.md | 34 +++++++++++++++++++++++ src/exercises/day-4/async.md | 3 +++ 5 files changed, 121 insertions(+) create mode 100644 src/async.md create mode 100644 src/async/async-await.md create mode 100644 src/async/async-blocks.md create mode 100644 src/exercises/day-4/async.md diff --git a/src/SUMMARY.md b/src/SUMMARY.md index bf5800a8e8f0..0f9a1466d8fc 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -225,6 +225,21 @@ - [With Java](android/interoperability/java.md) - [Exercises](exercises/day-4/android.md) +# Day 4: Afternoon (Async) + +---- + +- [Async](async.md) + - [async/await](async/async-await.md) + - [Async Blocks](async/async-blocks.md) + - Futures + - Executors + - Polling + - Pin + - Channels + - Select +- [Exercises](exercises/day-4/async.md) + # Final Words - [Thanks!](thanks.md) diff --git a/src/async.md b/src/async.md new file mode 100644 index 000000000000..d4f7d3ad536e --- /dev/null +++ b/src/async.md @@ -0,0 +1,17 @@ +# Async Rust + +"Async" is a concurrency model where multiple tasks are executed concurrently by +executing each task until it would block, then switching to another task that is +ready to make progress. The model scales to higher concurrency than threads +because the per-task overhead is typically very low and operating systems +provide means of efficiently selecting tasks that can make progress. + +## Comparisons + + * Python has a similar model in its `asyncio`. However, its `Future` type is + callback-based, and not polled. Async Python programs require a "loop", + similar to an executor in Rust. + + * JavaScript's `Promise` is similar, but again callback-based. The language + runtime implements the event loop, so many of the details of Promise + resolution are hidden. diff --git a/src/async/async-await.md b/src/async/async-await.md new file mode 100644 index 000000000000..4414c9b367cd --- /dev/null +++ b/src/async/async-await.md @@ -0,0 +1,52 @@ +# `async`/`await` + +At a high level, async Rust code looks very much like "normal" sequential code: + +```rust,editable +use tokio::time; + +async fn count_to(i: i32) { + for i in 1..10 { + println!("Count in task: {i}!"); + time::sleep(time::Duration::from_millis(5)).await; + } +} + +#[tokio::main] +async fn main() { + tokio::spawn(count_to(10)); + + for i in 1..5 { + println!("Main task: {i}"); + time::sleep(time::Duration::from_millis(5)).await; + } +} +``` + +
+ +Key points: + +* Tokio is one of several async runtimes available for Rust. + +* The function is decorated with the "async" keyword to indicate that it is async. The + `tokio::main` macro invocation is a convenience to wrap the `main` function as a task. + +* The `spawn` function creates a new, concurrent "task", just like spawning a thread. + +* Whenever a task would block, we add an `.await` which returns control to the runtime until the + blocking operation is ready to proceed. + +Further exploration: + +* Why does `count_to` not (usually) get to 10? This is an example of async cancellation. + `tokio::spawn` returns a handle which can be awaited to wait until it finishes. + +* Try `count_to(10).await` instead of spawning. + +* Try importing `tokio::join` and using it to join multiple handles. + +Note that the Rust playground does not allow network connections, so examples like making HTTP +requests are not possible. + +
diff --git a/src/async/async-blocks.md b/src/async/async-blocks.md new file mode 100644 index 000000000000..06038151e8f9 --- /dev/null +++ b/src/async/async-blocks.md @@ -0,0 +1,34 @@ +# Async Blocks + +Similar to closures, a snippet of async code can be included inline in another +function with an async block: + +```rust, editable +use tokio::{time, task}; + +#[tokio::main] +async fn main() { + let mut joinset = task::JoinSet::new(); + + for i in 1..5 { + joinset.spawn(async move { + println!("task {i} starting"); + time::sleep(time::Duration::from_millis(i)).await; + println!("task {i} done"); + format!("hello from task {i}") + }); + } + + while let Some(res) = joinset.join_next().await { + let greeting = res.unwrap(); + println!("task joined with result: {greeting}"); + } +} + +
+ +An async block is similar to a closure, but does not take any arguments. + +Its return value is a Future, which is described on the next slide. + +
diff --git a/src/exercises/day-4/async.md b/src/exercises/day-4/async.md new file mode 100644 index 000000000000..14047188461c --- /dev/null +++ b/src/exercises/day-4/async.md @@ -0,0 +1,3 @@ +# Exercises + +TBD From 7515eb2e6e610edd2c759a1f20ef10c4655dd634 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Fri, 10 Mar 2023 16:34:13 +0000 Subject: [PATCH 02/10] address review comments --- src/async.md | 16 ++++++++++++---- src/async/async-blocks.md | 1 + 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/async.md b/src/async.md index d4f7d3ad536e..cd2487dd522a 100644 --- a/src/async.md +++ b/src/async.md @@ -2,15 +2,23 @@ "Async" is a concurrency model where multiple tasks are executed concurrently by executing each task until it would block, then switching to another task that is -ready to make progress. The model scales to higher concurrency than threads -because the per-task overhead is typically very low and operating systems -provide means of efficiently selecting tasks that can make progress. +ready to make progress. The model allows running a larger number of tasks on a +limited number of threads. This is because the per-task overhead is typically +very low and operating systems provide primitives for efficiently identifying +I/O that is able to proceed. + +Rust's asynchronous operation is based around "futures", which represent work +that may be completed in the future. Futures are "polled" until they signal that +they are complete. + +Futures are polled by an async runtime, and several different runtimes are +available. ## Comparisons * Python has a similar model in its `asyncio`. However, its `Future` type is callback-based, and not polled. Async Python programs require a "loop", - similar to an executor in Rust. + similar to a runtime in Rust. * JavaScript's `Promise` is similar, but again callback-based. The language runtime implements the event loop, so many of the details of Promise diff --git a/src/async/async-blocks.md b/src/async/async-blocks.md index 06038151e8f9..57115c1fc88d 100644 --- a/src/async/async-blocks.md +++ b/src/async/async-blocks.md @@ -24,6 +24,7 @@ async fn main() { println!("task joined with result: {greeting}"); } } +```
From 665e89814987993ed3cafadfee3c8f3193c5fb98 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 22 Mar 2023 16:53:45 -0400 Subject: [PATCH 03/10] Add futures page (#497) NOTE: `mdbook test` does not allow code samples to reference other crates, so they must be marked as `compile_fail`; see #175. --- src/SUMMARY.md | 7 +--- src/async/async-await.md | 6 ++-- src/async/async-blocks.md | 2 +- src/async/futures.md | 72 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 77 insertions(+), 10 deletions(-) create mode 100644 src/async/futures.md diff --git a/src/SUMMARY.md b/src/SUMMARY.md index 0f9a1466d8fc..341ac6016131 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -232,12 +232,7 @@ - [Async](async.md) - [async/await](async/async-await.md) - [Async Blocks](async/async-blocks.md) - - Futures - - Executors - - Polling - - Pin - - Channels - - Select + - [Futures](async/futures.md) - [Exercises](exercises/day-4/async.md) # Final Words diff --git a/src/async/async-await.md b/src/async/async-await.md index 4414c9b367cd..b0a2e4278ba7 100644 --- a/src/async/async-await.md +++ b/src/async/async-await.md @@ -2,11 +2,11 @@ At a high level, async Rust code looks very much like "normal" sequential code: -```rust,editable +```rust,editable,compile_fail use tokio::time; -async fn count_to(i: i32) { - for i in 1..10 { +async fn count_to(count: i32) { + for i in 1..=count { println!("Count in task: {i}!"); time::sleep(time::Duration::from_millis(5)).await; } diff --git a/src/async/async-blocks.md b/src/async/async-blocks.md index 57115c1fc88d..8ec21a63df8e 100644 --- a/src/async/async-blocks.md +++ b/src/async/async-blocks.md @@ -3,7 +3,7 @@ Similar to closures, a snippet of async code can be included inline in another function with an async block: -```rust, editable +```rust,editable,compile_fail use tokio::{time, task}; #[tokio::main] diff --git a/src/async/futures.md b/src/async/futures.md new file mode 100644 index 000000000000..0e2ca784ce8b --- /dev/null +++ b/src/async/futures.md @@ -0,0 +1,72 @@ +# Futures + +What is the type of an async operation? + +```rust,editable,compile_fail +use tokio::time; + +async fn count_to(count: i32) -> i32 { + for i in 1..=count { + println!("Count in task: {i}!"); + time::sleep(time::Duration::from_millis(5)).await; + } + count +} + +#[tokio::main] +async fn main() { + let _: () = count_to(13); +} +``` + +[Future](https://doc.rust-lang.org/nightly/src/core/future/future.rs.html#37) +is a trait, implemented by objects that represent an operation that may not be +complete yet. A future can be polled, and `poll` returns either +`Poll::Ready(result)` or `Poll::Pending`. + +```rust +use std::pin::Pin; +use std::task::Context; + +pub trait Future { + type Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; +} + +pub enum Poll { + Ready(T), + Pending, +} +``` + +An async function returns an `impl Future`, and an async block evaluates to an +`impl Future`. It's also possible (but uncommon) to implement `Future` for your +own types. For example, the `JoinHandle` returned from `tokio::spawn` implements +`Future` to allow joining to it. + +The `.await` keyword, applied to a Future, causes the current async function or +block to pause until that Future is ready, and then evaluates to its output. + +An important difference from other languages is that a Future is inert: it does +not do anything until it is polled. + +
+ +* Run the example and look at the error message. `_: () = ..` is a common + technique for getting the type of an expression. Try adding a `.await` in + `main`. + +* The `Future` and `Poll` types are conceptually quite simple, and implemented as + such in `std::task`. + +* We will not get to `Pin` and `Context`, as we will focus on writing async + code, rather than building new async primitives. Briefly: + + * `Context` allows a Future to schedule itself to be polled again when an + event occurs. + + * `Pin` ensures that the Future isn't moved in memory, so that pointers into + that future remain valid. This is required to allow references to remain + valid after an `.await`. + +
From 1cff6e3eb3a775c3605ed2ca523e1de7e1af3cc2 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Fri, 24 Mar 2023 10:16:33 -0400 Subject: [PATCH 04/10] Add Runtimes & Tasks (#522) These concepts are closely related, and there's not much else to know about runtimes other than "they exist". This removes the bit about futures being "inert" because it doesn't really lead anywhere. --- src/SUMMARY.md | 4 ++- src/async/futures.md | 3 -- src/async/runtimes.md | 29 ++++++++++++++++ src/async/tasks.md | 60 +++++++++++++++++++++++++++++++++ src/running-the-course/day-4.md | 13 +++++++ 5 files changed, 105 insertions(+), 4 deletions(-) create mode 100644 src/async/runtimes.md create mode 100644 src/async/tasks.md diff --git a/src/SUMMARY.md b/src/SUMMARY.md index 341ac6016131..5a250477d1e7 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -232,7 +232,9 @@ - [Async](async.md) - [async/await](async/async-await.md) - [Async Blocks](async/async-blocks.md) - - [Futures](async/futures.md) +- [Futures](async/futures.md) +- [Runtimes](async/runtimes.md) +- [Tasks](async/tasks.md) - [Exercises](exercises/day-4/async.md) # Final Words diff --git a/src/async/futures.md b/src/async/futures.md index 0e2ca784ce8b..18ef41b80c12 100644 --- a/src/async/futures.md +++ b/src/async/futures.md @@ -47,9 +47,6 @@ own types. For example, the `JoinHandle` returned from `tokio::spawn` implements The `.await` keyword, applied to a Future, causes the current async function or block to pause until that Future is ready, and then evaluates to its output. -An important difference from other languages is that a Future is inert: it does -not do anything until it is polled. -
* Run the example and look at the error message. `_: () = ..` is a common diff --git a/src/async/runtimes.md b/src/async/runtimes.md new file mode 100644 index 000000000000..64a309e0d91b --- /dev/null +++ b/src/async/runtimes.md @@ -0,0 +1,29 @@ +# Runtimes and Tasks + +A *runtime* provides support for performing operations asynchronously (a +*reactor*) and is responsible for executing futures (an *executor*). Rust does not have a +"built-in" runtime, but several options are available: + + * [Tokio](https://tokio.rs/) - performant, with a well-developed ecosystem of + functionality like [Hyper](https://hyper.rs/) for HTTP or + [Tonic](https://github.com/hyperium/tonic) for gRPC. + * [async-std](https://async.rs/) - aims to be a "std for async", and includes a + basic runtime in `async::task`. + * [smol](https://docs.rs/smol/latest/smol/) - simple and lightweight + +Several larger applications have their own runtimes. For example, +[Fuchsia](https://fuchsia.googlesource.com/fuchsia/+/refs/heads/main/src/lib/fuchsia-async/src/lib.rs) +already has one. + +
+ +* Note that of the listed runtimes, only Tokio is supported in the Rust + playground. The playground also does not permit any I/O, so most interesting + async things can't run in the playground. + +* Futures are "inert" in that they do not do anything (not even start an I/O + operation) unless there is an executor polling them. This differs from JS + Promises, for example, which will run to completion even if they are never + used. + +
diff --git a/src/async/tasks.md b/src/async/tasks.md new file mode 100644 index 000000000000..1011742c6223 --- /dev/null +++ b/src/async/tasks.md @@ -0,0 +1,60 @@ +# Tasks + +Runtimes have the concept of a "Task", similar to a thread but much +less resource-intensive. + +A Task has a single top-level Future which the executor polls to make progress. +That future may have one or more nested futures that its `poll` method polls, +corresponding loosely to a call stack. Concurrency is possible within a task by +polling multiple child futures, such as racing a timer and an I/O operation. + +```rust,editable,compile_fail +use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +#[tokio::main] +async fn main() -> io::Result<()> { + let listener = TcpListener::bind("127.0.0.1:6142").await?; + println!("listening on port 6142"); + + loop { + let (mut socket, addr) = listener.accept().await?; + + println!("connection from {addr:?}"); + + tokio::spawn(async move { + if let Err(e) = socket.write_all(b"Who are you?\n").await { + println!("socket error: {e:?}"); + return; + } + + let mut buf = vec![0; 1024]; + let reply = match socket.read(&mut buf).await { + Ok(n) => { + let name = std::str::from_utf8(&buf[..n]).unwrap().trim(); + format!("Thanks for dialing in, {name}!\n") + } + Err(e) => { + println!("socket error: {e:?}"); + return; + } + }; + + if let Err(e) = socket.write_all(reply.as_bytes()).await { + println!("socket error: {e:?}"); + } + }); + } +} +``` + +
+ +Copy this example into your prepared `src/main.rs` and run it from there. + +* Ask students to visualize what the state of the example server would be with a + few connected clients. What tasks exist? What are their Futures? + +* Refactor the async block into a function, and improve the error handling using `?`. + +
diff --git a/src/running-the-course/day-4.md b/src/running-the-course/day-4.md index 0eb98dc57d85..ab21933cbf2d 100644 --- a/src/running-the-course/day-4.md +++ b/src/running-the-course/day-4.md @@ -23,5 +23,18 @@ Ensure that `adb sync` works with your emulator or real device and pre-build all Android examples using `src/android/build_all.sh`. Read the script to see the commands it runs and make sure they work when you run them by hand. +## Async + +If you chose Async for Day 4 afternoon, you will need a fresh crate set up and +the dependencies downloaded and ready to go. You can then copy/paste the +examples into `src/main.rs` to experiment with them. + +```shell +cargo init day4 +cd day4 +cargo add tokio --features full +cargo run +``` + [1]: https://source.android.com/docs/setup/download/downloading [2]: https://github.com/google/comprehensive-rust From cd17c6b80d2904b923d138310029fbf249600f81 Mon Sep 17 00:00:00 2001 From: sakex Date: Tue, 28 Mar 2023 18:51:40 +0200 Subject: [PATCH 05/10] Async chapter (#524) * Add async channels chapter * Async control flow * Async pitfalls * Separate in multiple chapters + add daemon section * Merge reentering threads in blocking-executor --- src/SUMMARY.md | 14 ++- src/async/channels.md | 36 ++++++++ src/async/control-flow.md | 9 ++ src/async/control-flow/daemon.md | 33 ++++++++ src/async/control-flow/join_all.md | 48 +++++++++++ src/async/control-flow/select.md | 63 ++++++++++++++ src/async/pitfalls.md | 8 ++ src/async/pitfalls/blocking-executor.md | 60 +++++++++++++ src/async/pitfalls/pin.md | 108 ++++++++++++++++++++++++ 9 files changed, 376 insertions(+), 3 deletions(-) create mode 100644 src/async/channels.md create mode 100644 src/async/control-flow.md create mode 100644 src/async/control-flow/daemon.md create mode 100644 src/async/control-flow/join_all.md create mode 100644 src/async/control-flow/select.md create mode 100644 src/async/pitfalls.md create mode 100644 src/async/pitfalls/blocking-executor.md create mode 100644 src/async/pitfalls/pin.md diff --git a/src/SUMMARY.md b/src/SUMMARY.md index 5a250477d1e7..7fb3dd1b80a3 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -232,9 +232,17 @@ - [Async](async.md) - [async/await](async/async-await.md) - [Async Blocks](async/async-blocks.md) -- [Futures](async/futures.md) -- [Runtimes](async/runtimes.md) -- [Tasks](async/tasks.md) + - [Futures](async/futures.md) + - [Runtimes](async/runtimes.md) + - [Tasks](async/tasks.md)ures](async/futures.md) + - [Async Channels](async/channels.md) + - [Futures Control Flow](async/control-flow.md) + - [Daemon](async/control-flow/daemon.md) + - [Join](async/control-flow/join_all.md) + - [Select](async/control-flow/select.md) + - [Pitfalls](async/pitfalls.md) + - [Blocking the executor](async/pitfalls/blocking-executor.md) + - [Pin](async/pitfalls/pin.md) - [Exercises](exercises/day-4/async.md) # Final Words diff --git a/src/async/channels.md b/src/async/channels.md new file mode 100644 index 000000000000..bd3df621360d --- /dev/null +++ b/src/async/channels.md @@ -0,0 +1,36 @@ +# Async Channels + +Multiple Channels crates have support for `async`/`await`. For instance `tokio` channels: + +```rust,editable,compile_fail +use tokio::sync::mpsc::{self, Receiver}; + +async fn ping_handler(mut input: Receiver<()>) { + let mut count: usize = 0; + + while let Some(_) = input.recv().await { + count += 1; + println!("Received {count} pings so far."); + } +} + +#[tokio::main] +async fn main() { + let (sender, receiver) = mpsc::channel(32); + let ping_handler_task = tokio::spawn(ping_handler(receiver)); + for _ in 0..10 { + sender.send(()).await.expect("Failed to send ping."); + } + + std::mem::drop(sender); + ping_handler_task.await.expect("Something went wrong in ping handler task."); +} +``` + +
+ +- Overall, the interface is similar to the `sync` channels as seen in the [morning class](concurrency/channels.md). +- The `Flume` crate has channels that implement both `sync` and `async` `send` and `recv`. This can be convenient for complex application with both IO and heavy CPU processing tasks. +- What makes working with `async` channels preferable is the ability to combine them with other `future`s to combine them and create complex control flow. + +
diff --git a/src/async/control-flow.md b/src/async/control-flow.md new file mode 100644 index 000000000000..b0158d87b04d --- /dev/null +++ b/src/async/control-flow.md @@ -0,0 +1,9 @@ +# Futures Control Flow + +Futures can be combined together to produce concurrent compute flow graphs. We will cover multiple common operations: + +---- + +- [Daemon](control-flow/daemon.md) +- [Join](control-flow/join_all.md) +- [Select](control-flow/select.md) diff --git a/src/async/control-flow/daemon.md b/src/async/control-flow/daemon.md new file mode 100644 index 000000000000..eeb0425f55e9 --- /dev/null +++ b/src/async/control-flow/daemon.md @@ -0,0 +1,33 @@ +# Daemon + +Tasks can be spawned without having to be awaited. They will be scheduled like any other tasks by the executor but won't block any running task. This can be useful for tasks that function like actors, receiving messages and sending messages to other tasks through channels. It can also be useful to log metrics or ping system's health. + +```rust,editable,compile_fail + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tokio::time::{sleep, Duration}; + +#[tokio::main] +async fn main() { + let seconds_since_beginning = Arc::new(AtomicUsize::from(0)); + let counter = Arc::clone(&seconds_since_beginning); + tokio::spawn(async move { + loop { + sleep(Duration::from_secs(1)).await; + counter.fetch_add(1, Ordering::SeqCst); + } + }); + + sleep(Duration::from_millis(4500)).await; + assert_eq!(seconds_since_beginning.load(Ordering::Relaxed), 4); +} + + +``` + +
+ +* It is good practice to make your deamons exit because some other blocking task might depend on them. Which would prevent your main thread from ever closing. You can use a `oneshot` channel to signal the task to terminate. You can also use the `ctrl+c` signal handler from `tokio` as an interrupt signal. + +
diff --git a/src/async/control-flow/join_all.md b/src/async/control-flow/join_all.md new file mode 100644 index 000000000000..65dca5c65cc9 --- /dev/null +++ b/src/async/control-flow/join_all.md @@ -0,0 +1,48 @@ +# join_all + +Futures can be combined together to produce concurrent compute flow graphs. + +## Run a group of futures concurrently until they all resolve: `join_all` + +### Equivalents: + +- JS: `Promise.all` +- Python: `asyncio.gather` + +```rust,editable,compile_fail +use anyhow::Result; +use futures::future; +use reqwest; +use std::collections::HashMap; + +async fn size_of_page(url: &str) -> Result { + let resp = reqwest::get(url).await?; + Ok(resp.text().await?.len()) +} + +#[tokio::main] +async fn main() { + let urls: [&str; 4] = [ + "https://google.com", + "https://httpbin.org/ip", + "https://play.rust-lang.org/", + "BAD_URL", + ]; + let futures_iter = urls.into_iter().map(size_of_page); + let results = future::join_all(futures_iter).await; + let page_sizes_dict: HashMap<&str, Result> = + urls.into_iter().zip(results.into_iter()).collect(); + println!("{:?}", page_sizes_dict); +} +``` + +
+ +* `join_all` should soon be stabilized as part of the standard library in `std::future`. +* For multiple futures of disjoint types, you can use `join!` but you must know how many futures you will have at compile time. +* You can also combine `join_all` with `join!` for instance to join all requests to an http service as well as a database query. +* The risk of `join` is that one of the future could never resolve, this would cause your program to stall. +* Try adding a timeout to the future. + +
+ diff --git a/src/async/control-flow/select.md b/src/async/control-flow/select.md new file mode 100644 index 000000000000..6f53383be0d0 --- /dev/null +++ b/src/async/control-flow/select.md @@ -0,0 +1,63 @@ +# Select + +## Run multiple futures concurrently until the first one resolves + +### Equivalents: + +- JS: `Promise.race` +- Python: `asyncio.new_event_loop().run_until_complete(asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED))` + +```rust,editable,compile_fail +use tokio::sync::mpsc::{self, Receiver}; +use tokio::time::{sleep, Duration}; + +#[derive(Debug, PartialEq)] +enum Animal { + Cat { name: String }, + Dog { name: String }, +} + +async fn first_animal_to_finish_race( + mut cat_rcv: Receiver, + mut dog_rcv: Receiver, +) -> Option { + tokio::select! { + cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }), + dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? }) + } +} + +#[tokio::main] +async fn main() { + let (cat_sender, cat_receiver) = mpsc::channel(32); + let (dog_sender, dog_receiver) = mpsc::channel(32); + tokio::spawn(async move { + sleep(Duration::from_secs(10)).await; + cat_sender + .send(String::from("Felix")) + .await + .expect("Failed to send cat."); + }); + tokio::spawn(async move { + sleep(Duration::from_secs(5)).await; + dog_sender + .send(String::from("Rex")) + .await + .expect("Failed to send cat."); + }); + + let winner = first_animal_to_finish_race(cat_receiver, dog_receiver) + .await + .expect("Failed to receive winner"); + + assert_eq!(winner, Animal::Dog {name: String::from("Rex")}); +} +``` + +
+ +* In this example, we have a race between a cat and a dog. `first_animal_to_finish_race` listens to both channels and will pick whichever arrives first. Since the dog takes 5 seconds, it wins against the cat that take 10 seconds. +* You can use `oneshot` channels in this example as the channels are supposed to receive only one `send`. +* You can try adding more contestants to the race and return a leaderboard. Also, you can add a deadline after which contestants get eliminated. + +
diff --git a/src/async/pitfalls.md b/src/async/pitfalls.md new file mode 100644 index 000000000000..6fb8b1e73003 --- /dev/null +++ b/src/async/pitfalls.md @@ -0,0 +1,8 @@ +# Pitfalls of async/await + +Async / await provides convenient and efficient abstraction for concurrent asynchronous programming. However, the async/await model in Rust also comes with its share of pitfalls and footguns. We illustrate some of them in this chapter: + +--- + +- [Blocking the executor](pitfalls/blocking-executor.md) +- [Pin](pitfalls/pin.md) diff --git a/src/async/pitfalls/blocking-executor.md b/src/async/pitfalls/blocking-executor.md new file mode 100644 index 000000000000..8371855854ce --- /dev/null +++ b/src/async/pitfalls/blocking-executor.md @@ -0,0 +1,60 @@ +# Blocking the executor + +Most async runtimes only allow IO tasks to run concurrently. +This means that CPU blocking tasks will block the executor and prevent other tasks from being executed. +An easy workaround is to use async equivalent methods where possible. + +```rust,editable,compile_fail +use futures::future::join_all; +use std::time::Instant; + +// Uncomment to try with `spawn_blocking` around `std::thread::sleep`. +const USE_SPAWN_BLOCKING: bool = false; + +async fn std_sleep_ms(duration_ms: u64) { + if USE_SPAWN_BLOCKING { + tokio::task::spawn_blocking(move || { + std::thread::sleep(std::time::Duration::from_millis(duration_ms)); + }) + .await + .unwrap(); + } else { + std::thread::sleep(std::time::Duration::from_millis(duration_ms)); + } +} + +async fn tokio_sleep_ms(duration_ms: u64) { + tokio::time::sleep(tokio::time::Duration::from_millis(duration_ms)).await; +} + +// Single threaded executor for better reproducibility in runtime. +#[tokio::main(flavor = "multi_thread", worker_threads = 10)] +async fn main() { + let std_sleep_futures = (1..=100).map(std_sleep_ms); + let tokio_sleep_futures = (1..=100).map(tokio_sleep_ms); + + let now = Instant::now(); + join_all(std_sleep_futures).await; + assert!(now.elapsed().as_millis() >= 5050); + + let now = Instant::now(); + join_all(tokio_sleep_futures).await; + let runtime = now.elapsed().as_millis(); + assert!((100..150).contains(&runtime)); +} + +``` + +
+ +- Using `std::thread::sleep` blocks the thread, so it prevents the executor from running. It means that while all futures are spawned at the same time, they all run one after the other. The runtime is the sum of all the `sleep` times. Try changing the runtime to `multi_thread` in a multi core environment to see how it impacts the run time. +- A simple fix is to use `tokio::time::sleep`. Now, the `sleep` calls are `async` and they are properly scheduled by the executor. +- Another fix would be to `tokio::task::spawn_blocking` which spawns an actual thread and transforms its handle into a future without blocking the executor. This thread is also scheduled as part of the executor's threadpool to grant better performance. + +- You should not think of tasks as OS threads. They do not map 1 to 1 and most executors will allow many tasks to run on a single OS thread. This creates multiple gotchas: + - For instance, using `std::sync::mutex` in an `async` runtime is very dangerous. When you lock the mutex in a thread then yield the executor using `.await` the thread might try to lock the mutex once more in a different task. Hence, prefer `async` alternatives like `tokio::sync::mutex`. + - Thread-local storage should also be used with care in async contexts as it doesn't map to specific tasks. + - Device drivers sometimes map to specific OS threads (for instance CUDA.) Prefer `tokio::task::spawn_blocking` when dealing with those. + - Some C libraries rely on thread local storage as well. + +
diff --git a/src/async/pitfalls/pin.md b/src/async/pitfalls/pin.md new file mode 100644 index 000000000000..cd99897eb4e3 --- /dev/null +++ b/src/async/pitfalls/pin.md @@ -0,0 +1,108 @@ +# Pin + +When you await a future, you effectively move the whole stack frame from which you called `.await` to an internal data structure of your executor. If your future has pointers to data on the stack, the addresses might get invalidated. This is extremely unsafe. Therefore, you want to guarantee that the addresses your future point to don't change. That is why we need to `pin` futures. In most cases, you won't have to think about it when using futures from common libraries unless you use `select` in a loop (which is a pretty common use case). If, you implement your own future, you will likely run into this issue. + +```rust,editable,compile_fail +use tokio::sync::mpsc::{self, Receiver}; +use tokio::time::{sleep, Duration}; + +#[derive(Debug, PartialEq)] +struct Runner { + name: String, +} + +async fn race_finish_line(mut rcv: Receiver, timeout: Duration) -> Option> { + let mut performances: Vec = Vec::new(); + let timeout_sleep = sleep(timeout); + // Pinning here allows us to await `timeout_sleep` multiple times. + tokio::pin!(timeout_sleep); + + loop { + tokio::select! { + // Rcv.recv() returns a new future every time, hence it does not need to be pinned. + name = rcv.recv() => performances.push(Runner { name: name? }), + _ = timeout_sleep.as_mut() => break + } + } + Some(performances) +} + +#[tokio::main] +async fn main() { + let (sender, receiver) = mpsc::channel(32); + + let names_and_time = [ + ("Leo", 9),("Milo", 3),("Luna", 13),("Oliver", 5),("Charlie", 11), + ]; + + let finish_line_future = race_finish_line(receiver, Duration::from_secs(6)); + + for (name, duration_secs) in names_and_time { + let sender = sender.clone(); + tokio::spawn(async move { + sleep(Duration::from_secs(duration_secs)).await; + sender.send(String::from(name)).await.expect("Failed to send runner"); + }); + } + + println!("{:?}", finish_line_future.await.expect("Failed to collect finish line")); + // [Runner { name: "Milo" }, Runner { name: "Oliver" }] +} +``` + + +
+ +* `tokio::pin!` only works on futures that implement `Unpin`. Other futures need to use `box::pin`. +* Another alternative is to not use `tokio::pin!` at all but spawn another task that will send to a `oneshot` channel after the end of the `sleep` call. + +```rust,editable,compile_fail +use tokio::sync::mpsc::{self, Receiver}; +use tokio::time::{sleep, Duration}; +use tokio::sync::oneshot; + +#[derive(Debug, PartialEq)] +struct Runner { + name: String, +} + +async fn race_finish_line(mut rcv: Receiver, mut timeout: oneshot::Receiver<()>) -> Option> { + let mut performances: Vec = Vec::new(); + loop { + tokio::select! { + name = rcv.recv() => performances.push(Runner { name: name? }), + _ = &mut timeout => break + } + } + Some(performances) +} + +#[tokio::main] +async fn main() { + let (sender, receiver) = mpsc::channel(32); + let (os_sender, os_receiver) = oneshot::channel(); + + let names_and_time = [ + ("Leo", 9),("Milo", 3),("Luna", 13),("Oliver", 5),("Charlie", 11), + ]; + + tokio::spawn(async move { + sleep(Duration::from_secs(5)).await; + os_sender.send(()).expect("Failed to send oneshot."); + }); + let finish_line_future = race_finish_line(receiver, os_receiver); + + for (name, duration_secs) in names_and_time { + let sender = sender.clone(); + tokio::spawn(async move { + sleep(Duration::from_secs(duration_secs)).await; + sender.send(String::from(name)).await.expect("Failed to send runner"); + }); + } + + println!("{:?}", finish_line_future.await.expect("Failed to collect finish line")); + // [Runner { name: "Milo" }, Runner { name: "Oliver" }] +} +``` + +
From 281a8e618422cd74a690433869c96a234d4ebd99 Mon Sep 17 00:00:00 2001 From: sakex Date: Mon, 3 Apr 2023 16:12:25 +0000 Subject: [PATCH 06/10] async_trait --- src/SUMMARY.md | 3 +- src/async/pitfalls.md | 3 +- src/async/pitfalls/async-traits.md | 50 ++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 2 deletions(-) create mode 100644 src/async/pitfalls/async-traits.md diff --git a/src/SUMMARY.md b/src/SUMMARY.md index 7fb3dd1b80a3..58c4c5ddea38 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -241,8 +241,9 @@ - [Join](async/control-flow/join_all.md) - [Select](async/control-flow/select.md) - [Pitfalls](async/pitfalls.md) - - [Blocking the executor](async/pitfalls/blocking-executor.md) + - [Blocking the Executor](async/pitfalls/blocking-executor.md) - [Pin](async/pitfalls/pin.md) + - [Async Traits](async/pitfalls/pin.md) - [Exercises](exercises/day-4/async.md) # Final Words diff --git a/src/async/pitfalls.md b/src/async/pitfalls.md index 6fb8b1e73003..f04c7886ec4c 100644 --- a/src/async/pitfalls.md +++ b/src/async/pitfalls.md @@ -4,5 +4,6 @@ Async / await provides convenient and efficient abstraction for concurrent async --- -- [Blocking the executor](pitfalls/blocking-executor.md) +- [Blocking the Executor](pitfalls/blocking-executor.md) - [Pin](pitfalls/pin.md) +- [Async Traits](pitfall/async-traits.md) diff --git a/src/async/pitfalls/async-traits.md b/src/async/pitfalls/async-traits.md new file mode 100644 index 000000000000..e2b08f240e03 --- /dev/null +++ b/src/async/pitfalls/async-traits.md @@ -0,0 +1,50 @@ +# Async Traits + +Async methods in traits are not yet supported in the stable channel ([An experimental feature exists in nightly and should be stabilized in the mid term.](https://blog.rust-lang.org/inside-rust/2022/11/17/async-fn-in-trait-nightly.html)) + +The crate [async_trait](https://docs.rs/async-trait/latest/async_trait/) provides a workaround through a macro: + +```rust,editable,compile_fail +use async_trait::async_trait; +use tokio::time::{sleep, Duration}; + +#[async_trait] +trait Sleeper { + async fn sleep(&self); +} + +struct FixedSleeper { + sleep_ms: u64, +} + +#[async_trait] +impl Sleeper for FixedSleeper { + async fn sleep(&self) { + sleep(Duration::from_millis(self.sleep_ms)).await; + } +} + +async fn run_all_sleepers_multiple_times(sleepers: Vec>, n_times: usize) { + for _ in 0..n_times { + for sleeper in &sleepers { + sleeper.sleep().await; + } + } +} + +#[tokio::main] +async fn main() { + let sleepers: Vec> = vec![ + Box::new(FixedSleeper { sleep_ms: 50 }), + Box::new(FixedSleeper { sleep_ms: 100 }), + ]; + run_all_sleepers_multiple_times(sleepers, 5).await; +} +``` + +
+ +* Try creating a new sleeper struct that will sleep for a random amount of time and adding it to the Vec. +* Try making the `sleep` call mutable. +* Try adding an associated type for the return value that would return how much time was actually slept. +
\ No newline at end of file From 8e121bcdce101167994dc67abf3d7751c4396720 Mon Sep 17 00:00:00 2001 From: rbehjati Date: Tue, 4 Apr 2023 08:39:33 +0100 Subject: [PATCH 07/10] Async fixes (#546) --- src/SUMMARY.md | 2 +- src/async.md | 4 ++-- src/async/channels.md | 17 +++++++++++++---- src/async/control-flow.md | 3 ++- src/async/futures.md | 10 +++++----- 5 files changed, 23 insertions(+), 13 deletions(-) diff --git a/src/SUMMARY.md b/src/SUMMARY.md index 58c4c5ddea38..f417f854b74b 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -243,7 +243,7 @@ - [Pitfalls](async/pitfalls.md) - [Blocking the Executor](async/pitfalls/blocking-executor.md) - [Pin](async/pitfalls/pin.md) - - [Async Traits](async/pitfalls/pin.md) + - [Async Traits](async/pitfalls/async-traits.md) - [Exercises](exercises/day-4/async.md) # Final Words diff --git a/src/async.md b/src/async.md index cd2487dd522a..28d39cfd8b51 100644 --- a/src/async.md +++ b/src/async.md @@ -7,8 +7,8 @@ limited number of threads. This is because the per-task overhead is typically very low and operating systems provide primitives for efficiently identifying I/O that is able to proceed. -Rust's asynchronous operation is based around "futures", which represent work -that may be completed in the future. Futures are "polled" until they signal that +Rust's asynchronous operation is based on "futures", which represent work that +may be completed in the future. Futures are "polled" until they signal that they are complete. Futures are polled by an async runtime, and several different runtimes are diff --git a/src/async/channels.md b/src/async/channels.md index bd3df621360d..65b0754341c6 100644 --- a/src/async/channels.md +++ b/src/async/channels.md @@ -18,8 +18,9 @@ async fn ping_handler(mut input: Receiver<()>) { async fn main() { let (sender, receiver) = mpsc::channel(32); let ping_handler_task = tokio::spawn(ping_handler(receiver)); - for _ in 0..10 { + for i in 0..10 { sender.send(()).await.expect("Failed to send ping."); + println!("Sent {} pings so far.", i + 1); } std::mem::drop(sender); @@ -29,8 +30,16 @@ async fn main() {
-- Overall, the interface is similar to the `sync` channels as seen in the [morning class](concurrency/channels.md). -- The `Flume` crate has channels that implement both `sync` and `async` `send` and `recv`. This can be convenient for complex application with both IO and heavy CPU processing tasks. -- What makes working with `async` channels preferable is the ability to combine them with other `future`s to combine them and create complex control flow. +* Change the channel size to `3` and see how it affects the execution. + +* Overall, the interface is similar to the `sync` channels as seen in the + [morning class](concurrency/channels.md). + +* The `Flume` crate has channels that implement both `sync` and `async` `send` + and `recv`. This can be convenient for complex application with both IO and + heavy CPU processing tasks. + +* What makes working with `async` channels preferable is the ability to combine + them with other `future`s to combine them and create complex control flow.
diff --git a/src/async/control-flow.md b/src/async/control-flow.md index b0158d87b04d..5e5b950e2cfb 100644 --- a/src/async/control-flow.md +++ b/src/async/control-flow.md @@ -1,6 +1,7 @@ # Futures Control Flow -Futures can be combined together to produce concurrent compute flow graphs. We will cover multiple common operations: +Futures can be combined together to produce concurrent compute flow graphs. We +will cover multiple common operations: ---- diff --git a/src/async/futures.md b/src/async/futures.md index 18ef41b80c12..595fb3e56aba 100644 --- a/src/async/futures.md +++ b/src/async/futures.md @@ -15,7 +15,11 @@ async fn count_to(count: i32) -> i32 { #[tokio::main] async fn main() { - let _: () = count_to(13); + println!("Final count is: {}!", count_to(13).await); + + // Uncomment the following line to see the return type of the async call. + // let _: () = count_to(13); + } ``` @@ -49,10 +53,6 @@ block to pause until that Future is ready, and then evaluates to its output.
-* Run the example and look at the error message. `_: () = ..` is a common - technique for getting the type of an expression. Try adding a `.await` in - `main`. - * The `Future` and `Poll` types are conceptually quite simple, and implemented as such in `std::task`. From 8dee189b9d91c70d75bc9d0b50ffe34c26b894b3 Mon Sep 17 00:00:00 2001 From: rbehjati Date: Wed, 5 Apr 2023 15:36:41 +0100 Subject: [PATCH 08/10] Async: some ideas for simplifying the content (#550) * Simplify the async-await slide * Shorten futures and move it up * Add a page on Tokio --- src/SUMMARY.md | 4 +-- src/async/async-await.md | 47 ++++++++++++++---------------- src/async/async-blocks.md | 35 ----------------------- src/async/control-flow/daemon.md | 8 +++++- src/async/futures.md | 23 --------------- src/async/runtimes/tokio.md | 49 ++++++++++++++++++++++++++++++++ 6 files changed, 80 insertions(+), 86 deletions(-) delete mode 100644 src/async/async-blocks.md create mode 100644 src/async/runtimes/tokio.md diff --git a/src/SUMMARY.md b/src/SUMMARY.md index f417f854b74b..942f934004df 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -231,10 +231,10 @@ - [Async](async.md) - [async/await](async/async-await.md) - - [Async Blocks](async/async-blocks.md) - [Futures](async/futures.md) - [Runtimes](async/runtimes.md) - - [Tasks](async/tasks.md)ures](async/futures.md) + - [Tokio](async/runtimes/tokio.md) + - [Tasks](async/tasks.md) - [Async Channels](async/channels.md) - [Futures Control Flow](async/control-flow.md) - [Daemon](async/control-flow/daemon.md) diff --git a/src/async/async-await.md b/src/async/async-await.md index b0a2e4278ba7..05c9bdc351e7 100644 --- a/src/async/async-await.md +++ b/src/async/async-await.md @@ -3,23 +3,22 @@ At a high level, async Rust code looks very much like "normal" sequential code: ```rust,editable,compile_fail -use tokio::time; +use futures::executor::block_on; async fn count_to(count: i32) { for i in 1..=count { - println!("Count in task: {i}!"); - time::sleep(time::Duration::from_millis(5)).await; + println!("Count is: {i}!"); } } -#[tokio::main] -async fn main() { - tokio::spawn(count_to(10)); +async fn async_main(count: i32) { + let future = count_to(count); + future.await; +} - for i in 1..5 { - println!("Main task: {i}"); - time::sleep(time::Duration::from_millis(5)).await; - } +fn main() { + let future = async_main(10); + block_on(future); } ``` @@ -27,26 +26,24 @@ async fn main() { Key points: -* Tokio is one of several async runtimes available for Rust. - -* The function is decorated with the "async" keyword to indicate that it is async. The - `tokio::main` macro invocation is a convenience to wrap the `main` function as a task. - -* The `spawn` function creates a new, concurrent "task", just like spawning a thread. +* Note that this is a simplified example to show the syntax. There is no long + running operation or any real concurrency in it! -* Whenever a task would block, we add an `.await` which returns control to the runtime until the - blocking operation is ready to proceed. +* What is the return type of an async call? + * Change `let future = async_main(10);` to `let future: () = async_main(10);` + to see the type. -Further exploration: +* The "async" keyword is syntactic sugar. The compiler replaces the return type. -* Why does `count_to` not (usually) get to 10? This is an example of async cancellation. - `tokio::spawn` returns a handle which can be awaited to wait until it finishes. +* You cannot make `main` async, without additional instructions to the compiler + on how to use the returned future. -* Try `count_to(10).await` instead of spawning. +* You need an executor to run async code. `block_on` blocks the current thread + until the provided future has run to completion. -* Try importing `tokio::join` and using it to join multiple handles. +* `.await` asynchronously waits for the completion of another operation. Unlike + `block_on`, `.await` doesn't block the current thread. -Note that the Rust playground does not allow network connections, so examples like making HTTP -requests are not possible. +* `.await` can only be used inside an `async` block.
diff --git a/src/async/async-blocks.md b/src/async/async-blocks.md deleted file mode 100644 index 8ec21a63df8e..000000000000 --- a/src/async/async-blocks.md +++ /dev/null @@ -1,35 +0,0 @@ -# Async Blocks - -Similar to closures, a snippet of async code can be included inline in another -function with an async block: - -```rust,editable,compile_fail -use tokio::{time, task}; - -#[tokio::main] -async fn main() { - let mut joinset = task::JoinSet::new(); - - for i in 1..5 { - joinset.spawn(async move { - println!("task {i} starting"); - time::sleep(time::Duration::from_millis(i)).await; - println!("task {i} done"); - format!("hello from task {i}") - }); - } - - while let Some(res) = joinset.join_next().await { - let greeting = res.unwrap(); - println!("task joined with result: {greeting}"); - } -} -``` - -
- -An async block is similar to a closure, but does not take any arguments. - -Its return value is a Future, which is described on the next slide. - -
diff --git a/src/async/control-flow/daemon.md b/src/async/control-flow/daemon.md index eeb0425f55e9..0347a69e73b8 100644 --- a/src/async/control-flow/daemon.md +++ b/src/async/control-flow/daemon.md @@ -28,6 +28,12 @@ async fn main() {
-* It is good practice to make your deamons exit because some other blocking task might depend on them. Which would prevent your main thread from ever closing. You can use a `oneshot` channel to signal the task to terminate. You can also use the `ctrl+c` signal handler from `tokio` as an interrupt signal. +* An async block is similar to a closure, but does not take any arguments. Its + return value is a Future, similar to `async fn`. + +* It is good practice to make your deamons exit because some other blocking + task might depend on them. Which would prevent your main thread from ever + closing. You can use a `oneshot` channel to signal the task to terminate. You + can also use the `ctrl+c` signal handler from `tokio` as an interrupt signal.
diff --git a/src/async/futures.md b/src/async/futures.md index 595fb3e56aba..ffe65e0cc896 100644 --- a/src/async/futures.md +++ b/src/async/futures.md @@ -1,28 +1,5 @@ # Futures -What is the type of an async operation? - -```rust,editable,compile_fail -use tokio::time; - -async fn count_to(count: i32) -> i32 { - for i in 1..=count { - println!("Count in task: {i}!"); - time::sleep(time::Duration::from_millis(5)).await; - } - count -} - -#[tokio::main] -async fn main() { - println!("Final count is: {}!", count_to(13).await); - - // Uncomment the following line to see the return type of the async call. - // let _: () = count_to(13); - -} -``` - [Future](https://doc.rust-lang.org/nightly/src/core/future/future.rs.html#37) is a trait, implemented by objects that represent an operation that may not be complete yet. A future can be polled, and `poll` returns either diff --git a/src/async/runtimes/tokio.md b/src/async/runtimes/tokio.md new file mode 100644 index 000000000000..3ed208b1f628 --- /dev/null +++ b/src/async/runtimes/tokio.md @@ -0,0 +1,49 @@ +# Tokio + + +Tokio provides: + +* A multi-threaded runtime for executing asynchronous code. +* An asynchronous version of the standard library. +* A large ecosystem of libraries. + +```rust,editable,compile_fail +use tokio::time; + +async fn count_to(count: i32) { + for i in 1..=count { + println!("Count in task: {i}!"); + time::sleep(time::Duration::from_millis(5)).await; + } +} + +#[tokio::main] +async fn main() { + tokio::spawn(count_to(10)); + + for i in 1..5 { + println!("Main task: {i}"); + time::sleep(time::Duration::from_millis(5)).await; + } +} +``` + +
+ +* With the `tokio::main` macro we can now make `main` async. + +* The `spawn` function creates a new, concurrent "task". + +* Note: `spawn` takes a `Future`, you don't call `.await` on `count_to`. + +**Further exploration:** + +* Why does `count_to` not (usually) get to 10? This is an example of async + cancellation. `tokio::spawn` returns a handle which can be awaited to wait + until it finishes. + +* Try `count_to(10).await` instead of spawning. + +* Try importing `tokio::join` and using it to join multiple handles. + +
From 2db1f14c85d424de1bcb0d5359b7073626014c8e Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Thu, 6 Apr 2023 12:30:06 -0400 Subject: [PATCH 09/10] Modifications to the async section (#556) * Modifications to the async section * Remove the "Daemon" slide, as it largely duplicates the "Tasks" slide. The introduction to the "Control Flow" section mentions tasks as a kind of control flow. * Reorganize the structure in SUMMARY.md to correspond to the directory structure. * Simplify the "Pin" and "Blocking the Executor" slides with steps in the speaker notes to demonstrate / fix the issues. * Rename "join_all" to "Join". * Simplify some code samples to shorten them, and to print output rather than asserting. * Clarify speaker notes and include more "Try.." suggestions. * Be consistent about where `async` blocks are introduced (in the "Tasks" slide). * Explain `join` and `select` in prose. * Fix formatting of section-header slides. --- src/SUMMARY.md | 17 +- src/async/async-await.md | 15 +- src/async/channels.md | 12 +- src/async/control-flow.md | 7 +- src/async/control-flow/daemon.md | 39 ----- .../control-flow/{join_all.md => join.md} | 31 ++-- src/async/control-flow/select.md | 36 ++-- src/async/futures.md | 21 ++- src/async/pitfalls.md | 2 - src/async/pitfalls/async-traits.md | 14 +- src/async/pitfalls/blocking-executor.md | 66 ++++---- src/async/pitfalls/pin.md | 158 +++++++++--------- src/async/runtimes/tokio.md | 2 +- src/async/tasks.md | 8 +- 14 files changed, 202 insertions(+), 226 deletions(-) delete mode 100644 src/async/control-flow/daemon.md rename src/async/control-flow/{join_all.md => join.md} (52%) diff --git a/src/SUMMARY.md b/src/SUMMARY.md index 942f934004df..da1c891499ad 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -229,21 +229,20 @@ ---- -- [Async](async.md) +- [Async Basics](async.md) - [async/await](async/async-await.md) - [Futures](async/futures.md) - [Runtimes](async/runtimes.md) - [Tokio](async/runtimes/tokio.md) - [Tasks](async/tasks.md) - [Async Channels](async/channels.md) - - [Futures Control Flow](async/control-flow.md) - - [Daemon](async/control-flow/daemon.md) - - [Join](async/control-flow/join_all.md) - - [Select](async/control-flow/select.md) - - [Pitfalls](async/pitfalls.md) - - [Blocking the Executor](async/pitfalls/blocking-executor.md) - - [Pin](async/pitfalls/pin.md) - - [Async Traits](async/pitfalls/async-traits.md) +- [Control Flow](async/control-flow.md) + - [Join](async/control-flow/join.md) + - [Select](async/control-flow/select.md) +- [Pitfalls](async/pitfalls.md) + - [Blocking the Executor](async/pitfalls/blocking-executor.md) + - [Pin](async/pitfalls/pin.md) + - [Async Traits](async/pitfalls/async-traits.md) - [Exercises](exercises/day-4/async.md) # Final Words diff --git a/src/async/async-await.md b/src/async/async-await.md index 05c9bdc351e7..790603385456 100644 --- a/src/async/async-await.md +++ b/src/async/async-await.md @@ -12,13 +12,11 @@ async fn count_to(count: i32) { } async fn async_main(count: i32) { - let future = count_to(count); - future.await; + count_to(count).await; } fn main() { - let future = async_main(10); - block_on(future); + block_on(async_main(10)); } ``` @@ -30,10 +28,10 @@ Key points: running operation or any real concurrency in it! * What is the return type of an async call? - * Change `let future = async_main(10);` to `let future: () = async_main(10);` - to see the type. + * Use `let future: () = async_main(10);` in `main` to see the type. -* The "async" keyword is syntactic sugar. The compiler replaces the return type. +* The "async" keyword is syntactic sugar. The compiler replaces the return type + with a future. * You cannot make `main` async, without additional instructions to the compiler on how to use the returned future. @@ -44,6 +42,7 @@ Key points: * `.await` asynchronously waits for the completion of another operation. Unlike `block_on`, `.await` doesn't block the current thread. -* `.await` can only be used inside an `async` block. +* `.await` can only be used inside an `async` function (or block; these are + introduced later).
diff --git a/src/async/channels.md b/src/async/channels.md index 65b0754341c6..9e0a5bf8177b 100644 --- a/src/async/channels.md +++ b/src/async/channels.md @@ -1,6 +1,6 @@ # Async Channels -Multiple Channels crates have support for `async`/`await`. For instance `tokio` channels: +Several crates have support for `async`/`await`. For instance `tokio` channels: ```rust,editable,compile_fail use tokio::sync::mpsc::{self, Receiver}; @@ -12,6 +12,8 @@ async fn ping_handler(mut input: Receiver<()>) { count += 1; println!("Received {count} pings so far."); } + + println!("ping_handler complete"); } #[tokio::main] @@ -35,9 +37,11 @@ async fn main() { * Overall, the interface is similar to the `sync` channels as seen in the [morning class](concurrency/channels.md). -* The `Flume` crate has channels that implement both `sync` and `async` `send` - and `recv`. This can be convenient for complex application with both IO and - heavy CPU processing tasks. +* Try removing the `std::mem::drop` call. What happens? Why? + +* The [Flume](https://docs.rs/flume/latest/flume/) crate has channels that + implement both `sync` and `async` `send` and `recv`. This can be convenient + for complex applications with both IO and heavy CPU processing tasks. * What makes working with `async` channels preferable is the ability to combine them with other `future`s to combine them and create complex control flow. diff --git a/src/async/control-flow.md b/src/async/control-flow.md index 5e5b950e2cfb..69b1d03e9371 100644 --- a/src/async/control-flow.md +++ b/src/async/control-flow.md @@ -1,10 +1,7 @@ # Futures Control Flow Futures can be combined together to produce concurrent compute flow graphs. We -will cover multiple common operations: +have already seen tasks, that function as independent threads of execution. ----- - -- [Daemon](control-flow/daemon.md) -- [Join](control-flow/join_all.md) +- [Join](control-flow/join.md) - [Select](control-flow/select.md) diff --git a/src/async/control-flow/daemon.md b/src/async/control-flow/daemon.md deleted file mode 100644 index 0347a69e73b8..000000000000 --- a/src/async/control-flow/daemon.md +++ /dev/null @@ -1,39 +0,0 @@ -# Daemon - -Tasks can be spawned without having to be awaited. They will be scheduled like any other tasks by the executor but won't block any running task. This can be useful for tasks that function like actors, receiving messages and sending messages to other tasks through channels. It can also be useful to log metrics or ping system's health. - -```rust,editable,compile_fail - -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use tokio::time::{sleep, Duration}; - -#[tokio::main] -async fn main() { - let seconds_since_beginning = Arc::new(AtomicUsize::from(0)); - let counter = Arc::clone(&seconds_since_beginning); - tokio::spawn(async move { - loop { - sleep(Duration::from_secs(1)).await; - counter.fetch_add(1, Ordering::SeqCst); - } - }); - - sleep(Duration::from_millis(4500)).await; - assert_eq!(seconds_since_beginning.load(Ordering::Relaxed), 4); -} - - -``` - -
- -* An async block is similar to a closure, but does not take any arguments. Its - return value is a Future, similar to `async fn`. - -* It is good practice to make your deamons exit because some other blocking - task might depend on them. Which would prevent your main thread from ever - closing. You can use a `oneshot` channel to signal the task to terminate. You - can also use the `ctrl+c` signal handler from `tokio` as an interrupt signal. - -
diff --git a/src/async/control-flow/join_all.md b/src/async/control-flow/join.md similarity index 52% rename from src/async/control-flow/join_all.md rename to src/async/control-flow/join.md index 65dca5c65cc9..71db9a0e9d13 100644 --- a/src/async/control-flow/join_all.md +++ b/src/async/control-flow/join.md @@ -1,13 +1,8 @@ -# join_all +# Join -Futures can be combined together to produce concurrent compute flow graphs. - -## Run a group of futures concurrently until they all resolve: `join_all` - -### Equivalents: - -- JS: `Promise.all` -- Python: `asyncio.gather` +A join operation waits until all of a set of futures are ready, and +returns a collection of their results. This is similar to `Promise.all` in +JavaScript or `asyncio.gather` in Python. ```rust,editable,compile_fail use anyhow::Result; @@ -38,11 +33,19 @@ async fn main() {
-* `join_all` should soon be stabilized as part of the standard library in `std::future`. -* For multiple futures of disjoint types, you can use `join!` but you must know how many futures you will have at compile time. -* You can also combine `join_all` with `join!` for instance to join all requests to an http service as well as a database query. -* The risk of `join` is that one of the future could never resolve, this would cause your program to stall. -* Try adding a timeout to the future. +Copy this example into your prepared `src/main.rs` and run it from there. + +* For multiple futures of disjoint types, you can use `std::future::join!` but + you must know how many futures you will have at compile time. This is + currently in the `futures` crate, soon to be stabilised in `std::future`. + +* The risk of `join` is that one of the futures may never resolve, this would + cause your program to stall. + +* You can also combine `join_all` with `join!` for instance to join all requests + to an http service as well as a database query. + +* Try adding a timeout to the future, using `futures::join!`.
diff --git a/src/async/control-flow/select.md b/src/async/control-flow/select.md index 6f53383be0d0..7e416ea14deb 100644 --- a/src/async/control-flow/select.md +++ b/src/async/control-flow/select.md @@ -1,11 +1,13 @@ # Select -## Run multiple futures concurrently until the first one resolves +A select operation waits until any of a set of futures is ready, and responds to +that future's result. In JavaScript, this is similar to `Promise.race`. In +Python, it compares to `asyncio.wait(task_set, +return_when=asyncio.FIRST_COMPLETED)`. -### Equivalents: - -- JS: `Promise.race` -- Python: `asyncio.new_event_loop().run_until_complete(asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED))` +This is usually a macro, similar to match, with each arm of the form `pattern = +future => statement`. When the future is ready, the statement is executed with the +variable bound to the future's result. ```rust,editable,compile_fail use tokio::sync::mpsc::{self, Receiver}; @@ -32,32 +34,42 @@ async fn main() { let (cat_sender, cat_receiver) = mpsc::channel(32); let (dog_sender, dog_receiver) = mpsc::channel(32); tokio::spawn(async move { - sleep(Duration::from_secs(10)).await; + sleep(Duration::from_millis(500)).await; cat_sender .send(String::from("Felix")) .await .expect("Failed to send cat."); }); tokio::spawn(async move { - sleep(Duration::from_secs(5)).await; + sleep(Duration::from_millis(50)).await; dog_sender .send(String::from("Rex")) .await - .expect("Failed to send cat."); + .expect("Failed to send dog."); }); let winner = first_animal_to_finish_race(cat_receiver, dog_receiver) .await .expect("Failed to receive winner"); - assert_eq!(winner, Animal::Dog {name: String::from("Rex")}); + println!("Winner is {winner:?}"); } ```
-* In this example, we have a race between a cat and a dog. `first_animal_to_finish_race` listens to both channels and will pick whichever arrives first. Since the dog takes 5 seconds, it wins against the cat that take 10 seconds. -* You can use `oneshot` channels in this example as the channels are supposed to receive only one `send`. -* You can try adding more contestants to the race and return a leaderboard. Also, you can add a deadline after which contestants get eliminated. +* In this example, we have a race between a cat and a dog. + `first_animal_to_finish_race` listens to both channels and will pick whichever + arrives first. Since the dog takes 50ms, it wins against the cat that + take 500ms seconds. + +* You can use `oneshot` channels in this example as the channels are supposed to + receive only one `send`. + +* Try adding a deadline to the race, demonstrating selecting different sorts of + futures. + +* Note that `select!` consumes the futures it is given, and is easiest to use + when every execution of `select!` creates new futures.
diff --git a/src/async/futures.md b/src/async/futures.md index ffe65e0cc896..d9d4347c667e 100644 --- a/src/async/futures.md +++ b/src/async/futures.md @@ -1,9 +1,9 @@ # Futures -[Future](https://doc.rust-lang.org/nightly/src/core/future/future.rs.html#37) +[`Future`](https://doc.rust-lang.org/std/future/trait.Future.html) is a trait, implemented by objects that represent an operation that may not be -complete yet. A future can be polled, and `poll` returns either -`Poll::Ready(result)` or `Poll::Pending`. +complete yet. A future can be polled, and `poll` returns a +[`Poll`](https://doc.rust-lang.org/std/task/enum.Poll.html). ```rust use std::pin::Pin; @@ -20,18 +20,17 @@ pub enum Poll { } ``` -An async function returns an `impl Future`, and an async block evaluates to an -`impl Future`. It's also possible (but uncommon) to implement `Future` for your -own types. For example, the `JoinHandle` returned from `tokio::spawn` implements -`Future` to allow joining to it. +An async function returns an `impl Future`. It's also possible (but uncommon) to +implement `Future` for your own types. For example, the `JoinHandle` returned +from `tokio::spawn` implements `Future` to allow joining to it. -The `.await` keyword, applied to a Future, causes the current async function or -block to pause until that Future is ready, and then evaluates to its output. +The `.await` keyword, applied to a Future, causes the current async function to +pause until that Future is ready, and then evaluates to its output.
-* The `Future` and `Poll` types are conceptually quite simple, and implemented as - such in `std::task`. +* The `Future` and `Poll` types are implemented exactly as shown; click the + links to show the implementations in the docs. * We will not get to `Pin` and `Context`, as we will focus on writing async code, rather than building new async primitives. Briefly: diff --git a/src/async/pitfalls.md b/src/async/pitfalls.md index f04c7886ec4c..a8ae89e299af 100644 --- a/src/async/pitfalls.md +++ b/src/async/pitfalls.md @@ -2,8 +2,6 @@ Async / await provides convenient and efficient abstraction for concurrent asynchronous programming. However, the async/await model in Rust also comes with its share of pitfalls and footguns. We illustrate some of them in this chapter: ---- - - [Blocking the Executor](pitfalls/blocking-executor.md) - [Pin](pitfalls/pin.md) - [Async Traits](pitfall/async-traits.md) diff --git a/src/async/pitfalls/async-traits.md b/src/async/pitfalls/async-traits.md index e2b08f240e03..69aeacca8dd0 100644 --- a/src/async/pitfalls/async-traits.md +++ b/src/async/pitfalls/async-traits.md @@ -6,6 +6,7 @@ The crate [async_trait](https://docs.rs/async-trait/latest/async_trait/) provide ```rust,editable,compile_fail use async_trait::async_trait; +use std::time::Instant; use tokio::time::{sleep, Duration}; #[async_trait] @@ -26,8 +27,11 @@ impl Sleeper for FixedSleeper { async fn run_all_sleepers_multiple_times(sleepers: Vec>, n_times: usize) { for _ in 0..n_times { + println!("running all sleepers.."); for sleeper in &sleepers { + let start = Instant::now(); sleeper.sleep().await; + println!("slept for {}ms", start.elapsed().as_millis()); } } } @@ -44,7 +48,11 @@ async fn main() {
+* `async_trait` is easy to use, but note that it's using heap allocations to + achieve this, which has performance implications. + * Try creating a new sleeper struct that will sleep for a random amount of time and adding it to the Vec. -* Try making the `sleep` call mutable. -* Try adding an associated type for the return value that would return how much time was actually slept. -
\ No newline at end of file + +* Try making the `sleep` call take `&mut self`. + +
diff --git a/src/async/pitfalls/blocking-executor.md b/src/async/pitfalls/blocking-executor.md index 8371855854ce..cfbfb4675fd4 100644 --- a/src/async/pitfalls/blocking-executor.md +++ b/src/async/pitfalls/blocking-executor.md @@ -8,53 +8,43 @@ An easy workaround is to use async equivalent methods where possible. use futures::future::join_all; use std::time::Instant; -// Uncomment to try with `spawn_blocking` around `std::thread::sleep`. -const USE_SPAWN_BLOCKING: bool = false; - -async fn std_sleep_ms(duration_ms: u64) { - if USE_SPAWN_BLOCKING { - tokio::task::spawn_blocking(move || { - std::thread::sleep(std::time::Duration::from_millis(duration_ms)); - }) - .await - .unwrap(); - } else { - std::thread::sleep(std::time::Duration::from_millis(duration_ms)); - } +async fn sleep_ms(start: &Instant, id: u64, duration_ms: u64) { + std::thread::sleep(std::time::Duration::from_millis(duration_ms)); + println!( + "future {id} slept for {duration_ms}ms, finished after {}ms", + start.elapsed().as_millis() + ); } -async fn tokio_sleep_ms(duration_ms: u64) { - tokio::time::sleep(tokio::time::Duration::from_millis(duration_ms)).await; +#[tokio::main(flavor = "current_thread")] +async fn main() { + let start = Instant::now(); + let sleep_futures = (1..=10).map(|t| sleep_ms(&start, t, t * 10)); + join_all(sleep_futures).await; } +``` -// Single threaded executor for better reproducibility in runtime. -#[tokio::main(flavor = "multi_thread", worker_threads = 10)] -async fn main() { - let std_sleep_futures = (1..=100).map(std_sleep_ms); - let tokio_sleep_futures = (1..=100).map(tokio_sleep_ms); +
- let now = Instant::now(); - join_all(std_sleep_futures).await; - assert!(now.elapsed().as_millis() >= 5050); +* Run the code and see that the sleeps happen consecutively rather than + concurrently. - let now = Instant::now(); - join_all(tokio_sleep_futures).await; - let runtime = now.elapsed().as_millis(); - assert!((100..150).contains(&runtime)); -} +* The `"current_thread"` flavor puts all tasks on a single thread. This makes the + effect more obvious, but the bug is still present in the multi-threaded + flavor. -``` +* Switch the `std::thread::sleep` to `tokio::time::sleep` and await its result. -
+* Another fix would be to `tokio::task::spawn_blocking` which spawns an actual + thread and transforms its handle into a future without blocking the executor. -- Using `std::thread::sleep` blocks the thread, so it prevents the executor from running. It means that while all futures are spawned at the same time, they all run one after the other. The runtime is the sum of all the `sleep` times. Try changing the runtime to `multi_thread` in a multi core environment to see how it impacts the run time. -- A simple fix is to use `tokio::time::sleep`. Now, the `sleep` calls are `async` and they are properly scheduled by the executor. -- Another fix would be to `tokio::task::spawn_blocking` which spawns an actual thread and transforms its handle into a future without blocking the executor. This thread is also scheduled as part of the executor's threadpool to grant better performance. +* You should not think of tasks as OS threads. They do not map 1 to 1 and most + executors will allow many tasks to run on a single OS thread. This is + particularly problematic when interacting with other libraries via FFI, where + that library might depend on thread-local storage or map to specific OS + threads (e.g., CUDA). Prefer `tokio::task::spawn_blocking` in such situations. -- You should not think of tasks as OS threads. They do not map 1 to 1 and most executors will allow many tasks to run on a single OS thread. This creates multiple gotchas: - - For instance, using `std::sync::mutex` in an `async` runtime is very dangerous. When you lock the mutex in a thread then yield the executor using `.await` the thread might try to lock the mutex once more in a different task. Hence, prefer `async` alternatives like `tokio::sync::mutex`. - - Thread-local storage should also be used with care in async contexts as it doesn't map to specific tasks. - - Device drivers sometimes map to specific OS threads (for instance CUDA.) Prefer `tokio::task::spawn_blocking` when dealing with those. - - Some C libraries rely on thread local storage as well. +* Use sync mutexes with care. Holding a mutex over an `.await` may cause another + task to block, and that task may be running on the same thread.
diff --git a/src/async/pitfalls/pin.md b/src/async/pitfalls/pin.md index cd99897eb4e3..455e19ee7232 100644 --- a/src/async/pitfalls/pin.md +++ b/src/async/pitfalls/pin.md @@ -1,108 +1,110 @@ # Pin -When you await a future, you effectively move the whole stack frame from which you called `.await` to an internal data structure of your executor. If your future has pointers to data on the stack, the addresses might get invalidated. This is extremely unsafe. Therefore, you want to guarantee that the addresses your future point to don't change. That is why we need to `pin` futures. In most cases, you won't have to think about it when using futures from common libraries unless you use `select` in a loop (which is a pretty common use case). If, you implement your own future, you will likely run into this issue. +When you await a future, all local variables (that would ordinarily be stored on +a stack frame) are instead stored in the Future for the current async block. If your +future has pointers to data on the stack, those pointers might get invalidated. +This is unsafe. + +Therefore, you must guarantee that the addresses your future points to don't +change. That is why we need to `pin` futures. Using the same future repeatedly +in a `select!` often leads to issues with pinned values. ```rust,editable,compile_fail -use tokio::sync::mpsc::{self, Receiver}; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::spawn; use tokio::time::{sleep, Duration}; -#[derive(Debug, PartialEq)] -struct Runner { - name: String, +// A work item. In this case, just sleep for the given time and respond +// with a message on the `respond_on` channel. +#[derive(Debug)] +struct Work { + input: u32, + respond_on: oneshot::Sender, } -async fn race_finish_line(mut rcv: Receiver, timeout: Duration) -> Option> { - let mut performances: Vec = Vec::new(); - let timeout_sleep = sleep(timeout); - // Pinning here allows us to await `timeout_sleep` multiple times. - tokio::pin!(timeout_sleep); - +// A worker which listens for work on a queue and performs it. +async fn worker(mut work_queue: mpsc::Receiver) { + let mut iterations = 0; loop { tokio::select! { - // Rcv.recv() returns a new future every time, hence it does not need to be pinned. - name = rcv.recv() => performances.push(Runner { name: name? }), - _ = timeout_sleep.as_mut() => break + Some(work) = work_queue.recv() => { + sleep(Duration::from_millis(10)).await; // Pretend to work. + work.respond_on + .send(work.input * 1000) + .expect("failed to send response"); + iterations += 1; + } + // TODO: report number of iterations every 100ms } } - Some(performances) +} + +// A requester which requests work and waits for it to complete. +async fn do_work(work_queue: &mpsc::Sender, input: u32) -> u32 { + let (tx, rx) = oneshot::channel(); + work_queue + .send(Work { + input, + respond_on: tx, + }) + .await + .expect("failed to send on work queue"); + rx.await.expect("failed waiting for response") } #[tokio::main] async fn main() { - let (sender, receiver) = mpsc::channel(32); - - let names_and_time = [ - ("Leo", 9),("Milo", 3),("Luna", 13),("Oliver", 5),("Charlie", 11), - ]; - - let finish_line_future = race_finish_line(receiver, Duration::from_secs(6)); - - for (name, duration_secs) in names_and_time { - let sender = sender.clone(); - tokio::spawn(async move { - sleep(Duration::from_secs(duration_secs)).await; - sender.send(String::from(name)).await.expect("Failed to send runner"); - }); + let (tx, rx) = mpsc::channel(10); + spawn(worker(rx)); + for i in 0..100 { + let resp = do_work(&tx, i).await; + println!("work result for iteration {i}: {resp}"); } - - println!("{:?}", finish_line_future.await.expect("Failed to collect finish line")); - // [Runner { name: "Milo" }, Runner { name: "Oliver" }] } ``` -
-* `tokio::pin!` only works on futures that implement `Unpin`. Other futures need to use `box::pin`. -* Another alternative is to not use `tokio::pin!` at all but spawn another task that will send to a `oneshot` channel after the end of the `sleep` call. +* You may recognize this as an example of the actor pattern. Actors + typically call `select!` in a loop. -```rust,editable,compile_fail -use tokio::sync::mpsc::{self, Receiver}; -use tokio::time::{sleep, Duration}; -use tokio::sync::oneshot; +* This serves as a summation of a few of the previous lessons, so take your time + with it. -#[derive(Debug, PartialEq)] -struct Runner { - name: String, -} + * Naively add a `_ = sleep(Duration::from_millis(100)) => { println!(..) }` + to the `select!`. This will never execute. Why? -async fn race_finish_line(mut rcv: Receiver, mut timeout: oneshot::Receiver<()>) -> Option> { - let mut performances: Vec = Vec::new(); - loop { - tokio::select! { - name = rcv.recv() => performances.push(Runner { name: name? }), - _ = &mut timeout => break + * Instead, add a `timeout_fut` containing that future outside of the `loop`: + + ```rust,compile_fail + let mut timeout_fut = sleep(Duration::from_millis(100)); + loop { + select! { + .., + _ = timeout_fut => { println!(..); }, + } } - } - Some(performances) -} + ``` + * This still doesn't work. Follow the compiler errors, adding `&mut` to the + `timeout_fut` in the `select!` to work around the move, then using + `Box::pin`: + + ```rust,compile_fail + let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100))); + loop { + select! { + .., + _ = &mut timeout_fut => { println!(..); }, + } + } + ``` -#[tokio::main] -async fn main() { - let (sender, receiver) = mpsc::channel(32); - let (os_sender, os_receiver) = oneshot::channel(); - - let names_and_time = [ - ("Leo", 9),("Milo", 3),("Luna", 13),("Oliver", 5),("Charlie", 11), - ]; - - tokio::spawn(async move { - sleep(Duration::from_secs(5)).await; - os_sender.send(()).expect("Failed to send oneshot."); - }); - let finish_line_future = race_finish_line(receiver, os_receiver); - - for (name, duration_secs) in names_and_time { - let sender = sender.clone(); - tokio::spawn(async move { - sleep(Duration::from_secs(duration_secs)).await; - sender.send(String::from(name)).await.expect("Failed to send runner"); - }); - } + * This compiles, but once the timeout expires it is `Poll::Ready` on every + iteration (a fused future would help with this). Update to reset + `timeout_fut` every time it expires. - println!("{:?}", finish_line_future.await.expect("Failed to collect finish line")); - // [Runner { name: "Milo" }, Runner { name: "Oliver" }] -} -``` +* Box allocates on the heap. In some cases, `tokio::pin!` is also an option, but + that is difficult to use for a future that is reassigned. +* Another alternative is to not use `pin` at all but spawn another task that will send to a `oneshot` channel every 100ms.
diff --git a/src/async/runtimes/tokio.md b/src/async/runtimes/tokio.md index 3ed208b1f628..c9dfbd485e2b 100644 --- a/src/async/runtimes/tokio.md +++ b/src/async/runtimes/tokio.md @@ -44,6 +44,6 @@ async fn main() { * Try `count_to(10).await` instead of spawning. -* Try importing `tokio::join` and using it to join multiple handles. +* Try awaiting the task returned from `tokio::spawn`.
diff --git a/src/async/tasks.md b/src/async/tasks.md index 1011742c6223..e00499f8219a 100644 --- a/src/async/tasks.md +++ b/src/async/tasks.md @@ -5,10 +5,10 @@ less resource-intensive. A Task has a single top-level Future which the executor polls to make progress. That future may have one or more nested futures that its `poll` method polls, -corresponding loosely to a call stack. Concurrency is possible within a task by +corresponding loosely to a call stack. Concurrency within a task is possible by polling multiple child futures, such as racing a timer and an I/O operation. -```rust,editable,compile_fail +```rust,compile_fail use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; @@ -55,6 +55,10 @@ Copy this example into your prepared `src/main.rs` and run it from there. * Ask students to visualize what the state of the example server would be with a few connected clients. What tasks exist? What are their Futures? +* This is the first time we've seen an `async` block. This is similar to a + closure, but does not take any arguments. Its return value is a Future, + similar to an `async fn`. + * Refactor the async block into a function, and improve the error handling using `?`.
From 27135e1a56580f62fa929663c11f37c9e5da4d16 Mon Sep 17 00:00:00 2001 From: rbehjati Date: Tue, 11 Apr 2023 11:13:07 +0100 Subject: [PATCH 10/10] Add a note on async trait (#558) --- src/async/pitfalls/async-traits.md | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/async/pitfalls/async-traits.md b/src/async/pitfalls/async-traits.md index 69aeacca8dd0..93e9ef8aa322 100644 --- a/src/async/pitfalls/async-traits.md +++ b/src/async/pitfalls/async-traits.md @@ -48,10 +48,16 @@ async fn main() {
+* The difficulty with `async trait` is in that the resulting `Future` does not + have a size known at compile time, because the size of the `Future` depends + on the implementation. + * `async_trait` is easy to use, but note that it's using heap allocations to - achieve this, which has performance implications. + achieve this, and solve the unknow size problem above. This heap allocation + has performance overhead. -* Try creating a new sleeper struct that will sleep for a random amount of time and adding it to the Vec. +* Try creating a new sleeper struct that will sleep for a random amount of time + and adding it to the Vec. * Try making the `sleep` call take `&mut self`.