Skip to content

Commit 6ddd0e9

Browse files
committed
Add msg_control access for UdpSocket::recvmsg.
1 parent ee8318d commit 6ddd0e9

File tree

3 files changed

+29
-13
lines changed

3 files changed

+29
-13
lines changed

src/io/recvmsg.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,23 @@ use std::{
77
{boxed::Box, io, net::SocketAddr},
88
};
99

10-
pub(crate) struct RecvMsg<T> {
10+
pub(crate) struct RecvMsg<T, U = Vec<u8>> {
1111
#[allow(dead_code)]
1212
fd: SharedFd,
1313
pub(crate) buf: Vec<T>,
1414
#[allow(dead_code)]
1515
io_slices: Vec<IoSliceMut<'static>>,
1616
pub(crate) socket_addr: Box<SockAddr>,
17+
pub(crate) msg_control: Option<U>,
1718
pub(crate) msghdr: Box<libc::msghdr>,
1819
}
1920

20-
impl<T: BoundedBufMut> Op<RecvMsg<T>> {
21-
pub(crate) fn recvmsg(fd: &SharedFd, mut bufs: Vec<T>) -> io::Result<Op<RecvMsg<T>>> {
21+
impl<T: BoundedBufMut, U: BoundedBufMut> Op<RecvMsg<T, U>> {
22+
pub(crate) fn recvmsg(
23+
fd: &SharedFd,
24+
mut bufs: Vec<T>,
25+
mut msg_control: Option<U>,
26+
) -> io::Result<Op<RecvMsg<T, U>>> {
2227
use io_uring::{opcode, types};
2328

2429
let mut io_slices = Vec::with_capacity(bufs.len());
@@ -35,6 +40,10 @@ impl<T: BoundedBufMut> Op<RecvMsg<T>> {
3540
msghdr.msg_iovlen = io_slices.len() as _;
3641
msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
3742
msghdr.msg_namelen = socket_addr.len();
43+
if let Some(msg_control) = &mut msg_control {
44+
msghdr.msg_control = msg_control.stable_mut_ptr().cast();
45+
msghdr.msg_controllen = msg_control.bytes_total();
46+
}
3847

3948
CONTEXT.with(|x| {
4049
x.handle().expect("Not in a runtime context").submit_op(
@@ -43,6 +52,7 @@ impl<T: BoundedBufMut> Op<RecvMsg<T>> {
4352
buf: bufs,
4453
io_slices,
4554
socket_addr,
55+
msg_control,
4656
msghdr,
4757
},
4858
|recv_from| {
@@ -57,11 +67,12 @@ impl<T: BoundedBufMut> Op<RecvMsg<T>> {
5767
}
5868
}
5969

60-
impl<T> Completable for RecvMsg<T>
70+
impl<T, U> Completable for RecvMsg<T, U>
6171
where
6272
T: BoundedBufMut,
73+
U: BoundedBufMut,
6374
{
64-
type Output = BufResult<(usize, SocketAddr), Vec<T>>;
75+
type Output = BufResult<(usize, SocketAddr, Option<U>), Vec<T>>;
6576

6677
fn complete(self, cqe: CqeResult) -> Self::Output {
6778
// Convert the operation result to `usize`
@@ -71,6 +82,8 @@ where
7182

7283
let socket_addr = (*self.socket_addr).as_socket();
7384

85+
let msg_control = self.msg_control;
86+
7487
let res = res.map(|n| {
7588
let socket_addr: SocketAddr = socket_addr.unwrap();
7689

@@ -89,7 +102,7 @@ where
89102
break;
90103
}
91104
}
92-
(n, socket_addr)
105+
(n, socket_addr, msg_control)
93106
});
94107

95108
(res, bufs)

src/io/socket.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,12 @@ impl Socket {
189189
op.await
190190
}
191191

192-
pub(crate) async fn recvmsg<T: BoundedBufMut>(
192+
pub(crate) async fn recvmsg<T: BoundedBufMut, U: BoundedBufMut>(
193193
&self,
194194
buf: Vec<T>,
195-
) -> crate::BufResult<(usize, SocketAddr), Vec<T>> {
196-
let op = Op::recvmsg(&self.fd, buf).unwrap();
195+
msg_control: Option<U>,
196+
) -> crate::BufResult<(usize, SocketAddr, Option<U>), Vec<T>> {
197+
let op = Op::recvmsg(&self.fd, buf, msg_control).unwrap();
197198
op.await
198199
}
199200

src/net/udp.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -305,12 +305,14 @@ impl UdpSocket {
305305

306306
/// Receives a single datagram message on the socket, into multiple buffers
307307
///
308-
/// On success, returns the number of bytes read and the origin.
309-
pub async fn recvmsg<T: BoundedBufMut>(
308+
/// On success, returns the number of bytes read, the origin, and the msg_control, as modified
309+
/// by the kernel.
310+
pub async fn recvmsg<T: BoundedBufMut, U: BoundedBufMut>(
310311
&self,
311312
buf: Vec<T>,
312-
) -> crate::BufResult<(usize, SocketAddr), Vec<T>> {
313-
self.inner.recvmsg(buf).await
313+
msg_control: Option<U>,
314+
) -> crate::BufResult<(usize, SocketAddr, Option<U>), Vec<T>> {
315+
self.inner.recvmsg(buf, msg_control).await
314316
}
315317

316318
/// Reads a packet of data from the socket into the buffer.

0 commit comments

Comments
 (0)