Skip to content

Commit 279bd07

Browse files
committed
Fix handling of very long IO vectors
`Flow.write` doesn't place any limit on how long the list of vectors can be, but real operating systems do have limits and will fail if given too many. Also, Eio_posix was allocating the array on the stack, which could fail if it was very large.
1 parent 211279e commit 279bd07

File tree

5 files changed

+94
-18
lines changed

5 files changed

+94
-18
lines changed

lib_eio_linux/eio_linux.ml

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,23 @@ let _fast_copy_try_splice src dst =
7373

7474
(* XXX workaround for issue #319, PR #327 *)
7575
let fast_copy_try_splice src dst = fast_copy src dst
76+
77+
let rec list_take n = function
78+
| [] -> []
79+
| x :: xs ->
80+
if n = 0 then []
81+
else x :: list_take (n - 1) xs
82+
83+
let truncate_to_iomax xs =
84+
if List.compare_length_with xs Uring.iov_max <= 0 then xs
85+
else list_take Uring.iov_max xs
7686

7787
(* Copy using the [Read_source_buffer] optimisation.
7888
Avoids a copy if the source already has the data. *)
7989
let copy_with_rsb rsb dst =
90+
let write xs = Low_level.writev_single dst (truncate_to_iomax xs) in
8091
try
81-
while true do
82-
rsb (Low_level.writev_single dst)
83-
done
92+
while true do rsb write done
8493
with End_of_file -> ()
8594

8695
(* Copy by allocating a chunk from the pre-shared buffer and asking
@@ -161,11 +170,11 @@ module Flow = struct
161170
Low_level.readv ~file_offset t bufs
162171

163172
let pwrite t ~file_offset bufs =
164-
Low_level.writev_single ~file_offset t bufs
173+
Low_level.writev_single ~file_offset t (truncate_to_iomax bufs)
165174

166175
let read_methods = []
167176

168-
let single_write t bufs = Low_level.writev_single t bufs
177+
let single_write t bufs = Low_level.writev_single t (truncate_to_iomax bufs)
169178

170179
let copy t ~src =
171180
match Eio_unix.Resource.fd_opt src with

lib_eio_posix/eio_posix_stubs.c

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <caml/bigarray.h>
2626
#include <caml/socketaddr.h>
2727
#include <caml/custom.h>
28+
#include <caml/fail.h>
2829

2930
#include "fork_action.h"
3031

@@ -66,6 +67,27 @@ CAMLprim value caml_eio_posix_getrandom(value v_ba, value v_off, value v_len) {
6667
CAMLreturn(Val_long(ret));
6768
}
6869

70+
/* Allocates an array of C iovecs using the cstructs in the array [v_bufs]. */
71+
static struct iovec *alloc_iov(value v_bufs) {
72+
struct iovec *iov;
73+
int n_bufs = Wosize_val(v_bufs);
74+
75+
if (n_bufs == 0) return NULL;
76+
iov = calloc(n_bufs, sizeof(struct iovec));
77+
if (iov == NULL)
78+
caml_raise_out_of_memory();
79+
80+
for (int i = 0; i < n_bufs; i++) {
81+
value v_cs = Field(v_bufs, i);
82+
value v_ba = Field(v_cs, 0);
83+
value v_off = Field(v_cs, 1);
84+
value v_len = Field(v_cs, 2);
85+
iov[i].iov_base = (uint8_t *)Caml_ba_data_val(v_ba) + Long_val(v_off);
86+
iov[i].iov_len = Long_val(v_len);
87+
}
88+
return iov;
89+
}
90+
6991
/* Fill [iov] with pointers to the cstructs in the array [v_bufs]. */
7092
static void fill_iov(struct iovec *iov, value v_bufs) {
7193
int n_bufs = Wosize_val(v_bufs);
@@ -97,11 +119,11 @@ CAMLprim value caml_eio_posix_writev(value v_fd, value v_bufs) {
97119
CAMLparam1(v_bufs);
98120
ssize_t r;
99121
int n_bufs = Wosize_val(v_bufs);
100-
struct iovec iov[n_bufs];
101-
102-
fill_iov(iov, v_bufs);
122+
struct iovec *iov;
103123

124+
iov = alloc_iov(v_bufs);
104125
r = writev(Int_val(v_fd), iov, n_bufs);
126+
free(iov);
105127
if (r < 0) uerror("writev", Nothing);
106128

107129
CAMLreturn(Val_long(r));
@@ -125,11 +147,11 @@ CAMLprim value caml_eio_posix_pwritev(value v_fd, value v_bufs, value v_offset)
125147
CAMLparam2(v_bufs, v_offset);
126148
ssize_t r;
127149
int n_bufs = Wosize_val(v_bufs);
128-
struct iovec iov[n_bufs];
129-
130-
fill_iov(iov, v_bufs);
150+
struct iovec *iov;
131151

152+
iov = alloc_iov(v_bufs);
132153
r = pwritev(Int_val(v_fd), iov, n_bufs, Int63_val(v_offset));
154+
free(iov);
133155
if (r < 0) uerror("pwritev", Nothing);
134156

135157
CAMLreturn(Val_long(r));

lib_eio_posix/flow.ml

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,25 @@ let eio_of_stat x =
2727
ctime = float_of_time (Low_level.ctime_sec x) (Low_level.ctime_nsec x);
2828
}
2929

30+
let truncate_to_iomax xs =
31+
let rec count i = function
32+
| [] -> i
33+
| _ when i = Config.iov_max -> Config.iov_max
34+
| _ :: xs -> count (i + 1) xs
35+
in
36+
let len = count 0 xs in
37+
let arr = Array.make len Cstruct.empty in
38+
let rec fill i xs =
39+
if i = len then arr
40+
else (
41+
match xs with
42+
| x :: xs ->
43+
Array.set arr i x;
44+
fill (i + 1) xs
45+
| [] -> assert false
46+
) in
47+
fill 0 xs
48+
3049
module Impl = struct
3150
type tag = [`Generic | `Unix]
3251

@@ -41,7 +60,7 @@ module Impl = struct
4160

4261
let single_write t bufs =
4362
try
44-
Low_level.writev t (Array.of_list bufs)
63+
Low_level.writev t (truncate_to_iomax bufs)
4564
with Unix.Unix_error (code, name, arg) ->
4665
raise (Err.wrap code name arg)
4766

@@ -66,17 +85,17 @@ module Impl = struct
6685
let read_methods = []
6786

6887
let pread t ~file_offset bufs =
69-
let got = Low_level.preadv ~file_offset t (Array.of_list bufs) in
88+
let got = Low_level.preadv ~file_offset t (truncate_to_iomax bufs) in
7089
if got = 0 then raise End_of_file
7190
else got
7291

73-
let pwrite t ~file_offset bufs = Low_level.pwritev ~file_offset t (Array.of_list bufs)
92+
let pwrite t ~file_offset bufs = Low_level.pwritev ~file_offset t (truncate_to_iomax bufs)
7493

7594
let send_msg t ~fds data =
76-
Low_level.send_msg ~fds t (Array.of_list data)
95+
Low_level.send_msg ~fds t (truncate_to_iomax data)
7796

7897
let recv_msg_with_fds t ~sw ~max_fds data =
79-
let _addr, n, fds = Low_level.recv_msg_with_fds t ~sw ~max_fds (Array.of_list data) in
98+
let _addr, n, fds = Low_level.recv_msg_with_fds t ~sw ~max_fds (truncate_to_iomax data) in
8099
n, fds
81100

82101
let seek = Low_level.lseek

lib_eio_posix/include/discover.ml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ let optional_flags = [
88

99
let () =
1010
C.main ~name:"discover" (fun c ->
11-
let c_flags = ["-D_LARGEFILE64_SOURCE"] in
11+
let c_flags = ["-D_LARGEFILE64_SOURCE"; "-D_XOPEN_SOURCE=700"; "-D_DARWIN_C_SOURCE"] in
1212
let includes = ["sys/types.h"; "sys/stat.h"; "fcntl.h"] in
1313
let extra_flags, missing_defs =
1414
C.C_define.import c ~c_flags ~includes
@@ -22,7 +22,7 @@ let () =
2222
in
2323
let present_defs =
2424
C.C_define.import c ~c_flags
25-
~includes:["sys/types.h"; "sys/stat.h"; "fcntl.h"]
25+
~includes:["sys/types.h"; "sys/stat.h"; "fcntl.h"; "limits.h"]
2626
C.C_define.Type.(extra_flags @ [
2727
"O_RDONLY", Int;
2828
"O_RDWR", Int;
@@ -41,6 +41,8 @@ let () =
4141

4242
"AT_FDCWD", Int;
4343
"AT_SYMLINK_NOFOLLOW", Int;
44+
45+
"IOV_MAX", Int;
4446
])
4547
|> List.map (function
4648
| name, C.C_define.Value.Int v when List.mem name optional_flags ->

tests/flow.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,3 +173,27 @@ Make sure we don't crash on SIGPIPE:
173173
+Connection_reset (good)
174174
- : unit = ()
175175
```
176+
177+
## IO_MAX
178+
179+
Sending a very long vector over a flow should just send it in chunks, not fail:
180+
181+
```ocaml
182+
# Eio_main.run @@ fun env ->
183+
Switch.run @@ fun sw ->
184+
let r, w = Eio_unix.pipe sw in
185+
let a = Cstruct.of_string "abc" in
186+
let vecs = List.init 10_000 (Fun.const a) in
187+
Fiber.both
188+
(fun () ->
189+
Eio.Flow.write w vecs;
190+
Eio.Flow.close w
191+
)
192+
(fun () ->
193+
let got = Eio.Flow.read_all r in
194+
traceln "Read %d bytes" (String.length got);
195+
assert (got = Cstruct.to_string (Cstruct.concat vecs))
196+
)
197+
+Read 30000 bytes
198+
- : unit = ()
199+
```

0 commit comments

Comments
 (0)