Skip to content

Commit d2410e5

Browse files
authored
Merge pull request #653 from talex5/iomax
Fix handling of very long IO vectors
2 parents 211279e + b388d67 commit d2410e5

File tree

5 files changed

+97
-32
lines changed

5 files changed

+97
-32
lines changed

lib_eio_linux/eio_linux.ml

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,23 @@ let _fast_copy_try_splice src dst =
7373

7474
(* XXX workaround for issue #319, PR #327 *)
7575
let fast_copy_try_splice src dst = fast_copy src dst
76+
77+
let[@tail_mod_cons] rec list_take n = function
78+
| [] -> []
79+
| x :: xs ->
80+
if n = 0 then []
81+
else x :: list_take (n - 1) xs
82+
83+
let truncate_to_iomax xs =
84+
if List.compare_length_with xs Uring.iov_max <= 0 then xs
85+
else list_take Uring.iov_max xs
7686

7787
(* Copy using the [Read_source_buffer] optimisation.
7888
Avoids a copy if the source already has the data. *)
7989
let copy_with_rsb rsb dst =
90+
let write xs = Low_level.writev_single dst (truncate_to_iomax xs) in
8091
try
81-
while true do
82-
rsb (Low_level.writev_single dst)
83-
done
92+
while true do rsb write done
8493
with End_of_file -> ()
8594

8695
(* Copy by allocating a chunk from the pre-shared buffer and asking
@@ -161,11 +170,11 @@ module Flow = struct
161170
Low_level.readv ~file_offset t bufs
162171

163172
let pwrite t ~file_offset bufs =
164-
Low_level.writev_single ~file_offset t bufs
173+
Low_level.writev_single ~file_offset t (truncate_to_iomax bufs)
165174

166175
let read_methods = []
167176

168-
let single_write t bufs = Low_level.writev_single t bufs
177+
let single_write t bufs = Low_level.writev_single t (truncate_to_iomax bufs)
169178

170179
let copy t ~src =
171180
match Eio_unix.Resource.fd_opt src with

lib_eio_posix/eio_posix_stubs.c

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <caml/bigarray.h>
2626
#include <caml/socketaddr.h>
2727
#include <caml/custom.h>
28+
#include <caml/fail.h>
2829

2930
#include "fork_action.h"
3031

@@ -66,9 +67,16 @@ CAMLprim value caml_eio_posix_getrandom(value v_ba, value v_off, value v_len) {
6667
CAMLreturn(Val_long(ret));
6768
}
6869

69-
/* Fill [iov] with pointers to the cstructs in the array [v_bufs]. */
70-
static void fill_iov(struct iovec *iov, value v_bufs) {
70+
/* Allocates an array of C iovecs using the cstructs in the array [v_bufs]. */
71+
static struct iovec *alloc_iov(value v_bufs) {
72+
struct iovec *iov;
7173
int n_bufs = Wosize_val(v_bufs);
74+
75+
if (n_bufs == 0) return NULL;
76+
iov = caml_stat_calloc_noexc(n_bufs, sizeof(struct iovec));
77+
if (iov == NULL)
78+
caml_raise_out_of_memory();
79+
7280
for (int i = 0; i < n_bufs; i++) {
7381
value v_cs = Field(v_bufs, i);
7482
value v_ba = Field(v_cs, 0);
@@ -77,17 +85,18 @@ static void fill_iov(struct iovec *iov, value v_bufs) {
7785
iov[i].iov_base = (uint8_t *)Caml_ba_data_val(v_ba) + Long_val(v_off);
7886
iov[i].iov_len = Long_val(v_len);
7987
}
88+
return iov;
8089
}
8190

8291
CAMLprim value caml_eio_posix_readv(value v_fd, value v_bufs) {
8392
CAMLparam1(v_bufs);
8493
ssize_t r;
8594
int n_bufs = Wosize_val(v_bufs);
86-
struct iovec iov[n_bufs];
87-
88-
fill_iov(iov, v_bufs);
95+
struct iovec *iov;
8996

97+
iov = alloc_iov(v_bufs);
9098
r = readv(Int_val(v_fd), iov, n_bufs);
99+
caml_stat_free_preserving_errno(iov);
91100
if (r < 0) uerror("readv", Nothing);
92101

93102
CAMLreturn(Val_long(r));
@@ -97,11 +106,11 @@ CAMLprim value caml_eio_posix_writev(value v_fd, value v_bufs) {
97106
CAMLparam1(v_bufs);
98107
ssize_t r;
99108
int n_bufs = Wosize_val(v_bufs);
100-
struct iovec iov[n_bufs];
101-
102-
fill_iov(iov, v_bufs);
109+
struct iovec *iov;
103110

111+
iov = alloc_iov(v_bufs);
104112
r = writev(Int_val(v_fd), iov, n_bufs);
113+
caml_stat_free_preserving_errno(iov);
105114
if (r < 0) uerror("writev", Nothing);
106115

107116
CAMLreturn(Val_long(r));
@@ -111,11 +120,11 @@ CAMLprim value caml_eio_posix_preadv(value v_fd, value v_bufs, value v_offset) {
111120
CAMLparam2(v_bufs, v_offset);
112121
ssize_t r;
113122
int n_bufs = Wosize_val(v_bufs);
114-
struct iovec iov[n_bufs];
115-
116-
fill_iov(iov, v_bufs);
123+
struct iovec *iov;
117124

125+
iov = alloc_iov(v_bufs);
118126
r = preadv(Int_val(v_fd), iov, n_bufs, Int63_val(v_offset));
127+
caml_stat_free_preserving_errno(iov);
119128
if (r < 0) uerror("preadv", Nothing);
120129

121130
CAMLreturn(Val_long(r));
@@ -125,11 +134,11 @@ CAMLprim value caml_eio_posix_pwritev(value v_fd, value v_bufs, value v_offset)
125134
CAMLparam2(v_bufs, v_offset);
126135
ssize_t r;
127136
int n_bufs = Wosize_val(v_bufs);
128-
struct iovec iov[n_bufs];
129-
130-
fill_iov(iov, v_bufs);
137+
struct iovec *iov;
131138

139+
iov = alloc_iov(v_bufs);
132140
r = pwritev(Int_val(v_fd), iov, n_bufs, Int63_val(v_offset));
141+
caml_stat_free_preserving_errno(iov);
133142
if (r < 0) uerror("pwritev", Nothing);
134143

135144
CAMLreturn(Val_long(r));
@@ -402,12 +411,11 @@ CAMLprim value caml_eio_posix_send_msg(value v_fd, value v_n_fds, value v_fds, v
402411
CAMLparam3(v_fds, v_dst_opt, v_bufs);
403412
int n_bufs = Wosize_val(v_bufs);
404413
int n_fds = Int_val(v_n_fds);
405-
struct iovec iov[n_bufs];
406414
union sock_addr_union dst_addr;
415+
struct iovec *iov;
407416
int controllen = n_fds > 0 ? CMSG_SPACE(sizeof(int) * n_fds) : 0;
408417
char cmsg[controllen];
409418
struct msghdr msg = {
410-
.msg_iov = iov,
411419
.msg_iovlen = n_bufs,
412420
.msg_control = n_fds > 0 ? cmsg : NULL,
413421
.msg_controllen = controllen,
@@ -421,12 +429,14 @@ CAMLprim value caml_eio_posix_send_msg(value v_fd, value v_n_fds, value v_fds, v
421429
msg.msg_name = &dst_addr;
422430
}
423431

424-
fill_iov(iov, v_bufs);
432+
iov = alloc_iov(v_bufs);
433+
msg.msg_iov = iov;
425434
fill_fds(&msg, n_fds, v_fds);
426435

427436
caml_enter_blocking_section();
428437
r = sendmsg(Int_val(v_fd), &msg, 0);
429438
caml_leave_blocking_section();
439+
caml_stat_free_preserving_errno(iov);
430440
if (r < 0) uerror("send_msg", Nothing);
431441

432442
CAMLreturn(Val_long(r));
@@ -474,14 +484,13 @@ CAMLprim value caml_eio_posix_recv_msg(value v_fd, value v_max_fds, value v_bufs
474484
CAMLlocal2(v_result, v_addr);
475485
int max_fds = Int_val(v_max_fds);
476486
int n_bufs = Wosize_val(v_bufs);
477-
struct iovec iov[n_bufs];
487+
struct iovec *iov;
478488
union sock_addr_union source_addr;
479489
int controllen = max_fds > 0 ? CMSG_SPACE(sizeof(int) * max_fds) : 0;
480490
char cmsg[controllen];
481491
struct msghdr msg = {
482492
.msg_name = &source_addr,
483493
.msg_namelen = sizeof(source_addr),
484-
.msg_iov = iov,
485494
.msg_iovlen = n_bufs,
486495
.msg_control = max_fds > 0 ? cmsg : NULL,
487496
.msg_controllen = controllen,
@@ -490,11 +499,13 @@ CAMLprim value caml_eio_posix_recv_msg(value v_fd, value v_max_fds, value v_bufs
490499

491500
memset(cmsg, 0, controllen);
492501

493-
fill_iov(iov, v_bufs);
502+
iov = alloc_iov(v_bufs);
503+
msg.msg_iov = iov;
494504

495505
caml_enter_blocking_section();
496506
r = recvmsg(Int_val(v_fd), &msg, 0);
497507
caml_leave_blocking_section();
508+
caml_stat_free_preserving_errno(iov);
498509
if (r < 0) uerror("recv_msg", Nothing);
499510

500511
v_addr = safe_caml_unix_alloc_sockaddr(&source_addr, msg.msg_namelen, -1);

lib_eio_posix/flow.ml

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,25 @@ let eio_of_stat x =
2727
ctime = float_of_time (Low_level.ctime_sec x) (Low_level.ctime_nsec x);
2828
}
2929

30+
let truncate_to_iomax xs =
31+
let rec count i = function
32+
| [] -> i
33+
| _ when i = Config.iov_max -> Config.iov_max
34+
| _ :: xs -> count (i + 1) xs
35+
in
36+
let len = count 0 xs in
37+
let arr = Array.make len Cstruct.empty in
38+
let rec fill i xs =
39+
if i = len then arr
40+
else (
41+
match xs with
42+
| x :: xs ->
43+
Array.set arr i x;
44+
fill (i + 1) xs
45+
| [] -> assert false
46+
) in
47+
fill 0 xs
48+
3049
module Impl = struct
3150
type tag = [`Generic | `Unix]
3251

@@ -41,7 +60,7 @@ module Impl = struct
4160

4261
let single_write t bufs =
4362
try
44-
Low_level.writev t (Array.of_list bufs)
63+
Low_level.writev t (truncate_to_iomax bufs)
4564
with Unix.Unix_error (code, name, arg) ->
4665
raise (Err.wrap code name arg)
4766

@@ -66,17 +85,17 @@ module Impl = struct
6685
let read_methods = []
6786

6887
let pread t ~file_offset bufs =
69-
let got = Low_level.preadv ~file_offset t (Array.of_list bufs) in
88+
let got = Low_level.preadv ~file_offset t (truncate_to_iomax bufs) in
7089
if got = 0 then raise End_of_file
7190
else got
7291

73-
let pwrite t ~file_offset bufs = Low_level.pwritev ~file_offset t (Array.of_list bufs)
92+
let pwrite t ~file_offset bufs = Low_level.pwritev ~file_offset t (truncate_to_iomax bufs)
7493

7594
let send_msg t ~fds data =
76-
Low_level.send_msg ~fds t (Array.of_list data)
95+
Low_level.send_msg ~fds t (truncate_to_iomax data)
7796

7897
let recv_msg_with_fds t ~sw ~max_fds data =
79-
let _addr, n, fds = Low_level.recv_msg_with_fds t ~sw ~max_fds (Array.of_list data) in
98+
let _addr, n, fds = Low_level.recv_msg_with_fds t ~sw ~max_fds (truncate_to_iomax data) in
8099
n, fds
81100

82101
let seek = Low_level.lseek

lib_eio_posix/include/discover.ml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ let optional_flags = [
88

99
let () =
1010
C.main ~name:"discover" (fun c ->
11-
let c_flags = ["-D_LARGEFILE64_SOURCE"] in
11+
let c_flags = ["-D_LARGEFILE64_SOURCE"; "-D_XOPEN_SOURCE=700"; "-D_DARWIN_C_SOURCE"] in
1212
let includes = ["sys/types.h"; "sys/stat.h"; "fcntl.h"] in
1313
let extra_flags, missing_defs =
1414
C.C_define.import c ~c_flags ~includes
@@ -22,7 +22,7 @@ let () =
2222
in
2323
let present_defs =
2424
C.C_define.import c ~c_flags
25-
~includes:["sys/types.h"; "sys/stat.h"; "fcntl.h"]
25+
~includes:["sys/types.h"; "sys/stat.h"; "fcntl.h"; "limits.h"]
2626
C.C_define.Type.(extra_flags @ [
2727
"O_RDONLY", Int;
2828
"O_RDWR", Int;
@@ -41,6 +41,8 @@ let () =
4141

4242
"AT_FDCWD", Int;
4343
"AT_SYMLINK_NOFOLLOW", Int;
44+
45+
"IOV_MAX", Int;
4446
])
4547
|> List.map (function
4648
| name, C.C_define.Value.Int v when List.mem name optional_flags ->

tests/flow.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,3 +173,27 @@ Make sure we don't crash on SIGPIPE:
173173
+Connection_reset (good)
174174
- : unit = ()
175175
```
176+
177+
## IO_MAX
178+
179+
Sending a very long vector over a flow should just send it in chunks, not fail:
180+
181+
```ocaml
182+
# Eio_main.run @@ fun env ->
183+
Switch.run @@ fun sw ->
184+
let r, w = Eio_unix.pipe sw in
185+
let a = Cstruct.of_string "abc" in
186+
let vecs = List.init 10_000 (Fun.const a) in
187+
Fiber.both
188+
(fun () ->
189+
Eio.Flow.write w vecs;
190+
Eio.Flow.close w
191+
)
192+
(fun () ->
193+
let got = Eio.Flow.read_all r in
194+
traceln "Read %d bytes" (String.length got);
195+
assert (got = Cstruct.to_string (Cstruct.concat vecs))
196+
)
197+
+Read 30000 bytes
198+
- : unit = ()
199+
```

0 commit comments

Comments
 (0)