Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bench/bench_http.ml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ let run_client ~n_requests id conn =
let main net domain_mgr ~n_client_domains ~n_server_domains ~n_connections_per_domain ~n_requests_per_connection =
let total = Atomic.make 0 in
let t0 = Unix.gettimeofday () in
Switch.run (fun sw ->
Switch.run ~name:"main" (fun sw ->
let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8085) in
let backlog = n_connections_per_domain * n_client_domains in
let server_socket = Eio.Net.listen ~reuse_addr:true ~backlog ~sw net addr in
Expand All @@ -74,7 +74,7 @@ let main net domain_mgr ~n_client_domains ~n_server_domains ~n_connections_per_d
for domain = 1 to n_client_domains do
Fiber.fork ~sw (fun () ->
Eio.Domain_manager.run domain_mgr (fun () ->
Switch.run @@ fun sw ->
Switch.run ~name:"client-domain" @@ fun sw ->
for i = 1 to n_connections_per_domain do
Fiber.fork ~sw (fun () ->
let id = Printf.sprintf "domain %d / conn %d" domain i in
Expand Down
2 changes: 1 addition & 1 deletion examples/net/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ module Write = Eio.Buf_write

(* Connect to [addr] on [net], send a message and then read the reply. *)
let run ~net ~addr =
Switch.run ~name:"client" @@ fun sw ->
traceln "Connecting to server at %a..." Eio.Net.Sockaddr.pp addr;
Switch.run @@ fun sw ->
let flow = Eio.Net.connect ~sw net addr in
(* We use a buffered writer here so we can create the message in multiple
steps but still send it efficiently as a single packet: *)
Expand Down
2 changes: 1 addition & 1 deletion examples/net/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8080)

(* Run a server and a test client, communicating using [net]. *)
let main ~net =
Switch.run @@ fun sw ->
Switch.run ~name:"main" @@ fun sw ->
(* We create the listening socket first so that we can be sure it is ready
as soon as the client wants to use it. *)
let listening_socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in
Expand Down
2 changes: 1 addition & 1 deletion examples/trace/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ let trace ~finished (clock, delay) cursor =

(* The program to be traced. *)
let main net =
Switch.run @@ fun sw ->
Switch.run ~name:"main" @@ fun sw ->
let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8123) in
let s = Eio.Net.listen ~sw ~backlog:1 ~reuse_addr:true net addr in
Fiber.both
Expand Down
2 changes: 1 addition & 1 deletion lib_eio/buf_write.ml
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ let copy t flow =
with End_of_file -> ()

let with_flow ?(initial_size=0x1000) flow fn =
Switch.run @@ fun sw ->
Switch.run ~name:"Buf_write.with_flow" @@ fun sw ->
let t = create ~sw initial_size in
Fiber.fork ~sw (fun () -> copy t flow);
match fn t with
Expand Down
3 changes: 2 additions & 1 deletion lib_eio/core/cancel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ let cancel t ex =
Printexc.raise_with_backtrace ex bt
)

let sub_checked purpose fn =
let sub_checked ?name purpose fn =
let ctx = Effect.perform Get_context in
let parent = ctx.cancel_context in
with_cc ~ctx ~parent ~protected:false purpose @@ fun t ->
Option.iter (Trace.name t.id) name;
fn t

let sub fn =
Expand Down
8 changes: 5 additions & 3 deletions lib_eio/core/eio__core.mli
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ module Switch : sig

(** {2 Switch creation} *)

val run : (t -> 'a) -> 'a
val run : ?name:string -> (t -> 'a) -> 'a
(** [run fn] runs [fn] with a fresh switch (initially on).

When [fn] finishes, [run] waits for all fibers registered with the switch to finish,
and then releases all attached resources.

If {!fail} is called, [run] will re-raise the exception (after everything is cleaned up).
If [fn] raises an exception, it is passed to {!fail}. *)
If [fn] raises an exception, it is passed to {!fail}.

@param name Used to name the switch when tracing. *)

val run_protected : (t -> 'a) -> 'a
val run_protected : ?name:string -> (t -> 'a) -> 'a
(** [run_protected fn] is like [run] but ignores cancellation requests from the parent context. *)

(** {2 Cancellation and failure} *)
Expand Down
8 changes: 4 additions & 4 deletions lib_eio/core/fiber.ml
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ let fork_promise_exn ~sw f =
p

let all xs =
Switch.run @@ fun sw ->
Switch.run ~name:"all" @@ fun sw ->
List.iter (fork ~sw) xs

let both f g = all [f; g]

let pair f g =
Switch.run @@ fun sw ->
Switch.run ~name:"pair" @@ fun sw ->
let x = fork_promise ~sw f in
let y = g () in
(Promise.await_exn x, y)
Expand Down Expand Up @@ -225,7 +225,7 @@ module List = struct
match items with
| [] -> [] (* Avoid creating a switch in the simple case *)
| items ->
Switch.run @@ fun sw ->
Switch.run ~name:"filter_map" @@ fun sw ->
let limiter = Limiter.create ~sw max_fibers in
let rec aux = function
| [] -> []
Expand All @@ -244,7 +244,7 @@ module List = struct
match items with
| [] -> () (* Avoid creating a switch in the simple case *)
| items ->
Switch.run @@ fun sw ->
Switch.run ~name:"iter" @@ fun sw ->
let limiter = Limiter.create ~sw max_fibers in
let rec aux = function
| [] -> ()
Expand Down
5 changes: 3 additions & 2 deletions lib_eio/core/switch.ml
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,12 @@ let run_internal t fn =
maybe_raise_exs t;
assert false

let run fn = Cancel.sub_checked Switch (fun cc -> run_internal (create cc) fn)
let run ?name fn = Cancel.sub_checked ?name Switch (fun cc -> run_internal (create cc) fn)

let run_protected fn =
let run_protected ?name fn =
let ctx = Effect.perform Cancel.Get_context in
Cancel.with_cc ~ctx ~parent:ctx.cancel_context ~protected:true Switch @@ fun cancel ->
Option.iter (Trace.name cancel.id) name;
run_internal (create cancel) fn

(* Run [fn ()] in [t]'s cancellation context.
Expand Down
1 change: 1 addition & 0 deletions lib_eio/core/trace.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ let create_fiber ~cc id =
add_event RE.create_fiber (id, cc)

let log = add_event RE.log
let name id x = add_event RE.name (id, x)
let enter_span = add_event RE.enter_span
let exit_span = add_event RE.exit_span
let fiber = add_event RE.fiber
Expand Down
3 changes: 3 additions & 0 deletions lib_eio/core/trace.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ val mint_id : unit -> id
val log : string -> unit
(** [log msg] attaches text [msg] to the current fiber. *)

val name : id -> string -> unit
(** [name id label] sets [label] as the name for [id]. *)

val with_span : string -> (unit -> 'a) -> 'a
(** [with_span op fn] runs [fn ()], labelling the timespan during which it runs with [op]. *)

Expand Down
2 changes: 1 addition & 1 deletion lib_eio/executor_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ let max_capacity_f = float max_capacity
Each worker runs in its own domain,
taking jobs from [queue] whenever it has spare capacity. *)
let run_worker { queue } =
Switch.run @@ fun sw ->
Switch.run ~name:"run_worker" @@ fun sw ->
let capacity = ref 0 in
let condition = Condition.create () in
(* The main worker loop. *)
Expand Down
16 changes: 9 additions & 7 deletions lib_eio/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ let getnameinfo (type tag) (t:[> tag ty] r) sockaddr =
let close = Resource.close

let with_tcp_connect ?(timeout=Time.Timeout.none) ~host ~service t f =
Switch.run @@ fun sw ->
Switch.run ~name:"with_tcp_connect" @@ fun sw ->
match
let rec aux = function
| [] -> raise @@ err (Connection_failure No_matching_addresses)
Expand All @@ -355,8 +355,7 @@ let with_tcp_connect ?(timeout=Time.Timeout.none) ~host ~service t f =
Exn.reraise_with_context ex bt "connecting to %S:%s" host service

(* Run a server loop in a single domain. *)
let run_server_loop ~connections ~on_error ~stop listening_socket connection_handler =
Switch.run @@ fun sw ->
let run_server_loop ~sw ~connections ~on_error ~stop listening_socket connection_handler =
let rec accept () =
Semaphore.acquire connections;
accept_fork ~sw ~on_error listening_socket (fun conn addr ->
Expand All @@ -371,13 +370,16 @@ let run_server_loop ~connections ~on_error ~stop listening_socket connection_han

let run_server ?(max_connections=Int.max_int) ?(additional_domains) ?stop ~on_error listening_socket connection_handler : 'a =
if max_connections <= 0 then invalid_arg "max_connections";
Switch.run @@ fun sw ->
Switch.run ~name:"run_server" @@ fun sw ->
let connections = Semaphore.make max_connections in
let run_server_loop () = run_server_loop ~connections ~on_error ~stop listening_socket connection_handler in
let run_server_loop sw = run_server_loop ~sw ~connections ~on_error ~stop listening_socket connection_handler in
additional_domains |> Option.iter (fun (domain_mgr, domains) ->
if domains < 0 then invalid_arg "additional_domains";
for _ = 1 to domains do
Fiber.fork ~sw (fun () -> Domain_manager.run domain_mgr (fun () -> ignore (run_server_loop () : 'a)))
Fiber.fork ~sw (fun () -> Domain_manager.run domain_mgr (fun () ->
Switch.run ~name:"run_server" @@ fun sw ->
ignore (run_server_loop sw : 'a)
))
done;
);
run_server_loop ()
run_server_loop sw
8 changes: 4 additions & 4 deletions lib_eio/path.ml
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,13 @@ let is_directory t =
kind ~follow:true t = `Directory

let with_open_in path fn =
Switch.run @@ fun sw -> fn (open_in ~sw path)
Switch.run ~name:"with_open_in" @@ fun sw -> fn (open_in ~sw path)

let with_open_out ?append ~create path fn =
Switch.run @@ fun sw -> fn (open_out ~sw ?append ~create path)
Switch.run ~name:"with_open_out" @@ fun sw -> fn (open_out ~sw ?append ~create path)

let with_open_dir path fn =
Switch.run @@ fun sw -> fn (open_dir ~sw path)
Switch.run ~name:"with_open_dir" @@ fun sw -> fn (open_dir ~sw path)

let with_lines path fn =
with_open_in path @@ fun flow ->
Expand Down Expand Up @@ -174,7 +174,7 @@ let catch_missing ~missing_ok fn x =
let rec rmtree ~missing_ok t =
match kind ~follow:false t with
| `Directory ->
Switch.run (fun sw ->
Switch.run ~name:"rmtree" (fun sw ->
match
let t = open_dir ~sw t in
t, read_dir t
Expand Down
4 changes: 2 additions & 2 deletions lib_eio/process.ml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ let spawn (type tag) ~sw (t : [> tag mgr_ty] r) ?cwd ?stdin ?stdout ?stderr ?env
?stderr:(stderr :> Flow.sink_ty r option)

let run t ?cwd ?stdin ?stdout ?stderr ?(is_success = Int.equal 0) ?env ?executable args =
Switch.run @@ fun sw ->
Switch.run ~name:"Process.run" @@ fun sw ->
let child = spawn ~sw t ?cwd ?stdin ?stdout ?stderr ?env ?executable args in
match await child with
| `Exited code when is_success code -> ()
Expand All @@ -146,7 +146,7 @@ let pipe (type tag) ~sw ((Resource.T (v, ops)) : [> tag mgr_ty] r) =
X.pipe v ~sw

let parse_out (type tag) (t : [> tag mgr_ty] r) parse ?cwd ?stdin ?stderr ?is_success ?env ?executable args =
Switch.run @@ fun sw ->
Switch.run ~name:"Process.parse_out" @@ fun sw ->
let r, w = pipe t ~sw in
try
let child = spawn ~sw t ?cwd ?stdin ~stdout:w ?stderr ?env ?executable args in
Expand Down
6 changes: 3 additions & 3 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ let process =

(* fchdir wants just a directory FD, not an FD and a path like the *at functions. *)
let with_dir dir_fd path fn =
Switch.run @@ fun sw ->
Switch.run ~name:"with_dir" @@ fun sw ->
Low_level.openat ~sw
~seekable:false
~access:`R
Expand Down Expand Up @@ -527,7 +527,7 @@ end = struct
let mkdir t ~perm path = Low_level.mkdir_beneath ~perm t.fd path

let read_dir t path =
Switch.run @@ fun sw ->
Switch.run ~name:"read_dir" @@ fun sw ->
let fd = Low_level.open_dir ~sw t.fd (if path = "" then "." else path) in
Low_level.read_dir fd

Expand Down Expand Up @@ -569,7 +569,7 @@ end = struct
}
) else (
(* Linux < 5.18 *)
Switch.run @@ fun sw ->
Switch.run ~name:"stat" @@ fun sw ->
let fd = Low_level.openat ~sw ~seekable:false t.fd (if path = "" then "." else path)
~access:`R
~flags:Uring.Open_flags.(cloexec + path + (if follow then empty else nofollow))
Expand Down
6 changes: 3 additions & 3 deletions lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ let getrandom { Cstruct.buffer; off; len } =
let with_parent_dir_fd dir path fn =
let dir_path = Filename.dirname path in
let leaf = Filename.basename path in
Switch.run (fun sw ->
Switch.run ~name:"with_parent_dir" (fun sw ->
match dir with
| _ when leaf = ".." ->
let fd =
Expand Down Expand Up @@ -414,7 +414,7 @@ let statx_confined ~mask ~follow fd path buf =
with_parent_dir_fd fd path @@ fun parent leaf ->
statx ~mask ~fd:parent leaf buf flags
| Cwd | FD _ ->
Switch.run @@ fun sw ->
Switch.run ~name:"statx" @@ fun sw ->
let fd = openat ~sw ~seekable:false fd (if path = "" then "." else path)
~access:`R
~flags:Uring.Open_flags.(cloexec + path)
Expand Down Expand Up @@ -508,7 +508,7 @@ let pipe ~sw =
(r, w)

let with_pipe fn =
Switch.run @@ fun sw ->
Switch.run ~name:"with_pipe" @@ fun sw ->
let r, w = pipe ~sw in
fn r w

Expand Down
2 changes: 1 addition & 1 deletion lib_eio_linux/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ let run ~extra_effects st main arg =
~prepare_for_await:Eio.Private.Dla.prepare_for_await
~while_running:(fun () ->
fork ~new_fiber (fun () ->
Switch.run_protected (fun sw ->
Switch.run_protected ~name:"eio_linux" (fun sw ->
Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st);
match main arg with
| x -> result := Some (Ok x)
Expand Down
2 changes: 1 addition & 1 deletion lib_eio_posix/fs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ end = struct
else dir, leaf
in
let dir = resolve t dir in
Switch.run @@ fun sw ->
Switch.run ~name:"with_parent_dir" @@ fun sw ->
let dirfd = Low_level.openat ~sw ~mode:0 dir Low_level.Open_flags.(directory + rdonly + nofollow) in
fn (Some dirfd) leaf
) else fn None path
Expand Down
2 changes: 1 addition & 1 deletion lib_eio_posix/process.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ module Impl = struct
| None -> Fmt.invalid_arg "cwd is not an OS directory!"
| Some posix ->
Fs.Dir.with_parent_dir posix path @@ fun dirfd s ->
Switch.run @@ fun launch_sw ->
Switch.run ~name:"spawn_unix" @@ fun launch_sw ->
let cwd = Low_level.openat ?dirfd ~sw:launch_sw ~mode:0 s Low_level.Open_flags.(rdonly + directory) in
fn (Low_level.Process.Fork_action.fchdir cwd :: actions)
in
Expand Down
2 changes: 1 addition & 1 deletion stress/stress_proc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ let run_in_domain mgr =
let main ~dm mgr =
let t0 = Unix.gettimeofday () in
for i = 1 to n_rounds do
Switch.run (fun sw ->
Switch.run ~name:"round" (fun sw ->
for _ = 1 to n_domains - 1 do
Fiber.fork ~sw (fun () -> Eio.Domain_manager.run dm (fun () -> run_in_domain mgr))
done;
Expand Down