Skip to content

Commit 1635269

Browse files
committed
prov/efa: Fix the rx flags procedure.
This patch fixes two issues in the efa provider that cause trouble when sharing the rx with a peer provider. 1. rxr_msg_recv (and likewise trecv, recvv, trecvv) should pass rxr_rx_flags(ep) as the flags to rxr_msg_generic_recv, which means FI_COMPLETION is enabled in the flags as long as prov_info->rx_attr.flags supports FI_COMPLETION. rxr_msg_recvmsg (and trecvmsg), in contrast, should pass the application flags OR'ed with util_ep.rx_msg_flags, which will NOT contain FI_COMPLETION when the application binds the rx cq with FI_SELECTIVE_COMPLETION and does not set FI_COMPLETION in the flags of fi_recvmsg. 2. When calling ofi_need_completion in rxr_rx_entry_report_completion, rxr_rx_flags(ep) should not be passed in as the cq_flags. Instead, cq_flags should be either 0 or FI_SELECTIVE_COMPLETION, which can be derived from util_ep.rx_msg_flags. Signed-off-by: Shi Jin <[email protected]>
1 parent cb3d10f commit 1635269

File tree

2 files changed

+30
-16
lines changed

2 files changed

+30
-16
lines changed

prov/efa/src/rdm/rxr_msg.c

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,20 +1145,13 @@ ssize_t rxr_msg_generic_recv(struct fid_ep *ep, const struct fi_msg *msg,
11451145
struct rxr_ep *rxr_ep;
11461146
struct dlist_entry *unexp_list;
11471147
struct rxr_op_entry *rx_entry;
1148-
uint64_t rx_op_flags;
11491148

11501149
rxr_ep = container_of(ep, struct rxr_ep, base_ep.util_ep.ep_fid.fid);
11511150

11521151
assert(msg->iov_count <= rxr_ep->rx_iov_limit);
11531152

11541153
efa_perfset_start(rxr_ep, perf_efa_recv);
11551154

1156-
assert(rxr_ep->base_ep.util_ep.rx_msg_flags == 0 || rxr_ep->base_ep.util_ep.rx_msg_flags == FI_COMPLETION);
1157-
rx_op_flags = rxr_ep->base_ep.util_ep.rx_op_flags;
1158-
if (rxr_ep->base_ep.util_ep.rx_msg_flags == 0)
1159-
rx_op_flags &= ~FI_COMPLETION;
1160-
flags = flags | rx_op_flags;
1161-
11621155
ofi_mutex_lock(&rxr_ep->base_ep.util_ep.lock);
11631156
if (OFI_UNLIKELY(is_rx_res_full(rxr_ep))) {
11641157
ret = -FI_EAGAIN;
@@ -1355,32 +1348,42 @@ static
13551348
ssize_t rxr_msg_recvmsg(struct fid_ep *ep_fid, const struct fi_msg *msg,
13561349
uint64_t flags)
13571350
{
1358-
return rxr_msg_generic_recv(ep_fid, msg, 0, 0, ofi_op_msg, flags);
1351+
struct rxr_ep *ep;
1352+
1353+
ep = container_of(ep_fid, struct rxr_ep, base_ep.util_ep.ep_fid.fid);
1354+
1355+
return rxr_msg_generic_recv(ep_fid, msg, 0, 0, ofi_op_msg, flags | ep->base_ep.util_ep.rx_msg_flags);
13591356
}
13601357

13611358
static
1362-
ssize_t rxr_msg_recv(struct fid_ep *ep, void *buf, size_t len,
1359+
ssize_t rxr_msg_recv(struct fid_ep *ep_fid, void *buf, size_t len,
13631360
void *desc, fi_addr_t src_addr, void *context)
13641361
{
13651362
struct fi_msg msg = {0};
13661363
struct iovec iov;
1364+
struct rxr_ep *ep;
1365+
1366+
ep = container_of(ep_fid, struct rxr_ep, base_ep.util_ep.ep_fid.fid);
13671367

13681368
iov.iov_base = buf;
13691369
iov.iov_len = len;
13701370

13711371
rxr_msg_construct(&msg, &iov, &desc, 1, src_addr, context, 0);
1372-
return rxr_msg_recvmsg(ep, &msg, 0);
1372+
return rxr_msg_recvmsg(ep_fid, &msg, rxr_rx_flags(ep));
13731373
}
13741374

13751375
static
1376-
ssize_t rxr_msg_recvv(struct fid_ep *ep, const struct iovec *iov,
1376+
ssize_t rxr_msg_recvv(struct fid_ep *ep_fid, const struct iovec *iov,
13771377
void **desc, size_t count, fi_addr_t src_addr,
13781378
void *context)
13791379
{
13801380
struct fi_msg msg = {0};
1381+
struct rxr_ep *ep;
1382+
1383+
ep = container_of(ep_fid, struct rxr_ep, base_ep.util_ep.ep_fid.fid);
13811384

13821385
rxr_msg_construct(&msg, iov, desc, count, src_addr, context, 0);
1383-
return rxr_msg_recvmsg(ep, &msg, 0);
1386+
return rxr_msg_recvmsg(ep_fid, &msg, rxr_rx_flags(ep));
13841387
}
13851388

13861389
/**
@@ -1393,12 +1396,15 @@ ssize_t rxr_msg_trecv(struct fid_ep *ep_fid, void *buf, size_t len, void *desc,
13931396
{
13941397
struct fi_msg msg = {0};
13951398
struct iovec iov;
1399+
struct rxr_ep *ep;
1400+
1401+
ep = container_of(ep_fid, struct rxr_ep, base_ep.util_ep.ep_fid.fid);
13961402

13971403
iov.iov_base = (void *)buf;
13981404
iov.iov_len = len;
13991405

14001406
rxr_msg_construct(&msg, &iov, &desc, 1, src_addr, context, 0);
1401-
return rxr_msg_generic_recv(ep_fid, &msg, tag, ignore, ofi_op_tagged, 0);
1407+
return rxr_msg_generic_recv(ep_fid, &msg, tag, ignore, ofi_op_tagged, rxr_rx_flags(ep));
14021408
}
14031409

14041410
static
@@ -1407,9 +1413,12 @@ ssize_t rxr_msg_trecvv(struct fid_ep *ep_fid, const struct iovec *iov,
14071413
uint64_t tag, uint64_t ignore, void *context)
14081414
{
14091415
struct fi_msg msg = {0};
1416+
struct rxr_ep *ep;
1417+
1418+
ep = container_of(ep_fid, struct rxr_ep, base_ep.util_ep.ep_fid.fid);
14101419

14111420
rxr_msg_construct(&msg, iov, desc, count, src_addr, context, 0);
1412-
return rxr_msg_generic_recv(ep_fid, &msg, tag, ignore, ofi_op_tagged, 0);
1421+
return rxr_msg_generic_recv(ep_fid, &msg, tag, ignore, ofi_op_tagged, rxr_rx_flags(ep));
14131422
}
14141423

14151424
static
@@ -1418,6 +1427,9 @@ ssize_t rxr_msg_trecvmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *tmsg
14181427
{
14191428
ssize_t ret;
14201429
struct fi_msg msg = {0};
1430+
struct rxr_ep *ep;
1431+
1432+
ep = container_of(ep_fid, struct rxr_ep, base_ep.util_ep.ep_fid.fid);
14211433

14221434
if (flags & FI_PEEK) {
14231435
ret = rxr_msg_peek_trecv(ep_fid, tmsg, flags);
@@ -1429,7 +1441,7 @@ ssize_t rxr_msg_trecvmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *tmsg
14291441

14301442
rxr_msg_construct(&msg, tmsg->msg_iov, tmsg->desc, tmsg->iov_count, tmsg->addr, tmsg->context, tmsg->data);
14311443
ret = rxr_msg_generic_recv(ep_fid, &msg, tmsg->tag, tmsg->ignore,
1432-
ofi_op_tagged, flags);
1444+
ofi_op_tagged, flags | ep->base_ep.util_ep.rx_msg_flags);
14331445

14341446
out:
14351447
return ret;

prov/efa/src/rdm/rxr_op_entry.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,9 @@ void rxr_rx_entry_report_completion(struct rxr_op_entry *rx_entry)
716716
struct rxr_ep *ep = rx_entry->ep;
717717
struct util_cq *rx_cq = ep->base_ep.util_ep.rx_cq;
718718
int ret = 0;
719+
uint64_t cq_flags;
719720

721+
cq_flags = (ep->base_ep.util_ep.rx_msg_flags == FI_COMPLETION) ? 0 : FI_SELECTIVE_COMPLETION;
720722
if (OFI_UNLIKELY(rx_entry->cq_entry.len < rx_entry->total_len)) {
721723
EFA_WARN(FI_LOG_CQ,
722724
"Message truncated! tag: %"PRIu64" incoming message size: %"PRIu64" receiving buffer size: %zu\n",
@@ -748,7 +750,7 @@ void rxr_rx_entry_report_completion(struct rxr_op_entry *rx_entry)
748750
}
749751

750752
if (!(rx_entry->rxr_flags & RXR_RX_ENTRY_RECV_CANCEL) &&
751-
(ofi_need_completion(rxr_rx_flags(ep), rx_entry->fi_flags) ||
753+
(ofi_need_completion(cq_flags, rx_entry->fi_flags) ||
752754
(rx_entry->cq_entry.flags & FI_MULTI_RECV))) {
753755
EFA_DBG(FI_LOG_CQ,
754756
"Writing recv completion for rx_entry from peer: %"

0 commit comments

Comments
 (0)