Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 63 additions & 48 deletions lib/src/dev/mt_af_xdp.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <xdp/xsk.h>

#include "../mt_flow.h"
#include "../mt_instance.h"
#include "../mt_log.h"
#include "../mt_stat.h"

Expand Down Expand Up @@ -366,55 +367,61 @@ static int xdp_rx_prod_init(struct mt_xdp_queue* xq) {
return 0;
}

static int xdp_socket_update_xskmap(struct mt_xdp_queue* xq, const char* ifname) {
static int xdp_socket_update_xskmap(struct mtl_main_impl* impl, struct mt_xdp_queue* xq,
const char* ifname) {
enum mtl_port port = xq->port;
uint16_t q = xq->q;
int ret;
int xsks_map_fd = -1;

int sock = xdp_connect_control_sock();
if (sock < 0) {
err("%s(%d,%u), unix socket create fail, %s\n", __func__, port, q, strerror(errno));
return errno;
}
if (mt_is_manager_connected(impl)) {
xsks_map_fd = mt_instance_request_xsks_map_fd(impl, if_nametoindex(ifname));
} else {
int sock = xdp_connect_control_sock();
if (sock < 0) {
err("%s(%d,%u), unix socket create fail, %s\n", __func__, port, q, strerror(errno));
return errno;
}

char command[64];
snprintf(command, sizeof(command), "imtl:if:%s:get_xsk_map", ifname);
char command[64];
snprintf(command, sizeof(command), "imtl:if:%s:get_xsk_map", ifname);

send(sock, command, sizeof(command), 0);
send(sock, command, sizeof(command), 0);

char cms[CMSG_SPACE(sizeof(int))];
struct cmsghdr* cmsg;
struct msghdr msg;
struct iovec iov;
int value;
int len;
char cms[CMSG_SPACE(sizeof(int))];
struct cmsghdr* cmsg;
struct msghdr msg;
struct iovec iov;
int value;
int len;

iov.iov_base = &value;
iov.iov_len = sizeof(int);
iov.iov_base = &value;
iov.iov_len = sizeof(int);

memset(&msg, 0, sizeof(msg));
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = (caddr_t)cms;
msg.msg_controllen = sizeof(cms);
memset(&msg, 0, sizeof(msg));
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = (caddr_t)cms;
msg.msg_controllen = sizeof(cms);

len = recvmsg(sock, &msg, 0);
len = recvmsg(sock, &msg, 0);

close(sock);
close(sock);

if (len <= 0) {
err("%s(%d,%u), recvmsg wrong length %d\n", __func__, port, q, len);
return -EINVAL;
}
if (len <= 0) {
err("%s(%d,%u), recvmsg wrong length %d\n", __func__, port, q, len);
return -EINVAL;
}

cmsg = CMSG_FIRSTHDR(&msg);
if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS ||
cmsg->cmsg_len != CMSG_LEN(sizeof(int))) {
err("%s(%d,%u), invalid cmsg for map fd\n", __func__, port, q);
return -EINVAL;
cmsg = CMSG_FIRSTHDR(&msg);
if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS ||
cmsg->cmsg_len != CMSG_LEN(sizeof(int))) {
err("%s(%d,%u), invalid cmsg for map fd\n", __func__, port, q);
return -EINVAL;
}
xsks_map_fd = *(int*)CMSG_DATA(cmsg);
}
xsks_map_fd = *(int*)CMSG_DATA(cmsg);

if (xsks_map_fd < 0) {
err("%s(%d,%u), get xsks_map_fd fail, %s\n", __func__, port, q, strerror(errno));
return errno;
Expand All @@ -440,7 +447,7 @@ static int xdp_socket_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
cfg.rx_size = mt_if_nb_rx_desc(impl, port);
cfg.tx_size = mt_if_nb_tx_desc(impl, port);
cfg.xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST;
if (!mt_is_privileged(impl) && xdp->has_ctrl) /* this will skip load xdp prog */
if (xdp->has_ctrl) /* this will skip load xdp prog */
cfg.libxdp_flags = XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
// cfg.bind_flags = XDP_USE_NEED_WAKEUP;

Expand Down Expand Up @@ -472,8 +479,7 @@ static int xdp_socket_init(struct mt_xdp_priv* xdp, struct mt_xdp_queue* xq) {
}
xq->socket_fd = xsk_socket__fd(xq->socket);

if (!mt_is_privileged(impl) && xdp->has_ctrl)
return xdp_socket_update_xskmap(xq, if_name);
if (xdp->has_ctrl) return xdp_socket_update_xskmap(impl, xq, if_name);

return 0;
}
Expand Down Expand Up @@ -773,16 +779,20 @@ int mt_dev_xdp_init(struct mt_interface* inf) {
xdp->queues_cnt = RTE_MAX(inf->nb_tx_q, inf->nb_rx_q);
mt_pthread_mutex_init(&xdp->queues_lock, NULL);

int ctrl_sock = xdp_connect_control_sock();
if (ctrl_sock > 0) {
char buf[10];
snprintf(buf, sizeof(buf), "imtl:ping");
send(ctrl_sock, buf, sizeof(buf), 0);
recv(ctrl_sock, buf, sizeof(buf), 0);
if (strncmp(buf, "pong", 4) == 0) xdp->has_ctrl = true;
close(ctrl_sock);
if (!mt_is_privileged(impl)) {
int ctrl_sock = xdp_connect_control_sock();
if (ctrl_sock > 0) {
char buf[10];
snprintf(buf, sizeof(buf), "imtl:ping");
send(ctrl_sock, buf, sizeof(buf), 0);
recv(ctrl_sock, buf, sizeof(buf), 0);
if (strncmp(buf, "pong", 4) == 0) xdp->has_ctrl = true;
close(ctrl_sock);
}
}

if (mt_is_manager_connected(impl)) xdp->has_ctrl = true;

xdp_parse_combined_info(xdp);
if ((xdp->start_queue + xdp->queues_cnt) > xdp->combined_count) {
err("%s(%d), too many queues requested, start_queue %u queues_cnt %u combined_count "
Expand Down Expand Up @@ -934,7 +944,12 @@ uint16_t mt_tx_xdp_burst(struct mt_tx_xdp_entry* entry, struct rte_mbuf** tx_pkt
return xdp_tx(entry->parent, entry->xq, tx_pkts, nb_pkts);
}

static int xdp_socket_update_dp(const char* if_name, int dp, bool add) {
static int xdp_socket_update_dp(struct mtl_main_impl* impl, const char* if_name, int dp,
bool add) {
if (mt_is_manager_connected(impl)) {
dbg("%s, not implemented\n", __func__);
return 0;
}
int sock = xdp_connect_control_sock();
if (sock < 0) {
err("%s, unix socket create fail, %s\n", __func__, strerror(errno));
Expand Down Expand Up @@ -1020,7 +1035,7 @@ struct mt_rx_xdp_entry* mt_rx_xdp_get(struct mtl_main_impl* impl, enum mtl_port
return NULL;
}
if (xdp->has_ctrl)
xdp_socket_update_dp(mt_kernel_if_name(impl, port), flow->dst_port, true);
xdp_socket_update_dp(impl, mt_kernel_if_name(impl, port), flow->dst_port, true);
}

uint8_t* ip = flow->dip_addr;
Expand All @@ -1041,7 +1056,7 @@ int mt_rx_xdp_put(struct mt_rx_xdp_entry* entry) {
mt_rx_flow_free(impl, port, entry->flow_rsp);
entry->flow_rsp = NULL;
if (xdp->has_ctrl)
xdp_socket_update_dp(mt_kernel_if_name(impl, port), flow->dst_port, false);
xdp_socket_update_dp(impl, mt_kernel_if_name(impl, port), flow->dst_port, false);
}
if (xq) {
xdp_queue_rx_stat(xq);
Expand Down
61 changes: 61 additions & 0 deletions lib/src/mt_instance.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,61 @@ int mt_instance_get_lcore(struct mtl_main_impl* impl, unsigned int lcore_id) {
return -response;
}

int mt_instance_request_xsks_map_fd(struct mtl_main_impl* impl, unsigned int ifindex) {
int ret;
int xsks_map_fd = -1;
int sock = impl->instance_fd;

mtl_message_t mtl_msg;
mtl_msg.header.magic = htonl(MTL_MANAGER_MAGIC);
mtl_msg.header.type = htonl(MTL_MSG_TYPE_REQUEST_MAP_FD);
mtl_msg.body.request_map_fd_msg.ifindex = htonl(ifindex);
mtl_msg.header.body_len = htonl(sizeof(mtl_request_map_fd_message_t));

ret = send(sock, &mtl_msg, sizeof(mtl_message_t), 0);
if (ret < 0) {
err("%s(%u), send message fail\n", __func__, ifindex);
return ret;
}

char cms[CMSG_SPACE(sizeof(int))];
struct cmsghdr* cmsg;
struct msghdr msg;
struct iovec iov;
int value;
int len;

iov.iov_base = &value;
iov.iov_len = sizeof(int);

memset(&msg, 0, sizeof(msg));
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = (caddr_t)cms;
msg.msg_controllen = sizeof(cms);

len = recvmsg(sock, &msg, 0);
if (len < 0) {
err("%s(%u), recv message fail\n", __func__, ifindex);
return len;
}

cmsg = CMSG_FIRSTHDR(&msg);
if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS ||
cmsg->cmsg_len != CMSG_LEN(sizeof(int))) {
err("%s(%u), invalid cmsg for map fd\n", __func__, ifindex);
return -EINVAL;
}

xsks_map_fd = *(int*)CMSG_DATA(cmsg);
if (xsks_map_fd < 0) {
err("%s(%u), get xsks_map_fd fail, %s\n", __func__, ifindex, strerror(errno));
return errno;
}

return xsks_map_fd;
}

int mt_instance_init(struct mtl_main_impl* impl, struct mtl_init_params* p) {
impl->instance_fd = -1;
int sock = socket(AF_UNIX, SOCK_STREAM, 0);
Expand Down Expand Up @@ -163,4 +218,10 @@ int mt_instance_put_lcore(struct mtl_main_impl* impl, unsigned int lcore_id) {
return -ENOTSUP;
}

int mt_instance_request_xsks_map_fd(struct mtl_main_impl* impl, unsigned int ifindex) {
MTL_MAY_UNUSED(impl);
MTL_MAY_UNUSED(ifindex);
return -ENOTSUP;
}

#endif
1 change: 1 addition & 0 deletions lib/src/mt_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ int mt_instance_uinit(struct mtl_main_impl* impl);

int mt_instance_get_lcore(struct mtl_main_impl* impl, unsigned int lcore_id);
int mt_instance_put_lcore(struct mtl_main_impl* impl, unsigned int lcore_id);
int mt_instance_request_xsks_map_fd(struct mtl_main_impl* impl, unsigned int ifindex);

#endif
20 changes: 14 additions & 6 deletions manager/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ exec_env = host_machine.system()
set_variable('is_windows', exec_env == 'windows')

if is_windows
build = false
reason = 'not supported on Windows'
subdir_done()
message('not supported on Windows')
subdir_done()
endif

message('BUILD Environment: ' + exec_env)
Expand All @@ -34,8 +33,17 @@ if get_option('enable_asan') == true
asan_dep = cpp_c.find_library('asan', required : true)
endif

# xdp check
libxdp_dep = dependency('libxdp', required: false)
libbpf_dep = dependency('libbpf', required: false)
if libxdp_dep.found() and libbpf_dep.found()
add_global_arguments('-DMTL_HAS_XDP_BACKEND', language : 'cpp')
else
message('libxdp and libbpf not found, no af_xdp backend')
endif

executable('MtlManager', sources,
cpp_args: cpp_args,
link_args: link_cpp_args,
dependencies: [asan_dep]
cpp_args: cpp_args,
link_args: link_cpp_args,
dependencies: [asan_dep, libxdp_dep, libbpf_dep]
)
61 changes: 56 additions & 5 deletions manager/mtl_instance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class mtl_instance {
int handle_message_get_lcore(mtl_lcore_message_t* lcore_msg);
int handle_message_put_lcore(mtl_lcore_message_t* lcore_msg);
int handle_message_register(mtl_register_message_t* register_msg);
int handle_message_request_map_fd(mtl_request_map_fd_message_t* request_map_fd_msg);
int send_response(bool success) {
mtl_message_t msg;
msg.header.magic = htonl(MTL_MANAGER_MAGIC);
Expand Down Expand Up @@ -83,6 +84,9 @@ int mtl_instance::handle_message(const char* buf, int len) {
case MTL_MSG_TYPE_PUT_LCORE:
handle_message_put_lcore(&msg->body.lcore_msg);
break;
case MTL_MSG_TYPE_REQUEST_MAP_FD:
handle_message_request_map_fd(&msg->body.request_map_fd_msg);
break;
default:
log(log_level::ERROR, "Unknown message type");
break;
Expand Down Expand Up @@ -149,7 +153,7 @@ int mtl_instance::handle_message_register(mtl_register_message_t* register_msg)
std::shared_ptr<mtl_interface> mtl_instance::get_interface(const unsigned int ifindex) {
auto it = interfaces.find(ifindex);
if (it != interfaces.end()) {
log(log_level::INFO, "Returning existing interface.");
log(log_level::DEBUG, "Returning existing interface.");
return it->second;
}

Expand All @@ -164,10 +168,57 @@ std::shared_ptr<mtl_interface> mtl_instance::get_interface(const unsigned int if

/* Interface does not exist, create and initialize it */
log(log_level::INFO, "Initializing a new interface " + std::to_string(ifindex));
auto new_interface = std::make_shared<mtl_interface>(ifindex);
g_interfaces[ifindex] = new_interface;
interfaces[ifindex] = new_interface;
return new_interface;
try {
auto new_interface = std::make_shared<mtl_interface>(ifindex);
g_interfaces[ifindex] = new_interface;
interfaces[ifindex] = new_interface;
return new_interface;
} catch (const std::exception& e) {
log(log_level::ERROR, "Failed to initialize interface: " + std::string(e.what()));
return nullptr;
}
}

int mtl_instance::handle_message_request_map_fd(
mtl_request_map_fd_message_t* request_map_fd_msg) {
unsigned int ifindex = ntohl(request_map_fd_msg->ifindex);
auto interface = get_interface(ifindex);

struct msghdr msg;
struct iovec iov[1];
struct cmsghdr* cmsg = NULL;
char ctrl_buf[CMSG_SPACE(sizeof(int))];
char data[1];

memset(&msg, 0, sizeof(struct msghdr));
memset(ctrl_buf, 0, CMSG_SPACE(sizeof(int)));

data[0] = ' ';
iov[0].iov_base = data;
iov[0].iov_len = sizeof(data);

msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_controllen = CMSG_SPACE(sizeof(int));
msg.msg_control = ctrl_buf;

cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int));

if (interface == nullptr) {
log(log_level::ERROR, "Failed to get interface " + std::to_string(ifindex));
*((int*)CMSG_DATA(cmsg)) = -1;
sendmsg(conn_fd, &msg, 0);
return -1;
}

*((int*)CMSG_DATA(cmsg)) = interface->get_xsks_map_fd();

return sendmsg(conn_fd, &msg, 0);
}

#endif
Loading