Skip to content

Commit cd17c6b

Browse files
authored
Async chapter (#524)
* Add async channels chapter * Async control flow * Async pitfalls * Separate in multiple chapters + add daemon section * Merge reentering threads in blocking-executor
1 parent 1cff6e3 commit cd17c6b

File tree

9 files changed

+376
-3
lines changed

9 files changed

+376
-3
lines changed

src/SUMMARY.md

+11-3
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,17 @@
232232
- [Async](async.md)
233233
- [async/await](async/async-await.md)
234234
- [Async Blocks](async/async-blocks.md)
235-
- [Futures](async/futures.md)
236-
- [Runtimes](async/runtimes.md)
237-
- [Tasks](async/tasks.md)
235+
- [Futures](async/futures.md)
236+
- [Runtimes](async/runtimes.md)
237+
- [Tasks](async/tasks.md)ures](async/futures.md)
238+
- [Async Channels](async/channels.md)
239+
- [Futures Control Flow](async/control-flow.md)
240+
- [Daemon](async/control-flow/daemon.md)
241+
- [Join](async/control-flow/join_all.md)
242+
- [Select](async/control-flow/select.md)
243+
- [Pitfalls](async/pitfalls.md)
244+
- [Blocking the executor](async/pitfalls/blocking-executor.md)
245+
- [Pin](async/pitfalls/pin.md)
238246
- [Exercises](exercises/day-4/async.md)
239247

240248
# Final Words

src/async/channels.md

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Async Channels
2+
3+
Multiple Channels crates have support for `async`/`await`. For instance `tokio` channels:
4+
5+
```rust,editable,compile_fail
6+
use tokio::sync::mpsc::{self, Receiver};
7+
8+
async fn ping_handler(mut input: Receiver<()>) {
9+
let mut count: usize = 0;
10+
11+
while let Some(_) = input.recv().await {
12+
count += 1;
13+
println!("Received {count} pings so far.");
14+
}
15+
}
16+
17+
#[tokio::main]
18+
async fn main() {
19+
let (sender, receiver) = mpsc::channel(32);
20+
let ping_handler_task = tokio::spawn(ping_handler(receiver));
21+
for _ in 0..10 {
22+
sender.send(()).await.expect("Failed to send ping.");
23+
}
24+
25+
std::mem::drop(sender);
26+
ping_handler_task.await.expect("Something went wrong in ping handler task.");
27+
}
28+
```
29+
30+
<details>
31+
32+
- Overall, the interface is similar to the `sync` channels as seen in the [morning class](concurrency/channels.md).
33+
- The `Flume` crate has channels that implement both `sync` and `async` `send` and `recv`. This can be convenient for complex application with both IO and heavy CPU processing tasks.
34+
- What makes working with `async` channels preferable is the ability to combine them with other `future`s to combine them and create complex control flow.
35+
36+
</details>

src/async/control-flow.md

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Futures Control Flow
2+
3+
Futures can be combined together to produce concurrent compute flow graphs. We will cover multiple common operations:
4+
5+
----
6+
7+
- [Daemon](control-flow/daemon.md)
8+
- [Join](control-flow/join_all.md)
9+
- [Select](control-flow/select.md)

src/async/control-flow/daemon.md

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Daemon
2+
3+
Tasks can be spawned without having to be awaited. They will be scheduled like any other tasks by the executor but won't block any running task. This can be useful for tasks that function like actors, receiving messages and sending messages to other tasks through channels. It can also be useful to log metrics or ping system's health.
4+
5+
```rust,editable,compile_fail
6+
7+
use std::sync::atomic::{AtomicUsize, Ordering};
8+
use std::sync::Arc;
9+
use tokio::time::{sleep, Duration};
10+
11+
#[tokio::main]
12+
async fn main() {
13+
let seconds_since_beginning = Arc::new(AtomicUsize::from(0));
14+
let counter = Arc::clone(&seconds_since_beginning);
15+
tokio::spawn(async move {
16+
loop {
17+
sleep(Duration::from_secs(1)).await;
18+
counter.fetch_add(1, Ordering::SeqCst);
19+
}
20+
});
21+
22+
sleep(Duration::from_millis(4500)).await;
23+
assert_eq!(seconds_since_beginning.load(Ordering::Relaxed), 4);
24+
}
25+
26+
27+
```
28+
29+
<details>
30+
31+
* It is good practice to make your deamons exit because some other blocking task might depend on them. Which would prevent your main thread from ever closing. You can use a `oneshot` channel to signal the task to terminate. You can also use the `ctrl+c` signal handler from `tokio` as an interrupt signal.
32+
33+
</details>

src/async/control-flow/join_all.md

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# join_all
2+
3+
Futures can be combined together to produce concurrent compute flow graphs.
4+
5+
## Run a group of futures concurrently until they all resolve: `join_all`
6+
7+
### Equivalents:
8+
9+
- JS: `Promise.all`
10+
- Python: `asyncio.gather`
11+
12+
```rust,editable,compile_fail
13+
use anyhow::Result;
14+
use futures::future;
15+
use reqwest;
16+
use std::collections::HashMap;
17+
18+
async fn size_of_page(url: &str) -> Result<usize> {
19+
let resp = reqwest::get(url).await?;
20+
Ok(resp.text().await?.len())
21+
}
22+
23+
#[tokio::main]
24+
async fn main() {
25+
let urls: [&str; 4] = [
26+
"https://google.com",
27+
"https://httpbin.org/ip",
28+
"https://play.rust-lang.org/",
29+
"BAD_URL",
30+
];
31+
let futures_iter = urls.into_iter().map(size_of_page);
32+
let results = future::join_all(futures_iter).await;
33+
let page_sizes_dict: HashMap<&str, Result<usize>> =
34+
urls.into_iter().zip(results.into_iter()).collect();
35+
println!("{:?}", page_sizes_dict);
36+
}
37+
```
38+
39+
<details>
40+
41+
* `join_all` should soon be stabilized as part of the standard library in `std::future`.
42+
* For multiple futures of disjoint types, you can use `join!` but you must know how many futures you will have at compile time.
43+
* You can also combine `join_all` with `join!` for instance to join all requests to an http service as well as a database query.
44+
* The risk of `join` is that one of the future could never resolve, this would cause your program to stall.
45+
* Try adding a timeout to the future.
46+
47+
</details>
48+

src/async/control-flow/select.md

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Select
2+
3+
## Run multiple futures concurrently until the first one resolves
4+
5+
### Equivalents:
6+
7+
- JS: `Promise.race`
8+
- Python: `asyncio.new_event_loop().run_until_complete(asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED))`
9+
10+
```rust,editable,compile_fail
11+
use tokio::sync::mpsc::{self, Receiver};
12+
use tokio::time::{sleep, Duration};
13+
14+
#[derive(Debug, PartialEq)]
15+
enum Animal {
16+
Cat { name: String },
17+
Dog { name: String },
18+
}
19+
20+
async fn first_animal_to_finish_race(
21+
mut cat_rcv: Receiver<String>,
22+
mut dog_rcv: Receiver<String>,
23+
) -> Option<Animal> {
24+
tokio::select! {
25+
cat_name = cat_rcv.recv() => Some(Animal::Cat { name: cat_name? }),
26+
dog_name = dog_rcv.recv() => Some(Animal::Dog { name: dog_name? })
27+
}
28+
}
29+
30+
#[tokio::main]
31+
async fn main() {
32+
let (cat_sender, cat_receiver) = mpsc::channel(32);
33+
let (dog_sender, dog_receiver) = mpsc::channel(32);
34+
tokio::spawn(async move {
35+
sleep(Duration::from_secs(10)).await;
36+
cat_sender
37+
.send(String::from("Felix"))
38+
.await
39+
.expect("Failed to send cat.");
40+
});
41+
tokio::spawn(async move {
42+
sleep(Duration::from_secs(5)).await;
43+
dog_sender
44+
.send(String::from("Rex"))
45+
.await
46+
.expect("Failed to send cat.");
47+
});
48+
49+
let winner = first_animal_to_finish_race(cat_receiver, dog_receiver)
50+
.await
51+
.expect("Failed to receive winner");
52+
53+
assert_eq!(winner, Animal::Dog {name: String::from("Rex")});
54+
}
55+
```
56+
57+
<details>
58+
59+
* In this example, we have a race between a cat and a dog. `first_animal_to_finish_race` listens to both channels and will pick whichever arrives first. Since the dog takes 5 seconds, it wins against the cat that take 10 seconds.
60+
* You can use `oneshot` channels in this example as the channels are supposed to receive only one `send`.
61+
* You can try adding more contestants to the race and return a leaderboard. Also, you can add a deadline after which contestants get eliminated.
62+
63+
</details>

src/async/pitfalls.md

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Pitfalls of async/await
2+
3+
Async / await provides convenient and efficient abstraction for concurrent asynchronous programming. However, the async/await model in Rust also comes with its share of pitfalls and footguns. We illustrate some of them in this chapter:
4+
5+
---
6+
7+
- [Blocking the executor](pitfalls/blocking-executor.md)
8+
- [Pin](pitfalls/pin.md)
+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# Blocking the executor
2+
3+
Most async runtimes only allow IO tasks to run concurrently.
4+
This means that CPU blocking tasks will block the executor and prevent other tasks from being executed.
5+
An easy workaround is to use async equivalent methods where possible.
6+
7+
```rust,editable,compile_fail
8+
use futures::future::join_all;
9+
use std::time::Instant;
10+
11+
// Uncomment to try with `spawn_blocking` around `std::thread::sleep`.
12+
const USE_SPAWN_BLOCKING: bool = false;
13+
14+
async fn std_sleep_ms(duration_ms: u64) {
15+
if USE_SPAWN_BLOCKING {
16+
tokio::task::spawn_blocking(move || {
17+
std::thread::sleep(std::time::Duration::from_millis(duration_ms));
18+
})
19+
.await
20+
.unwrap();
21+
} else {
22+
std::thread::sleep(std::time::Duration::from_millis(duration_ms));
23+
}
24+
}
25+
26+
async fn tokio_sleep_ms(duration_ms: u64) {
27+
tokio::time::sleep(tokio::time::Duration::from_millis(duration_ms)).await;
28+
}
29+
30+
// Single threaded executor for better reproducibility in runtime.
31+
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
32+
async fn main() {
33+
let std_sleep_futures = (1..=100).map(std_sleep_ms);
34+
let tokio_sleep_futures = (1..=100).map(tokio_sleep_ms);
35+
36+
let now = Instant::now();
37+
join_all(std_sleep_futures).await;
38+
assert!(now.elapsed().as_millis() >= 5050);
39+
40+
let now = Instant::now();
41+
join_all(tokio_sleep_futures).await;
42+
let runtime = now.elapsed().as_millis();
43+
assert!((100..150).contains(&runtime));
44+
}
45+
46+
```
47+
48+
<details>
49+
50+
- Using `std::thread::sleep` blocks the thread, so it prevents the executor from running. It means that while all futures are spawned at the same time, they all run one after the other. The runtime is the sum of all the `sleep` times. Try changing the runtime to `multi_thread` in a multi core environment to see how it impacts the run time.
51+
- A simple fix is to use `tokio::time::sleep`. Now, the `sleep` calls are `async` and they are properly scheduled by the executor.
52+
- Another fix would be to `tokio::task::spawn_blocking` which spawns an actual thread and transforms its handle into a future without blocking the executor. This thread is also scheduled as part of the executor's threadpool to grant better performance.
53+
54+
- You should not think of tasks as OS threads. They do not map 1 to 1 and most executors will allow many tasks to run on a single OS thread. This creates multiple gotchas:
55+
- For instance, using `std::sync::mutex` in an `async` runtime is very dangerous. When you lock the mutex in a thread then yield the executor using `.await` the thread might try to lock the mutex once more in a different task. Hence, prefer `async` alternatives like `tokio::sync::mutex`.
56+
- Thread-local storage should also be used with care in async contexts as it doesn't map to specific tasks.
57+
- Device drivers sometimes map to specific OS threads (for instance CUDA.) Prefer `tokio::task::spawn_blocking` when dealing with those.
58+
- Some C libraries rely on thread local storage as well.
59+
60+
</details>

src/async/pitfalls/pin.md

+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# Pin
2+
3+
When you await a future, you effectively move the whole stack frame from which you called `.await` to an internal data structure of your executor. If your future has pointers to data on the stack, the addresses might get invalidated. This is extremely unsafe. Therefore, you want to guarantee that the addresses your future point to don't change. That is why we need to `pin` futures. In most cases, you won't have to think about it when using futures from common libraries unless you use `select` in a loop (which is a pretty common use case). If, you implement your own future, you will likely run into this issue.
4+
5+
```rust,editable,compile_fail
6+
use tokio::sync::mpsc::{self, Receiver};
7+
use tokio::time::{sleep, Duration};
8+
9+
#[derive(Debug, PartialEq)]
10+
struct Runner {
11+
name: String,
12+
}
13+
14+
async fn race_finish_line(mut rcv: Receiver<String>, timeout: Duration) -> Option<Vec<Runner>> {
15+
let mut performances: Vec<Runner> = Vec::new();
16+
let timeout_sleep = sleep(timeout);
17+
// Pinning here allows us to await `timeout_sleep` multiple times.
18+
tokio::pin!(timeout_sleep);
19+
20+
loop {
21+
tokio::select! {
22+
// Rcv.recv() returns a new future every time, hence it does not need to be pinned.
23+
name = rcv.recv() => performances.push(Runner { name: name? }),
24+
_ = timeout_sleep.as_mut() => break
25+
}
26+
}
27+
Some(performances)
28+
}
29+
30+
#[tokio::main]
31+
async fn main() {
32+
let (sender, receiver) = mpsc::channel(32);
33+
34+
let names_and_time = [
35+
("Leo", 9),("Milo", 3),("Luna", 13),("Oliver", 5),("Charlie", 11),
36+
];
37+
38+
let finish_line_future = race_finish_line(receiver, Duration::from_secs(6));
39+
40+
for (name, duration_secs) in names_and_time {
41+
let sender = sender.clone();
42+
tokio::spawn(async move {
43+
sleep(Duration::from_secs(duration_secs)).await;
44+
sender.send(String::from(name)).await.expect("Failed to send runner");
45+
});
46+
}
47+
48+
println!("{:?}", finish_line_future.await.expect("Failed to collect finish line"));
49+
// [Runner { name: "Milo" }, Runner { name: "Oliver" }]
50+
}
51+
```
52+
53+
54+
<details>
55+
56+
* `tokio::pin!` only works on futures that implement `Unpin`. Other futures need to use `box::pin`.
57+
* Another alternative is to not use `tokio::pin!` at all but spawn another task that will send to a `oneshot` channel after the end of the `sleep` call.
58+
59+
```rust,editable,compile_fail
60+
use tokio::sync::mpsc::{self, Receiver};
61+
use tokio::time::{sleep, Duration};
62+
use tokio::sync::oneshot;
63+
64+
#[derive(Debug, PartialEq)]
65+
struct Runner {
66+
name: String,
67+
}
68+
69+
async fn race_finish_line(mut rcv: Receiver<String>, mut timeout: oneshot::Receiver<()>) -> Option<Vec<Runner>> {
70+
let mut performances: Vec<Runner> = Vec::new();
71+
loop {
72+
tokio::select! {
73+
name = rcv.recv() => performances.push(Runner { name: name? }),
74+
_ = &mut timeout => break
75+
}
76+
}
77+
Some(performances)
78+
}
79+
80+
#[tokio::main]
81+
async fn main() {
82+
let (sender, receiver) = mpsc::channel(32);
83+
let (os_sender, os_receiver) = oneshot::channel();
84+
85+
let names_and_time = [
86+
("Leo", 9),("Milo", 3),("Luna", 13),("Oliver", 5),("Charlie", 11),
87+
];
88+
89+
tokio::spawn(async move {
90+
sleep(Duration::from_secs(5)).await;
91+
os_sender.send(()).expect("Failed to send oneshot.");
92+
});
93+
let finish_line_future = race_finish_line(receiver, os_receiver);
94+
95+
for (name, duration_secs) in names_and_time {
96+
let sender = sender.clone();
97+
tokio::spawn(async move {
98+
sleep(Duration::from_secs(duration_secs)).await;
99+
sender.send(String::from(name)).await.expect("Failed to send runner");
100+
});
101+
}
102+
103+
println!("{:?}", finish_line_future.await.expect("Failed to collect finish line"));
104+
// [Runner { name: "Milo" }, Runner { name: "Oliver" }]
105+
}
106+
```
107+
108+
</details>

0 commit comments

Comments
 (0)