Skip to content

Commit b9e3d2e

Browse files
authored
task: add Tracing instrumentation to spawned tasks (#2655)
## Motivation When debugging asynchronous systems, it can be very valuable to inspect what tasks are currently active (see #2510). The [`tracing` crate] and related libraries provide an interface for Rust libraries and applications to emit and consume structured, contextual, and async-aware diagnostic information. Because this diagnostic information is structured and machine-readable, it is a better fit for the task-tracking use case than textual logging — `tracing` spans can be consumed to generate metrics ranging from a simple counter of active tasks to histograms of poll durations, idle durations, and total task lifetimes. This information is potentially valuable to both Tokio users *and* to maintainers. Additionally, `tracing` is maintained by the Tokio project and is becoming widely adopted by other libraries in the "Tokio stack", such as [`hyper`], [`h2`], and [`tonic`] and in [other] [parts] of the broader Rust ecosystem. Therefore, it is suitable for use in Tokio itself. [`tracing` crate]: https://github.com/tokio-rs/tracing [`hyper`]: hyperium/hyper#2204 [`h2`]: hyperium/h2#475 [`tonic`]: https://github.com/hyperium/tonic/blob/570c606397e47406ec148fe1763586e87a8f5298/tonic/Cargo.toml#L48 [other]: rust-lang/chalk#525 [parts]: rust-lang/compiler-team#331 ## Solution This PR is an MVP for instrumenting Tokio with `tracing` spans. When the "tracing" optional dependency is enabled, every spawned future will be instrumented with a `tracing` span. The generated spans are at the `TRACE` verbosity level, and have the target "tokio::task", which may be used by consumers to filter whether they should be recorded. They include fields for the type name of the spawned future and for what kind of task the span corresponds to (a standard `spawn`ed task, a local task spawned by `spawn_local`, or a `blocking` task spawned by `spawn_blocking`). Because `tracing` has separate concepts of "opening/closing" and "entering/exiting" a span, we enter these spans every time the spawned task is polled. This allows collecting data such as: - the total lifetime of the task from `spawn` to `drop` - the number of times the task was polled before it completed - the duration of each individual time that the span was polled (and therefore, aggregated metrics like histograms or averages of poll durations) - the total time a span was actively being polled, and the total time it was alive but **not** being polled - the time between when the task was `spawn`ed and the first poll As an example, here is the output of a version of the `chat` example instrumented with `tracing`: ![image](https://user-images.githubusercontent.com/2796466/87231927-e50f6900-c36f-11ea-8a90-6da9b93b9601.png) And, with multiple connections actually sending messages: ![trace_example_1](https://user-images.githubusercontent.com/2796466/87231876-8d70fd80-c36f-11ea-91f1-0ad1a5b3112f.png) I haven't added any `tracing` spans in the example, only converted the existing `println!`s to `tracing::info` and `tracing::error` for consistency. The span durations in the above output are generated by `tracing-subscriber`. Of course, a Tokio-specific subscriber could generate even more detailed statistics, but that's follow-up work once basic tracing support has been added. Note that the `Instrumented` type from `tracing-futures`, which attaches a `tracing` span to a future, was reimplemented inside of Tokio to avoid a dependency on that crate. `tracing-futures` has a feature flag that enables an optional dependency on Tokio, and I believe that if another crate in a dependency graph enables that feature while Tokio's `tracing` support is also enabled, it would create a circular dependency that Cargo wouldn't be able to handle. Also, it avoids a dependency for a very small amount of code that is unlikely to ever change. There is, of course, room for plenty of future work here. This might include: - instrumenting other parts of `tokio`, such as I/O resources and channels (possibly via waker instrumentation) - instrumenting the threadpool so that the state of worker threads can be inspected - writing `tracing-subscriber` `Layer`s to collect and display Tokio-specific data from these traces - using `track_caller` (when it's stable) to record _where_ a task was `spawn`ed from However, this is intended as an MVP to get us started on that path. Signed-off-by: Eliza Weisman <[email protected]>
1 parent a23d2b2 commit b9e3d2e

File tree

9 files changed

+127
-8
lines changed

9 files changed

+127
-8
lines changed

examples/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ edition = "2018"
77
# If you copy one of the examples into a new project, you should be using
88
# [dependencies] instead.
99
[dev-dependencies]
10-
tokio = { version = "0.2.0", path = "../tokio", features = ["full"] }
10+
tokio = { version = "0.2.0", path = "../tokio", features = ["full", "tracing"] }
11+
tracing = "0.1"
12+
tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] }
1113
tokio-util = { version = "0.3.0", path = "../tokio-util", features = ["full"] }
1214
bytes = "0.5"
1315
futures = "0.3.0"

examples/chat.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,26 @@ use std::task::{Context, Poll};
4343

4444
#[tokio::main]
4545
async fn main() -> Result<(), Box<dyn Error>> {
46+
use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};
47+
// Configure a `tracing` subscriber that logs traces emitted by the chat
48+
// server.
49+
tracing_subscriber::fmt()
50+
// Filter what traces are displayed based on the RUST_LOG environment
51+
// variable.
52+
//
53+
// Traces emitted by the example code will always be displayed. You
54+
// can set `RUST_LOG=tokio=trace` to enable additional traces emitted by
55+
// Tokio itself.
56+
.with_env_filter(EnvFilter::from_default_env().add_directive("chat=info".parse()?))
57+
// Log events when `tracing` spans are created, entered, exited, or
58+
// closed. When Tokio's internal tracing support is enabled (as
59+
// described above), this can be used to track the lifecycle of spawned
60+
// tasks on the Tokio runtime.
61+
.with_span_events(FmtSpan::FULL)
62+
// Set this subscriber as the default, to collect all traces emitted by
63+
// the program.
64+
.init();
65+
4666
// Create the shared state. This is how all the peers communicate.
4767
//
4868
// The server task will hold a handle to this. For every new client, the
@@ -59,7 +79,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
5979
// Note that this is the Tokio TcpListener, which is fully async.
6080
let mut listener = TcpListener::bind(&addr).await?;
6181

62-
println!("server running on {}", addr);
82+
tracing::info!("server running on {}", addr);
6383

6484
loop {
6585
// Asynchronously wait for an inbound TcpStream.
@@ -70,8 +90,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
7090

7191
// Spawn our handler to be run asynchronously.
7292
tokio::spawn(async move {
93+
tracing::debug!("accepted connection");
7394
if let Err(e) = process(state, stream, addr).await {
74-
println!("an error occurred; error = {:?}", e);
95+
tracing::info!("an error occurred; error = {:?}", e);
7596
}
7697
});
7798
}
@@ -200,7 +221,7 @@ async fn process(
200221
Some(Ok(line)) => line,
201222
// We didn't get a line so we return early here.
202223
_ => {
203-
println!("Failed to get username from {}. Client disconnected.", addr);
224+
tracing::error!("Failed to get username from {}. Client disconnected.", addr);
204225
return Ok(());
205226
}
206227
};
@@ -212,7 +233,7 @@ async fn process(
212233
{
213234
let mut state = state.lock().await;
214235
let msg = format!("{} has joined the chat", username);
215-
println!("{}", msg);
236+
tracing::info!("{}", msg);
216237
state.broadcast(addr, &msg).await;
217238
}
218239

@@ -233,9 +254,10 @@ async fn process(
233254
peer.lines.send(&msg).await?;
234255
}
235256
Err(e) => {
236-
println!(
257+
tracing::error!(
237258
"an error occurred while processing messages for {}; error = {:?}",
238-
username, e
259+
username,
260+
e
239261
);
240262
}
241263
}
@@ -248,7 +270,7 @@ async fn process(
248270
state.peers.remove(&addr);
249271

250272
let msg = format!("{} has left the chat", username);
251-
println!("{}", msg);
273+
tracing::info!("{}", msg);
252274
state.broadcast(addr, &msg).await;
253275
}
254276

tokio/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ iovec = { version = "0.1.4", optional = true }
106106
num_cpus = { version = "1.8.0", optional = true }
107107
parking_lot = { version = "0.10.0", optional = true } # Not in full
108108
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
109+
tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true } # Not in full
109110

110111
[target.'cfg(unix)'.dependencies]
111112
mio-uds = { version = "0.6.5", optional = true }

tokio/src/macros/cfg.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,25 @@ macro_rules! cfg_unstable {
364364
}
365365
}
366366

367+
macro_rules! cfg_trace {
368+
($($item:item)*) => {
369+
$(
370+
#[cfg(feature = "tracing")]
371+
#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))]
372+
$item
373+
)*
374+
}
375+
}
376+
377+
macro_rules! cfg_not_trace {
378+
($($item:item)*) => {
379+
$(
380+
#[cfg(not(feature = "tracing"))]
381+
$item
382+
)*
383+
}
384+
}
385+
367386
macro_rules! cfg_coop {
368387
($($item:item)*) => {
369388
$(

tokio/src/task/blocking.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,19 @@ cfg_blocking! {
114114
F: FnOnce() -> R + Send + 'static,
115115
R: Send + 'static,
116116
{
117+
#[cfg(feature = "tracing")]
118+
let f = {
119+
let span = tracing::trace_span!(
120+
target: "tokio::task",
121+
"task",
122+
kind = %"blocking",
123+
function = %std::any::type_name::<F>(),
124+
);
125+
move || {
126+
let _g = span.enter();
127+
f()
128+
}
129+
};
117130
crate::runtime::spawn_blocking(f)
118131
}
119132
}

tokio/src/task/local.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ cfg_rt_util! {
195195
F: Future + 'static,
196196
F::Output: 'static,
197197
{
198+
let future = crate::util::trace::task(future, "local");
198199
CURRENT.with(|maybe_cx| {
199200
let cx = maybe_cx
200201
.expect("`spawn_local` called from outside of a `task::LocalSet`");
@@ -277,6 +278,7 @@ impl LocalSet {
277278
F: Future + 'static,
278279
F::Output: 'static,
279280
{
281+
let future = crate::util::trace::task(future, "local");
280282
let (task, handle) = unsafe { task::joinable_local(future) };
281283
self.context.tasks.borrow_mut().queue.push_back(task);
282284
handle

tokio/src/task/spawn.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ doc_rt_core! {
129129
{
130130
let spawn_handle = runtime::context::spawn_handle()
131131
.expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`");
132+
let task = crate::util::trace::task(task, "task");
132133
spawn_handle.spawn(task)
133134
}
134135
}

tokio/src/util/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ cfg_rt_threaded! {
1919
pub(crate) use try_lock::TryLock;
2020
}
2121

22+
pub(crate) mod trace;
23+
2224
#[cfg(any(feature = "macros", feature = "stream"))]
2325
#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))]
2426
pub use rand::thread_rng_n;

tokio/src/util/trace.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
cfg_trace! {
2+
cfg_rt_core! {
3+
use std::future::Future;
4+
use std::pin::Pin;
5+
use std::task::{Context, Poll};
6+
use pin_project_lite::pin_project;
7+
8+
use tracing::Span;
9+
10+
pin_project! {
11+
/// A future that has been instrumented with a `tracing` span.
12+
#[derive(Debug, Clone)]
13+
pub(crate) struct Instrumented<T> {
14+
#[pin]
15+
inner: T,
16+
span: Span,
17+
}
18+
}
19+
20+
impl<T: Future> Future for Instrumented<T> {
21+
type Output = T::Output;
22+
23+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
24+
let this = self.project();
25+
let _enter = this.span.enter();
26+
this.inner.poll(cx)
27+
}
28+
}
29+
30+
impl<T> Instrumented<T> {
31+
pub(crate) fn new(inner: T, span: Span) -> Self {
32+
Self { inner, span }
33+
}
34+
}
35+
36+
#[inline]
37+
pub(crate) fn task<F>(task: F, kind: &'static str) -> Instrumented<F> {
38+
let span = tracing::trace_span!(
39+
target: "tokio::task",
40+
"task",
41+
%kind,
42+
future = %std::any::type_name::<F>(),
43+
);
44+
Instrumented::new(task, span)
45+
}
46+
}
47+
}
48+
49+
cfg_not_trace! {
50+
cfg_rt_core! {
51+
#[inline]
52+
pub(crate) fn task<F>(task: F, _: &'static str) -> F {
53+
// nop
54+
task
55+
}
56+
}
57+
}

0 commit comments

Comments
 (0)