@@ -79,7 +79,6 @@ module FD = struct
79
79
Eio.Switch. remove_hook t.release_hook;
80
80
if t.close_unix then (
81
81
let res = Effect. perform (Close fd) in
82
- Log. debug (fun l -> l " close: woken up" );
83
82
if res < 0 then
84
83
raise (wrap_error (Uring. error_of_errno res) " close" (string_of_int (Obj. magic fd : int )))
85
84
)
@@ -115,11 +114,6 @@ module FD = struct
115
114
let uring_file_offset t =
116
115
if t.seekable then Optint.Int63. minus_one else Optint.Int63. zero
117
116
118
- let pp f t =
119
- match t.fd with
120
- | `Open fd -> Fmt. pf f " %d" (Obj. magic fd : int )
121
- | `Closed -> Fmt. string f " (closed)"
122
-
123
117
let fstat t =
124
118
(* todo: use uring *)
125
119
try
@@ -245,7 +239,6 @@ let enter fn = Effect.perform (Enter fn)
245
239
246
240
(* Cancellations always come from the same domain, so no need to send wake events here. *)
247
241
let rec enqueue_cancel job st =
248
- Log. debug (fun l -> l " cancel: submitting call" );
249
242
Ctf. label " cancel" ;
250
243
match Uring. cancel st.uring job Cancel_job with
251
244
| None -> Queue. push (fun st -> enqueue_cancel job st) st.io_q
@@ -315,7 +308,6 @@ let enqueue_read st action (file_offset,fd,buf,len) =
315
308
| None -> FD. uring_file_offset fd
316
309
in
317
310
let req = { op= `R ; file_offset; len; fd; cur_off = 0 ; buf; action } in
318
- Log. debug (fun l -> l " read: submitting call" );
319
311
Ctf. label " read" ;
320
312
submit_rw_req st req
321
313
@@ -355,7 +347,6 @@ let rec enqueue_writev args st action =
355
347
Queue. push (fun st -> enqueue_writev args st action) st.io_q
356
348
357
349
let rec enqueue_poll_add fd poll_mask st action =
358
- Log. debug (fun l -> l " poll_add: submitting call" );
359
350
Ctf. label " poll_add" ;
360
351
match FD. get " poll_add" fd with
361
352
| Error ex -> enqueue_failed_thread st action ex
@@ -368,7 +359,6 @@ let rec enqueue_poll_add fd poll_mask st action =
368
359
Queue. push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q
369
360
370
361
let rec enqueue_poll_add_unix fd poll_mask st action cb =
371
- Log. debug (fun l -> l " poll_add: submitting call" );
372
362
Ctf. label " poll_add" ;
373
363
let retry = with_cancel_hook ~action st (fun () ->
374
364
Uring. poll_add st.uring fd poll_mask (Job_fn (action, cb))
@@ -378,7 +368,6 @@ let rec enqueue_poll_add_unix fd poll_mask st action cb =
378
368
Queue. push (fun st -> enqueue_poll_add_unix fd poll_mask st action cb) st.io_q
379
369
380
370
let rec enqueue_close st action fd =
381
- Log. debug (fun l -> l " close: submitting call" );
382
371
Ctf. label " close" ;
383
372
let subm = Uring. close st.uring fd (Job_no_cancel action) in
384
373
if subm = None then (* wait until an sqe is available *)
@@ -391,12 +380,10 @@ let enqueue_write st action (file_offset,fd,buf,len) =
391
380
| None -> FD. uring_file_offset fd
392
381
in
393
382
let req = { op= `W ; file_offset; len; fd; cur_off = 0 ; buf; action } in
394
- Log. debug (fun l -> l " write: submitting call" );
395
383
Ctf. label " write" ;
396
384
submit_rw_req st req
397
385
398
386
let rec enqueue_splice ~src ~dst ~len st action =
399
- Log. debug (fun l -> l " splice: submitting call" );
400
387
Ctf. label " splice" ;
401
388
match FD. get " splice-src" src, FD. get " splice-dst" dst with
402
389
| Error ex, _
@@ -410,7 +397,6 @@ let rec enqueue_splice ~src ~dst ~len st action =
410
397
Queue. push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q
411
398
412
399
let rec enqueue_openat2 ((access , flags , perm , resolve , dir , path ) as args ) st action =
413
- Log. debug (fun l -> l " openat2: submitting call" );
414
400
Ctf. label " openat2" ;
415
401
let use fd =
416
402
let retry = with_cancel_hook ~action st (fun () ->
@@ -441,7 +427,6 @@ let rec enqueue_unlink ((dir, fd, path) as args) st action =
441
427
Queue. push (fun st -> enqueue_unlink args st action) st.io_q
442
428
443
429
let rec enqueue_connect fd addr st action =
444
- Log. debug (fun l -> l " connect: submitting call" );
445
430
Ctf. label " connect" ;
446
431
match FD. get " connect" fd with
447
432
| Error ex -> enqueue_failed_thread st action ex
@@ -464,7 +449,6 @@ let rec extract_fds = function
464
449
| Ok fds -> Ok (fd :: fds)
465
450
466
451
let rec enqueue_send_msg fd ~fds ~dst buf st action =
467
- Log. debug (fun l -> l " send_msg: submitting call" );
468
452
Ctf. label " send_msg" ;
469
453
match FD. get " send_msg" fd, extract_fds fds with
470
454
| Error ex, _
@@ -478,7 +462,6 @@ let rec enqueue_send_msg fd ~fds ~dst buf st action =
478
462
Queue. push (fun st -> enqueue_send_msg fd ~fds ~dst buf st action) st.io_q
479
463
480
464
let rec enqueue_recv_msg fd msghdr st action =
481
- Log. debug (fun l -> l " recv_msg: submitting call" );
482
465
Ctf. label " recv_msg" ;
483
466
match FD. get " recv_msg" fd with
484
467
| Error ex -> enqueue_failed_thread st action ex
@@ -491,7 +474,6 @@ let rec enqueue_recv_msg fd msghdr st action =
491
474
Queue. push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q
492
475
493
476
let rec enqueue_accept fd client_addr st action =
494
- Log. debug (fun l -> l " accept: submitting call" );
495
477
Ctf. label " accept" ;
496
478
match FD. get " accept" fd with
497
479
| Error ex -> enqueue_failed_thread st action ex
@@ -505,7 +487,6 @@ let rec enqueue_accept fd client_addr st action =
505
487
)
506
488
507
489
let rec enqueue_noop st action =
508
- Log. debug (fun l -> l " noop: submitting call" );
509
490
Ctf. label " noop" ;
510
491
let retry = (Uring. noop st.uring (Job_no_cancel action) = None ) in
511
492
if retry then (
@@ -558,17 +539,13 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] =
558
539
Some (diff_ns /. 1e9 )
559
540
| `Nothing -> None
560
541
in
561
- Log. debug (fun l -> l " @[<v2>scheduler out of jobs, next timeout %s:@,%a@]"
562
- (match timeout with None -> " inf" | Some v -> string_of_float v)
563
- Uring.Stats. pp (Uring. get_debug_stats uring));
564
542
if not (Lf_queue. is_empty st.run_q) then (
565
543
Lf_queue. push run_q IO ; (* Re-inject IO job in the run queue *)
566
544
schedule st
567
545
) else if timeout = None && Uring. active_ops uring = 0 then (
568
546
(* Nothing further can happen at this point.
569
547
If there are no events in progress but also still no memory available, something has gone wrong! *)
570
548
assert (Queue. length mem_q = 0 );
571
- Log. debug (fun l -> l " schedule: exiting" ); (* Nothing left to do *)
572
549
Lf_queue. close st.run_q; (* Just to catch bugs if something tries to enqueue later *)
573
550
`Exit_scheduler
574
551
) else (
@@ -600,10 +577,8 @@ and handle_complete st ~runnable result =
600
577
submit_pending_io st; (* If something was waiting for a slot, submit it now. *)
601
578
match runnable with
602
579
| Read req ->
603
- Log. debug (fun l -> l " read returned" );
604
580
complete_rw_req st req result
605
581
| Write req ->
606
- Log. debug (fun l -> l " write returned" );
607
582
complete_rw_req st req result
608
583
| Job k ->
609
584
Fiber_context. clear_cancel_fn k.fiber;
@@ -618,14 +593,14 @@ and handle_complete st ~runnable result =
618
593
| Job_no_cancel k ->
619
594
Suspended. continue k result
620
595
| Cancel_job ->
621
- Log. debug ( fun l -> l " cancel returned " );
622
- if result = - 2 then (
623
- Log. debug ( fun f -> f " Cancel returned ENOENT - operation completed before cancel took effect" )
624
- ) else if result = - 114 then (
625
- Log. debug ( fun f -> f " Cancel returned EALREADY - operation cancelled while already in progress " )
626
- ) else if result <> 0 then (
627
- Log. warn (fun f -> f " Cancel returned unexpected error: %s" (Unix. error_message (Uring. error_of_errno result )))
628
- ) ;
596
+ begin match result with
597
+ | 0 (* Operation cancelled successfully *)
598
+ | - 2 (* ENOENT - operation completed before cancel took effect * )
599
+ | - 114 (* EALREADY - operation already in progress *)
600
+ -> ( )
601
+ | errno ->
602
+ Log. warn (fun f -> f " Cancel returned unexpected error: %s" (Unix. error_message (Uring. error_of_errno errno )))
603
+ end ;
629
604
schedule st
630
605
| Job_fn (k , f ) ->
631
606
Fiber_context. clear_cancel_fn k.fiber;
@@ -662,7 +637,6 @@ module Low_level = struct
662
637
match st.mem with
663
638
| None -> Suspended. discontinue k (Failure " No fixed buffer available" )
664
639
| Some mem ->
665
- Log. debug (fun l -> l " alloc: %d" (Uring.Region. avail mem));
666
640
match Uring.Region. alloc mem with
667
641
| buf -> Suspended. continue k buf
668
642
| exception Uring.Region. No_space ->
@@ -676,7 +650,6 @@ module Low_level = struct
676
650
677
651
let noop () =
678
652
let result = enter enqueue_noop in
679
- Log. debug (fun l -> l " noop returned" );
680
653
if result <> 0 then raise (unclassified_error (Eio_unix. Unix_error (Uring. error_of_errno result, " noop" , " " )))
681
654
682
655
type _ Effect.t + = Sleep_until : Mtime .t -> unit Effect .t
@@ -687,14 +660,12 @@ module Low_level = struct
687
660
688
661
let read_exactly ?file_offset fd buf len =
689
662
let res = Effect. perform (ERead (file_offset, fd, buf, Exactly len)) in
690
- Log. debug (fun l -> l " read_exactly: woken up after read" );
691
663
if res < 0 then (
692
664
raise @@ wrap_error (Uring. error_of_errno res) " read_exactly" " "
693
665
)
694
666
695
667
let read_upto ?file_offset fd buf len =
696
668
let res = Effect. perform (ERead (file_offset, fd, buf, Upto len)) in
697
- Log. debug (fun l -> l " read_upto: woken up after read" );
698
669
if res < 0 then (
699
670
raise @@ wrap_error (Uring. error_of_errno res) " read_upto" " "
700
671
) else (
@@ -703,7 +674,6 @@ module Low_level = struct
703
674
704
675
let readv ?file_offset fd bufs =
705
676
let res = enter (enqueue_readv (file_offset, fd, bufs)) in
706
- Log. debug (fun l -> l " readv: woken up after read" );
707
677
if res < 0 then (
708
678
raise @@ wrap_error (Uring. error_of_errno res) " readv" " "
709
679
) else if res = 0 then (
@@ -714,7 +684,6 @@ module Low_level = struct
714
684
715
685
let writev_single ?file_offset fd bufs =
716
686
let res = enter (enqueue_writev (file_offset, fd, bufs)) in
717
- Log. debug (fun l -> l " writev: woken up after write" );
718
687
if res < 0 then (
719
688
raise @@ wrap_error (Uring. error_of_errno res) " writev" " "
720
689
) else (
@@ -737,14 +706,12 @@ module Low_level = struct
737
706
738
707
let await_readable fd =
739
708
let res = enter (enqueue_poll_add fd (Uring.Poll_mask. (pollin + pollerr))) in
740
- Log. debug (fun l -> l " await_readable: woken up" );
741
709
if res < 0 then (
742
710
raise (unclassified_error (Eio_unix. Unix_error (Uring. error_of_errno res, " await_readable" , " " )))
743
711
)
744
712
745
713
let await_writable fd =
746
714
let res = enter (enqueue_poll_add fd (Uring.Poll_mask. (pollout + pollerr))) in
747
- Log. debug (fun l -> l " await_writable: woken up" );
748
715
if res < 0 then (
749
716
raise (unclassified_error (Eio_unix. Unix_error (Uring. error_of_errno res, " await_writable" , " " )))
750
717
)
@@ -753,7 +720,6 @@ module Low_level = struct
753
720
754
721
let write ?file_offset fd buf len =
755
722
let res = Effect. perform (EWrite (file_offset, fd, buf, Exactly len)) in
756
- Log. debug (fun l -> l " write: woken up after write" );
757
723
if res < 0 then (
758
724
raise @@ wrap_error (Uring. error_of_errno res) " write" " "
759
725
)
@@ -769,14 +735,12 @@ module Low_level = struct
769
735
770
736
let splice src ~dst ~len =
771
737
let res = enter (enqueue_splice ~src ~dst ~len ) in
772
- Log. debug (fun l -> l " splice returned" );
773
738
if res > 0 then res
774
739
else if res = 0 then raise End_of_file
775
740
else raise @@ wrap_error (Uring. error_of_errno res) " splice" " "
776
741
777
742
let connect fd addr =
778
743
let res = enter (enqueue_connect fd addr) in
779
- Log. debug (fun l -> l " connect returned" );
780
744
if res < 0 then (
781
745
let ex =
782
746
match addr with
@@ -788,7 +752,6 @@ module Low_level = struct
788
752
789
753
let send_msg fd ?(fds =[] ) ?dst buf =
790
754
let res = enter (enqueue_send_msg fd ~fds ~dst buf) in
791
- Log. debug (fun l -> l " send_msg returned" );
792
755
if res < 0 then (
793
756
raise @@ wrap_error (Uring. error_of_errno res) " send_msg" " "
794
757
)
@@ -797,7 +760,6 @@ module Low_level = struct
797
760
let addr = Uring.Sockaddr. create () in
798
761
let msghdr = Uring.Msghdr. create ~addr buf in
799
762
let res = enter (enqueue_recv_msg fd msghdr) in
800
- Log. debug (fun l -> l " recv_msg returned" );
801
763
if res < 0 then (
802
764
raise @@ wrap_error (Uring. error_of_errno res) " recv_msg" " "
803
765
);
@@ -807,7 +769,6 @@ module Low_level = struct
807
769
let addr = Uring.Sockaddr. create () in
808
770
let msghdr = Uring.Msghdr. create ~n_fds: max_fds ~addr buf in
809
771
let res = enter (enqueue_recv_msg fd msghdr) in
810
- Log. debug (fun l -> l " recv_msg returned" );
811
772
if res < 0 then (
812
773
raise @@ wrap_error (Uring. error_of_errno res) " recv_msg" " "
813
774
);
@@ -827,7 +788,6 @@ module Low_level = struct
827
788
828
789
let openat2 ~sw ?seekable ~access ~flags ~perm ~resolve ?dir path =
829
790
let res = enter (enqueue_openat2 (access, flags, perm, resolve, dir, path)) in
830
- Log. debug (fun l -> l " openat2 returned" );
831
791
if res < 0 then (
832
792
Switch. check sw; (* If cancelled, report that instead. *)
833
793
raise @@ wrap_error_fs (Uring. error_of_errno res) " openat2" " "
@@ -912,7 +872,6 @@ module Low_level = struct
912
872
Ctf. label " accept" ;
913
873
let client_addr = Uring.Sockaddr. create () in
914
874
let res = enter (enqueue_accept fd client_addr) in
915
- Log. debug (fun l -> l " accept returned" );
916
875
if res < 0 then (
917
876
raise @@ wrap_error (Uring. error_of_errno res) " accept" " "
918
877
) else (
@@ -1397,7 +1356,6 @@ let monitor_event_fd t =
1397
1356
let buf = Cstruct. create 8 in
1398
1357
while true do
1399
1358
let got = Low_level. readv t.eventfd [buf] in
1400
- Log. debug (fun f -> f " Received wakeup on eventfd %a" FD. pp t.eventfd);
1401
1359
assert (got = 8 );
1402
1360
(* We just go back to sleep now, but this will cause the scheduler to look
1403
1361
at the run queue again and notice any new items. *)
@@ -1424,7 +1382,6 @@ let with_uring ~queue_depth ?polling_timeout ?(fallback=no_fallback) fn =
1424
1382
let rec run : type a .
1425
1383
?queue_depth :int -> ?n_blocks:int -> ?block_size:int -> ?polling_timeout:int -> ?fallback:(_ -> a) -> (_ -> a) -> a =
1426
1384
fun ?(queue_depth =64 ) ?n_blocks ?(block_size =4096 ) ?polling_timeout ?fallback main ->
1427
- Log. debug (fun l -> l " starting run" );
1428
1385
let n_blocks = Option. value n_blocks ~default: queue_depth in
1429
1386
let stdenv = stdenv ~run_event_loop: (run ~queue_depth ~n_blocks ~block_size ?polling_timeout ?fallback:None ) in
1430
1387
(* TODO unify this allocation API around baregion/uring *)
@@ -1446,7 +1403,6 @@ let rec run : type a.
1446
1403
let mem_q = Queue. create () in
1447
1404
let eventfd = FD. placeholder ~seekable: false ~close_unix: false in
1448
1405
let st = { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic. make false ; sleep_q } in
1449
- Log. debug (fun l -> l " starting main thread" );
1450
1406
let rec fork ~new_fiber :fiber fn =
1451
1407
let open Effect.Deep in
1452
1408
Ctf. note_switch (Fiber_context. tid fiber);
@@ -1582,7 +1538,6 @@ let rec run : type a.
1582
1538
let unix = FD. to_unix `Take st.eventfd in
1583
1539
EventFD_pool. put unix
1584
1540
);
1585
- Log. debug (fun f -> f " Monitoring eventfd %a" FD. pp st.eventfd);
1586
1541
result := Some (
1587
1542
Fiber. first
1588
1543
(fun () -> main stdenv)
@@ -1591,5 +1546,4 @@ let rec run : type a.
1591
1546
)
1592
1547
)
1593
1548
in
1594
- Log. debug (fun l -> l " exit" );
1595
1549
Option. get ! result
0 commit comments