2 changes: 0 additions & 2 deletions prov/efa/src/rdm/rxr_atomic.c
@@ -200,7 +200,6 @@ ssize_t rxr_atomic_generic_efa(struct rxr_ep *rxr_ep,
 		err = rxr_pkt_post_req(rxr_ep,
 				       tx_entry,
 				       RXR_DC_WRITE_RTA_PKT,
-				       0,
 				       0);
 	} else {
 		/*
@@ -211,7 +210,6 @@ ssize_t rxr_atomic_generic_efa(struct rxr_ep *rxr_ep,
 		err = rxr_pkt_post_req(rxr_ep,
 				       tx_entry,
 				       req_pkt_type_list[op],
-				       0,
 				       0);
 	}

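The two hunks above drop the second-to-last argument of `rxr_pkt_post_req()`. A hedged before/after sketch of the prototype, inferred from the call sites in this diff (the parameter names `pkt_type`, `inject`, and `flags` are assumptions; only the argument counts and the dropped literal are visible here):

```c
#include <stdint.h>
#include <sys/types.h>

struct rxr_ep;
struct rxr_op_entry;

/* Before (inferred): the 4th argument was the inject flag, which only the
 * shm path ever set:
 *
 *   ssize_t rxr_pkt_post_req(struct rxr_ep *ep, struct rxr_op_entry *tx_entry,
 *                            int pkt_type, bool inject, uint64_t flags);
 *
 * After: with the shm posting path removed, the flag is gone.
 */
ssize_t rxr_pkt_post_req(struct rxr_ep *ep, struct rxr_op_entry *tx_entry,
			 int pkt_type, uint64_t flags);
```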
14 changes: 6 additions & 8 deletions prov/efa/src/rdm/rxr_ep.c
@@ -782,12 +782,13 @@ void rxr_ep_set_use_shm_for_tx(struct rxr_ep *ep)

 	/* App provided hints supersede environment variables.
 	 *
-	 * Using the shm provider comes with some overheads, particularly in the
-	 * progress engine when polling an empty completion queue, so avoid
+	 * Using the shm provider comes with some overheads, so avoid
 	 * initializing the provider if the app provides a hint that it does not
 	 * require node-local communication. We can still loopback over the EFA
 	 * device in cases where the app violates the hint and continues
 	 * communicating with node-local peers.
+	 *
+	 * aws-ofi-nccl relies on this feature.
 	 */
 	if (ep->user_info
 	    /* If the app explicitly requires remote communication */
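For context, the hint in question is the capability bits an application passes to `fi_getinfo()`. A minimal sketch of the caller side, assuming an RDM endpoint and the standard libfabric `FI_REMOTE_COMM`/`FI_LOCAL_COMM` semantics (illustrative, not part of this PR):

```c
#include <string.h>
#include <rdma/fabric.h>
#include <rdma/fi_errno.h>

/* Request remote (inter-node) communication only, so the EFA provider can
 * skip shm initialization; node-local peers would still work via loopback
 * over the EFA device, as the comment above explains. */
static int get_remote_only_info(struct fi_info **info)
{
	struct fi_info *hints = fi_allocinfo();
	int ret;

	if (!hints)
		return -FI_ENOMEM;

	hints->fabric_attr->prov_name = strdup("efa");
	hints->ep_attr->type = FI_EP_RDM;
	hints->caps = FI_MSG | FI_REMOTE_COMM;	/* note: no FI_LOCAL_COMM */

	ret = fi_getinfo(FI_VERSION(1, 18), NULL, NULL, 0, hints, info);
	fi_freeinfo(hints);
	return ret;
}
```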
@@ -1934,8 +1935,7 @@ void rxr_ep_progress_internal(struct rxr_ep *ep)
 			continue;
 
 		assert(op_entry->rxr_flags & RXR_OP_ENTRY_QUEUED_CTRL);
-		ret = rxr_pkt_post(ep, op_entry, op_entry->queued_ctrl.type,
-				   op_entry->queued_ctrl.inject, 0);
+		ret = rxr_pkt_post(ep, op_entry, op_entry->queued_ctrl_type, 0);
 		if (ret == -FI_EAGAIN)
 			break;

@@ -2004,7 +2004,7 @@ void rxr_ep_progress_internal(struct rxr_ep *ep)

 		if (peer->flags & EFA_RDM_PEER_IN_BACKOFF)
 			break;
-		ret = rxr_pkt_post(ep, op_entry, RXR_DATA_PKT, false, flags);
+		ret = rxr_pkt_post(ep, op_entry, RXR_DATA_PKT, flags);
 		if (OFI_UNLIKELY(ret)) {
 			if (ret == -FI_EAGAIN)
 				goto out;
@@ -2067,9 +2067,7 @@ void rxr_ep_progress_internal(struct rxr_ep *ep)
 		 * The core's TX queue is full so we can't do any
 		 * additional work.
 		 */
-		bool use_shm = peer->is_local && ep->use_shm_for_tx;
-
-		if (!use_shm && ep->efa_outstanding_tx_ops == ep->efa_max_outstanding_tx_ops)
+		if (ep->efa_outstanding_tx_ops == ep->efa_max_outstanding_tx_ops)
 			goto out;
 
 		ret = rxr_op_entry_post_remote_read(op_entry);
4 changes: 2 additions & 2 deletions prov/efa/src/rdm/rxr_msg.c
@@ -156,7 +156,7 @@ ssize_t rxr_msg_post_rtm(struct rxr_ep *ep, struct rxr_op_entry *tx_entry, int u

 	if (rtm_type < RXR_EXTRA_REQ_PKT_BEGIN) {
 		/* rtm requires only baseline feature, which peer should always support. */
-		return rxr_pkt_post_req(ep, tx_entry, rtm_type, 0, 0);
+		return rxr_pkt_post_req(ep, tx_entry, rtm_type, 0);
 	}
 
 	/*
@@ -172,7 +172,7 @@ ssize_t rxr_msg_post_rtm(struct rxr_ep *ep, struct rxr_op_entry *tx_entry, int u
 	if (!rxr_pkt_req_supported_by_peer(rtm_type, peer))
 		return -FI_EOPNOTSUPP;
 
-	return rxr_pkt_post_req(ep, tx_entry, rtm_type, 0, 0);
+	return rxr_pkt_post_req(ep, tx_entry, rtm_type, 0);
 }
 
 ssize_t rxr_msg_generic_send(struct fid_ep *ep, const struct fi_msg *msg,
83 changes: 12 additions & 71 deletions prov/efa/src/rdm/rxr_op_entry.c
@@ -225,25 +225,17 @@ void rxr_rx_entry_release(struct rxr_op_entry *rx_entry)
  * user's data buffer is on host memory (though user can register
  * its buffer, and provide its descriptor as an optimization).
  *
- * However, there are a few occasions that EFA device and shm
+ * However, there are a few occasions that EFA device
  * require memory to be registered with them:
  *
- * First, when EFA device is used to send data:
+ * When EFA device is used to send data:
  *
  * If a non-read based protocol (such as eager, medium, longcts)
  * is used, the send buffer must be registered with EFA device.
  *
  * If a read based protocol is used, both send buffer
  * and receive buffer must be registered with EFA device.
  *
- * Second, when shm is used:
- * If eager protocol is used, no registration is needed (because
- * shm does not require registration for local buffer)
- *
- * If a read based protocol is used, the send buffer must
- * be registered with shm, because send buffer is used as
- * remote buffer in a read based protocol.
- *
  * Therefore, when user did not provide descriptors for the buffer(s),
  * EFA provider needs to bridge the gap.
  *
@@ -258,9 +250,7 @@ void rxr_rx_entry_release(struct rxr_op_entry *rx_entry)
  * Because of the high cost of memory registration, this happens
  * only when MR cache is available, which is checked by the caller
  * of this function on sender side. (this happens when
- *
- * 1. EFA device is used with non-eager protocols and
- * 2. SHM is used with long read protocol
+ * EFA device is used with non-eager protocols)
  *
  * This function is not guaranteed to fill all descriptors (which
  * is why the function name has try). When memory registration fails due
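As a caller-side illustration of the descriptor optimization mentioned above, a minimal sketch in which the application registers its buffer once and passes the descriptor, so `rxr_op_entry_try_fill_desc()` finds `desc[i]` already set and does nothing (the domain/endpoint setup and completion handling are assumed and omitted):

```c
#include <rdma/fabric.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>

static ssize_t send_preregistered(struct fid_domain *domain, struct fid_ep *ep,
				  void *buf, size_t len, fi_addr_t dest)
{
	struct fid_mr *mr;
	ssize_t ret;

	/* Register once up front; the provider then skips fi_mr_regv(). */
	ret = fi_mr_reg(domain, buf, len, FI_SEND, 0, 0, 0, &mr, NULL);
	if (ret)
		return ret;

	ret = fi_send(ep, buf, len, fi_mr_desc(mr), dest, NULL);

	/* Real code must reap the send completion before closing the MR;
	 * this sketch omits that step. */
	fi_close(&mr->fid);
	return ret;
}
```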
@@ -287,36 +277,15 @@ void rxr_rx_entry_release(struct rxr_op_entry *rx_entry)
 void rxr_op_entry_try_fill_desc(struct rxr_op_entry *op_entry, int mr_iov_start, uint64_t access)
 {
 	int i, err;
-	struct efa_rdm_peer *peer;
-
-	peer = rxr_ep_get_peer(op_entry->ep, op_entry->addr);
 
 	for (i = mr_iov_start; i < op_entry->iov_count; ++i) {
 		if (op_entry->desc[i])
 			continue;
 
-
-		if (peer->is_local && op_entry->ep->use_shm_for_tx) {
-			if (access == FI_REMOTE_READ) {
-				/* this happens when the longread message protocol was used
-				 * with shm. The send buffer is going to be read by receiver,
-				 * therefore must be registered with shm provider.
-				 */
-				assert(op_entry->type == RXR_TX_ENTRY);
-				err = efa_mr_reg_shm(&rxr_ep_domain(op_entry->ep)->util_domain.domain_fid,
-						     op_entry->iov + i,
-						     access, &op_entry->mr[i]);
-			} else {
-				assert(access == FI_SEND || access == FI_RECV);
-				/* shm does not require registration for send and recv */
-				err = 0;
-			}
-		} else {
-			err = fi_mr_regv(&rxr_ep_domain(op_entry->ep)->util_domain.domain_fid,
-					 op_entry->iov + i, 1,
-					 access,
-					 0, 0, 0, &op_entry->mr[i], NULL);
-		}
+		err = fi_mr_regv(
+			&rxr_ep_domain(op_entry->ep)->util_domain.domain_fid,
+			op_entry->iov + i, 1, access, 0, 0, 0, &op_entry->mr[i],
+			NULL);
 
 		if (err) {
 			EFA_WARN(FI_LOG_EP_CTRL,
@@ -441,10 +410,6 @@ size_t rxr_tx_entry_max_req_data_capacity(struct rxr_ep *ep, struct rxr_op_entry
 	peer = rxr_ep_get_peer(ep, tx_entry->addr);
 	assert(peer);
 
-	if (peer->is_local && ep->use_shm_for_tx) {
-		return rxr_env.shm_max_medium_size;
-	}
-
 	if (efa_rdm_peer_need_raw_addr_hdr(peer))
 		header_flags |= RXR_REQ_OPT_RAW_ADDR_HDR;
 	else if (efa_rdm_peer_need_connid(peer))
@@ -1041,8 +1006,6 @@ void rxr_op_entry_handle_recv_completed(struct rxr_op_entry *op_entry)
 {
 	struct rxr_op_entry *tx_entry = NULL;
 	struct rxr_op_entry *rx_entry = NULL;
-	struct efa_rdm_peer *peer;
-	bool inject;
 	int err;
 
 	/* It is important to write completion before sending ctrl packet, because the
@@ -1106,19 +1069,11 @@ void rxr_op_entry_handle_recv_completed(struct rxr_op_entry *op_entry)
 	 *
 	 * Hence, the rx_entry can be safely released only when we got
 	 * the send completion of the ctrl packet.
-	 *
-	 * Another interesting point is that when inject was used, the
-	 * rx_entry was released by rxr_pkt_post_or_queue(), because
-	 * when inject was used, lower device will not provide send
-	 * completion for the ctrl packet.
 	 */
 	if (op_entry->rxr_flags & RXR_TX_ENTRY_DELIVERY_COMPLETE_REQUESTED) {
 		assert(op_entry->type == RXR_RX_ENTRY);
 		rx_entry = op_entry; /* Intentionally assigned for easier understanding */
-		peer = rxr_ep_get_peer(rx_entry->ep, rx_entry->addr);
-		assert(peer);
-		inject = peer->is_local && rx_entry->ep->use_shm_for_tx;
-		err = rxr_pkt_post_or_queue(rx_entry->ep, rx_entry, RXR_RECEIPT_PKT, inject);
+		err = rxr_pkt_post_or_queue(rx_entry->ep, rx_entry, RXR_RECEIPT_PKT);
 		if (OFI_UNLIKELY(err)) {
 			EFA_WARN(FI_LOG_CQ,
 				 "Posting of ctrl packet failed when complete rx! err=%s(%d)\n",
@@ -1254,17 +1209,13 @@ int rxr_op_entry_post_remote_read(struct rxr_op_entry *op_entry)
 	int iov_idx = 0, rma_iov_idx = 0;
 	size_t iov_offset = 0, rma_iov_offset = 0;
 	size_t read_once_len, max_read_once_len;
-	bool use_shm;
 	struct rxr_ep *ep;
-	struct efa_rdm_peer *peer;
 	struct rxr_pkt_entry *pkt_entry;
 
 	assert(op_entry->iov_count > 0);
 	assert(op_entry->rma_iov_count > 0);
 
 	ep = op_entry->ep;
-	peer = rxr_ep_get_peer(ep, op_entry->addr);
-	use_shm = peer->is_local && ep->use_shm_for_tx;
 
 	if (op_entry->bytes_read_total_len == 0) {

@@ -1275,10 +1226,7 @@ int rxr_op_entry_post_remote_read(struct rxr_op_entry *op_entry)
 		 * Note that because send operation used a pkt_entry as wr_id,
 		 * we had to use a pkt_entry as context for read too.
 		 */
-		if (use_shm)
-			pkt_entry = rxr_pkt_entry_alloc(ep, ep->shm_tx_pkt_pool, RXR_PKT_FROM_SHM_TX_POOL);
-		else
-			pkt_entry = rxr_pkt_entry_alloc(ep, ep->efa_tx_pkt_pool, RXR_PKT_FROM_EFA_TX_POOL);
+		pkt_entry = rxr_pkt_entry_alloc(ep, ep->efa_tx_pkt_pool, RXR_PKT_FROM_EFA_TX_POOL);
 
 		if (OFI_UNLIKELY(!pkt_entry))
 			return -FI_EAGAIN;
@@ -1297,11 +1245,9 @@ int rxr_op_entry_post_remote_read(struct rxr_op_entry *op_entry)

 	assert(op_entry->bytes_read_submitted < op_entry->bytes_read_total_len);
 
-	if (!use_shm) {
-		rxr_op_entry_try_fill_desc(op_entry, 0, FI_RECV);
-	}
+	rxr_op_entry_try_fill_desc(op_entry, 0, FI_RECV);
 
-	max_read_once_len = use_shm ? SIZE_MAX : MIN(rxr_env.efa_read_segment_size, rxr_ep_domain(ep)->device->max_rdma_size);
+	max_read_once_len = MIN(rxr_env.efa_read_segment_size, rxr_ep_domain(ep)->device->max_rdma_size);
 	assert(max_read_once_len > 0);
 
 	err = rxr_locate_iov_pos(op_entry->iov, op_entry->iov_count,
@@ -1327,7 +1273,6 @@ int rxr_op_entry_post_remote_read(struct rxr_op_entry *op_entry)
 		assert(rma_iov_idx < op_entry->rma_iov_count);
 		assert(rma_iov_offset < op_entry->rma_iov[rma_iov_idx].len);
 
-		if (!use_shm) {
 		if (ep->efa_outstanding_tx_ops == ep->efa_max_outstanding_tx_ops)
 			return -FI_EAGAIN;

@@ -1340,12 +1285,8 @@ int rxr_op_entry_post_remote_read(struct rxr_op_entry *op_entry)
 			 */
 			return -FI_EAGAIN;
 		}
-		}
 
-		if (use_shm)
-			pkt_entry = rxr_pkt_entry_alloc(ep, ep->shm_tx_pkt_pool, RXR_PKT_FROM_SHM_TX_POOL);
-		else
-			pkt_entry = rxr_pkt_entry_alloc(ep, ep->efa_tx_pkt_pool, RXR_PKT_FROM_EFA_TX_POOL);
+		pkt_entry = rxr_pkt_entry_alloc(ep, ep->efa_tx_pkt_pool, RXR_PKT_FROM_EFA_TX_POOL);
 
 		if (OFI_UNLIKELY(!pkt_entry))
 			return -FI_EAGAIN;
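The segmentation policy above caps every RDMA read at `MIN(rxr_env.efa_read_segment_size, device max RDMA size)`. A standalone sketch of the resulting arithmetic (illustrative only; the names mirror the provider's but this is not provider code):

```c
#include <stddef.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))

/* Number of RDMA reads needed for total_len bytes when each read is capped
 * by both the configured segment size and the device limit. */
static size_t count_read_segments(size_t total_len, size_t read_segment_size,
				  size_t device_max_rdma_size)
{
	size_t max_read_once_len = MIN(read_segment_size, device_max_rdma_size);
	size_t nreads = 0, submitted = 0;

	while (submitted < total_len) {
		submitted += MIN(total_len - submitted, max_read_once_len);
		nreads++;
	}
	return nreads;
}
```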
7 changes: 1 addition & 6 deletions prov/efa/src/rdm/rxr_op_entry.h
@@ -56,11 +56,6 @@ enum rxr_op_comm_type {
 	RXR_RX_RECV, /* rx_entry large msg recv data pkts */
 };
 
-struct rxr_queued_ctrl_info {
-	int type;
-	int inject;
-};
-
 struct rxr_atomic_hdr {
 	/* atomic_op is different from tx_op */
 	uint32_t atomic_op;
@@ -116,7 +111,7 @@ struct rxr_op_entry {
 	uint64_t total_len;
 
 	enum rxr_op_comm_type state;
-	struct rxr_queued_ctrl_info queued_ctrl;
+	int queued_ctrl_type;
 
 	uint64_t fi_flags;
 	uint16_t rxr_flags;