Skip to content

Commit 636558e

Browse files
talex5 and Lucas Pluvinage committed
Improve tracing
This adds tracing of cancellation contexts and OS operations, and documents and cleans up the API a bit.

When reading events, instead of the consumer providing one function per event, we now use a single function taking a variant. This makes the API easier to use and lets the caller decide whether they want to handle all events (checked by the compiler) or just some of them.

Note: eio_posix yields before each operation to avoid starvation. However, if it needed to block then it did another yield afterwards when retrying, which isn't necessary and clutters up the traces. This commit also removes that.

Co-authored-by: Lucas Pluvinage <[email protected]>
1 parent 21e05a8 commit 636558e

33 files changed

+451
-286
lines changed

examples/trace/main.ml

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,6 @@
99

1010
open Eio.Std
1111

12-
(* [handle pp] handles an event by writing the timestamp, ring ID and user data with [traceln].
13-
[pp] is used to format the user data. *)
14-
let handle pp : _ Eio_runtime_events.handler =
15-
fun ring ts v ->
16-
(* Note: don't use traceln here, as it will just generate more log events! *)
17-
Fmt.epr "%9Ld:ring %d: %a@." (Runtime_events.Timestamp.to_int64 ts) ring pp v
18-
1912
let callbacks =
2013
Runtime_events.Callbacks.create ()
2114
(* Uncomment to trace GC events too: *)
@@ -25,11 +18,10 @@ let callbacks =
2518
*)
2619
~lost_events:(fun ring n -> traceln "ring %d lost %d events" ring n)
2720
|> Eio_runtime_events.add_callbacks
28-
~fiber:(handle (Fmt.fmt "running fiber %d"))
29-
~create:(handle (fun f (id, ty) -> Fmt.pf f "create %s %d" (Eio_runtime_events.ty_to_string ty) id))
30-
~resolve:(handle (Fmt.fmt "resolve %d"))
31-
~log:(handle (Fmt.fmt "log %S"))
32-
~name:(handle (fun f (id, name) -> Fmt.pf f "%d is named %S" id name))
21+
(fun ring ts e ->
22+
(* Note: don't use traceln here, as it will just generate more log events! *)
23+
Fmt.epr "%9Ld:ring %d: %a@." (Runtime_events.Timestamp.to_int64 ts) ring Eio_runtime_events.pp_event e
24+
)
3325
(* (see lib_eio/runtime_events/eio_runtime_events.mli for more event types) *)
3426

3527
(* Read and display trace events from [cursor] until [finished]. *)

lib_eio/buf_write.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ let rec await_batch t =
456456
| Closed, false -> raise End_of_file
457457
| (Active | Closed), true -> Buffers.to_list t.scheduled
458458
| Paused, _ | Active, false ->
459-
Suspend.enter (fun ctx enqueue ->
459+
Suspend.enter "Buf_write.await_batch" (fun ctx enqueue ->
460460
Fiber_context.set_cancel_fn ctx (fun ex ->
461461
t.wake_writer <- ignore;
462462
enqueue (Error ex)

lib_eio/condition.ml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ let lock_protected m =
1212

1313
let await_generic ?mutex t =
1414
match
15-
Suspend.enter_unchecked (fun ctx enqueue ->
15+
Suspend.enter_unchecked "Condition.await" (fun ctx enqueue ->
1616
match Fiber_context.get_error ctx with
1717
| Some ex ->
1818
Option.iter Eio_mutex.unlock mutex;
@@ -90,7 +90,7 @@ let rec loop_no_mutex t fn =
9090
ensure_cancelled request;
9191
x
9292
| None ->
93-
Suspend.enter_unchecked (fun ctx enqueue ->
93+
Suspend.enter_unchecked "Condition.loop_no_mutex" (fun ctx enqueue ->
9494
match Fiber_context.get_error ctx with
9595
| Some ex ->
9696
ensure_cancelled request;

lib_eio/core/cancel.ml

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type state =
1414
If a function can succeed in a separate domain,
1515
the user's cancel function is responsible for ensuring that this is done atomically. *)
1616
type t = {
17+
id : Trace.id;
1718
mutable state : state;
1819
children : t Lwt_dllist.t;
1920
fibers : fiber_context Lwt_dllist.t;
@@ -89,10 +90,12 @@ let move_fiber_to t fiber =
8990
fiber.cancel_node <- Some new_node
9091

9192
(* Note: the new value is not linked into the cancellation tree. *)
92-
let create ~protected =
93+
let create ~protected purpose =
9394
let children = Lwt_dllist.create () in
9495
let fibers = Lwt_dllist.create () in
95-
{ state = Finished; children; protected; fibers; domain = Domain.self () }
96+
let id = Trace.mint_id () in
97+
Trace.create_cc id purpose;
98+
{ id; state = Finished; children; protected; fibers; domain = Domain.self () }
9699

97100
(* Links [t] into the tree as a child of [parent] and returns a function to remove it again. *)
98101
let activate t ~parent =
@@ -106,19 +109,19 @@ let activate t ~parent =
106109
Lwt_dllist.remove node
107110

108111
(* Runs [fn] with a fresh cancellation context. *)
109-
let with_cc ~ctx:fiber ~parent ~protected fn =
112+
let with_cc ~ctx:fiber ~parent ~protected purpose fn =
110113
if not protected then check parent;
111-
let t = create ~protected in
114+
let t = create ~protected purpose in
112115
let deactivate = activate t ~parent in
113116
move_fiber_to t fiber;
114117
let cleanup () = move_fiber_to parent fiber; deactivate () in
115118
match fn t with
116-
| x -> cleanup (); x
117-
| exception ex -> cleanup (); raise ex
119+
| x -> cleanup (); Trace.exit_cc (); x
120+
| exception ex -> cleanup (); Trace.error t.id ex; Trace.exit_cc (); raise ex
118121

119122
let protect fn =
120123
let ctx = Effect.perform Get_context in
121-
with_cc ~ctx ~parent:ctx.cancel_context ~protected:true @@ fun _ ->
124+
with_cc ~ctx ~parent:ctx.cancel_context ~protected:true Protect @@ fun _ ->
122125
(* Note: there is no need to check the new context after [fn] returns;
123126
the goal of cancellation is only to finish the thread promptly, not to report the error.
124127
We also do not check the parent context, to make sure the caller has a chance to handle the result. *)
@@ -167,18 +170,21 @@ let cancel t ex =
167170
Printexc.raise_with_backtrace ex bt
168171
)
169172

170-
let sub fn =
173+
let sub_checked purpose fn =
171174
let ctx = Effect.perform Get_context in
172175
let parent = ctx.cancel_context in
173-
with_cc ~ctx ~parent ~protected:false @@ fun t ->
176+
with_cc ~ctx ~parent ~protected:false purpose @@ fun t ->
174177
fn t
175178

179+
let sub fn =
180+
sub_checked Sub fn
181+
176182
(* Like [sub], but it's OK if the new context is cancelled.
177183
(instead, return the parent context on exit so the caller can check that) *)
178-
let sub_unchecked fn =
184+
let sub_unchecked purpose fn =
179185
let ctx = Effect.perform Get_context in
180186
let parent = ctx.cancel_context in
181-
with_cc ~ctx ~parent ~protected:false @@ fun t ->
187+
with_cc ~ctx ~parent ~protected:false purpose @@ fun t ->
182188
fn t;
183189
parent
184190

@@ -198,17 +204,18 @@ module Fiber_context = struct
198204

199205
let make ~cc ~vars =
200206
let tid = Trace.mint_id () in
201-
Trace.create tid Fiber;
207+
Trace.create_fiber tid ~cc:cc.id;
202208
let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = ignore; vars } in
203209
t.cancel_node <- Some (Lwt_dllist.add_r t cc.fibers);
204210
t
205211

206212
let make_root () =
207-
let cc = create ~protected:false in
213+
let cc = create ~protected:false Root in
208214
cc.state <- On;
209215
make ~cc ~vars:Hmap.empty
210216

211217
let destroy t =
218+
Trace.exit_fiber t.tid;
212219
Option.iter Lwt_dllist.remove t.cancel_node
213220

214221
let vars t = t.vars

lib_eio/core/dla.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ let prepare_for_await () =
77
| _ -> ()
88
and await () =
99
if Atomic.get state != `Released then
10-
Suspend.enter @@ fun ctx enqueue ->
10+
Suspend.enter "domain-local-await" @@ fun ctx enqueue ->
1111
let awaiting = `Awaiting enqueue in
1212
if Atomic.compare_and_set state `Init awaiting then (
1313
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->

lib_eio/core/eio__core.mli

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -698,15 +698,18 @@ module Private : sig
698698

699699
(** Suspend a fiber and enter the scheduler. *)
700700
module Suspend : sig
701-
val enter : (Fiber_context.t -> 'a Effects.enqueue -> unit) -> 'a
702-
(** [enter fn] suspends the calling fiber and calls [fn ctx enqueue] in the scheduler's context.
701+
val enter : string -> (Fiber_context.t -> 'a Effects.enqueue -> unit) -> 'a
702+
(** [enter op fn] suspends the calling fiber and calls [fn ctx enqueue] in the scheduler's context.
703+
703704
This should arrange for [enqueue] to be called when the fiber should be resumed.
704705
[enqueue] is thread-safe and so can be called from another domain or systhread.
705706
706707
[ctx] should be used to set a cancellation function. Otherwise, the operation is non-interruptable.
707-
If the caller's cancellation context is already cancelled, [enter] immediately aborts. *)
708+
If the caller's cancellation context is already cancelled, [enter] immediately aborts.
709+
710+
[op] is used when tracing to label the operation. *)
708711

709-
val enter_unchecked : (Fiber_context.t -> 'a Effects.enqueue -> unit) -> 'a
712+
val enter_unchecked : string -> (Fiber_context.t -> 'a Effects.enqueue -> unit) -> 'a
710713
(** [enter_unchecked] is like [enter] except that it does not perform the initial check
711714
that the fiber isn't cancelled (this is useful if you want to do the check yourself, e.g.
712715
because you need to unlock a mutex if cancelled). *)

lib_eio/core/fiber.ml

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
type _ Effect.t += Fork : Cancel.fiber_context * (unit -> unit) -> unit Effect.t
44

55
let yield () =
6-
let fiber = Suspend.enter (fun fiber enqueue -> enqueue (Ok fiber)) in
6+
let fiber = Suspend.enter "" (fun fiber enqueue -> enqueue (Ok fiber)) in
77
Cancel.check fiber.cancel_context
88

99
(* Note: [f] must not raise an exception, as that would terminate the whole scheduler. *)
@@ -17,12 +17,11 @@ let fork ~sw f =
1717
let new_fiber = Cancel.Fiber_context.make ~cc:sw.cancel ~vars in
1818
fork_raw new_fiber @@ fun () ->
1919
Switch.with_op sw @@ fun () ->
20-
match f () with
21-
| () ->
22-
Trace.resolve (Cancel.Fiber_context.tid new_fiber)
23-
| exception ex ->
24-
Switch.fail sw ex; (* The [with_op] ensures this will succeed *)
25-
Trace.resolve_error (Cancel.Fiber_context.tid new_fiber) ex
20+
try
21+
f ()
22+
with ex ->
23+
let bt = Printexc.get_raw_backtrace () in
24+
Switch.fail ~bt sw ex; (* The [with_op] ensures this will succeed *)
2625
) (* else the fiber should report the error to [sw], but [sw] is failed anyway *)
2726

2827
let fork_daemon ~sw f =
@@ -35,13 +34,12 @@ let fork_daemon ~sw f =
3534
match f () with
3635
| `Stop_daemon ->
3736
(* The daemon asked to stop. *)
38-
Trace.resolve (Cancel.Fiber_context.tid new_fiber)
37+
()
3938
| exception Cancel.Cancelled Exit when not (Cancel.is_on sw.cancel) ->
4039
(* The daemon was cancelled because all non-daemon fibers are finished. *)
41-
Trace.resolve (Cancel.Fiber_context.tid new_fiber)
40+
()
4241
| exception ex ->
4342
Switch.fail sw ex; (* The [with_daemon] ensures this will succeed *)
44-
Trace.resolve_error (Cancel.Fiber_context.tid new_fiber) ex
4543
) (* else the fiber should report the error to [sw], but [sw] is failed anyway *)
4644

4745
let fork_promise ~sw f =
@@ -86,13 +84,13 @@ let pair f g =
8684
exception Not_first
8785

8886
let await_cancel () =
89-
Suspend.enter @@ fun fiber enqueue ->
87+
Suspend.enter "await_cancel" @@ fun fiber enqueue ->
9088
Cancel.Fiber_context.set_cancel_fn fiber (fun ex -> enqueue (Error ex))
9189

9290
let any fs =
9391
let r = ref `None in
9492
let parent_c =
95-
Cancel.sub_unchecked (fun cc ->
93+
Cancel.sub_unchecked Any (fun cc ->
9694
let wrap h =
9795
match h () with
9896
| x ->
@@ -198,7 +196,7 @@ module List = struct
198196
}
199197

200198
let await_free t =
201-
if t.free_fibers = 0 then Single_waiter.await t.cond t.sw.id;
199+
if t.free_fibers = 0 then Single_waiter.await t.cond "Limiter.await_free" t.sw.cancel.id;
202200
(* If we got woken up then there was a free fiber then. And since we're the
203201
only fiber that uses [t], and we were sleeping, it must still be free. *)
204202
assert (t.free_fibers > 0);
@@ -306,7 +304,7 @@ let unwrap_cancelled state =
306304
let run_coroutine ~state fn =
307305
let await_request ~prev ~on_suspend =
308306
(* Suspend and wait for the consumer to resume us: *)
309-
Suspend.enter (fun ctx enqueue ->
307+
Suspend.enter "await-consumer" (fun ctx enqueue ->
310308
let ready = `Ready enqueue in
311309
if Atomic.compare_and_set state prev ready then (
312310
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
@@ -355,7 +353,7 @@ let fork_coroutine ~sw fn =
355353
raise ex
356354
);
357355
fun () ->
358-
Suspend.enter (fun ctx enqueue ->
356+
Suspend.enter "await-producer" (fun ctx enqueue ->
359357
let rec aux () =
360358
match Atomic.get state with
361359
| `Ready resume as prev ->

lib_eio/core/promise.ml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,22 @@ let create_with_id id =
2626

2727
let create ?label () =
2828
let id = Trace.mint_id () in
29-
Trace.create ?label id Promise;
29+
Trace.create_obj ?label id Promise;
3030
create_with_id id
3131

3232
let create_resolved x =
3333
let id = Trace.mint_id () in
34-
Trace.create id Promise;
34+
Trace.create_obj id Promise;
3535
to_public_promise { id; state = Atomic.make (Resolved x) }
3636

3737
let await t =
3838
let t = of_public_promise t in
3939
match Atomic.get t.state with
4040
| Resolved x ->
41-
Trace.read t.id;
41+
Trace.get t.id;
4242
x
4343
| Unresolved b ->
44-
Suspend.enter (fun ctx enqueue ->
44+
Suspend.enter "Promise.await" (fun ctx enqueue ->
4545
match Broadcast.suspend b (fun () -> enqueue (Ok ())) with
4646
| None -> () (* We got resumed immediately *)
4747
| Some request ->
@@ -53,15 +53,15 @@ let await t =
5353
| Unresolved _ ->
5454
(* We observed the promise to be still unresolved after registering a waiter.
5555
Therefore any resolution must happen after we were registered and we will be notified. *)
56-
Trace.try_read t.id;
56+
Trace.try_get t.id;
5757
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
5858
if Broadcast.cancel request then enqueue (Error ex)
5959
(* else already resumed *)
6060
)
6161
);
6262
match Atomic.get t.state with
6363
| Resolved x ->
64-
Trace.read t.id;
64+
Trace.get t.id;
6565
x
6666
| Unresolved _ -> assert false
6767

@@ -76,7 +76,7 @@ let try_resolve t v =
7676
| Resolved _ -> false
7777
| Unresolved b as prev ->
7878
if Atomic.compare_and_set t.state prev (Resolved v) then (
79-
Trace.resolve t.id;
79+
Trace.put t.id;
8080
Broadcast.resume_all b;
8181
true
8282
) else (

lib_eio/core/single_waiter.ml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ let create () = { wake = ignore }
1010

1111
let wake t v = t.wake v
1212

13-
let await t id =
13+
let await t op id =
1414
let x =
15-
Suspend.enter @@ fun ctx enqueue ->
15+
Suspend.enter op @@ fun ctx enqueue ->
1616
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
1717
t.wake <- ignore;
1818
enqueue (Error ex)
@@ -23,5 +23,5 @@ let await t id =
2323
enqueue x
2424
)
2525
in
26-
Trace.read id;
26+
Trace.get id;
2727
x

lib_eio/core/suspend.ml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
type 'a enqueue = ('a, exn) result -> unit
22
type _ Effect.t += Suspend : (Cancel.fiber_context -> 'a enqueue -> unit) -> 'a Effect.t
33

4-
let enter_unchecked fn = Effect.perform (Suspend fn)
4+
let enter_unchecked op fn =
5+
Trace.suspend_fiber op;
6+
Effect.perform (Suspend fn)
57

6-
let enter fn =
7-
enter_unchecked @@ fun fiber enqueue ->
8+
let enter op fn =
9+
enter_unchecked op @@ fun fiber enqueue ->
810
match Cancel.Fiber_context.get_error fiber with
911
| None -> fn fiber enqueue
1012
| Some ex -> enqueue (Error ex)

0 commit comments

Comments
 (0)