Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 5 additions & 69 deletions prov/efa/src/rdm/rxr_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,10 @@ int rxr_ep_post_user_recv_buf(struct rxr_ep *ep, struct rxr_op_entry *rx_entry,
*
* @param[in] ep endpoint
* @param[in] flags flags passed to lower provider, can have FI_MORE
* @param[in] lower_ep_type lower endpoint type, can be either SHM_EP or EFA_EP
* @return On success, return 0
* On failure, return a negative error code.
*/
int rxr_ep_post_internal_rx_pkt(struct rxr_ep *ep, uint64_t flags, enum rxr_lower_ep_type lower_ep_type)
int rxr_ep_post_internal_rx_pkt(struct rxr_ep *ep, uint64_t flags)
{
void *desc;
struct rxr_pkt_entry *rx_pkt_entry = NULL;
Expand Down Expand Up @@ -317,13 +316,11 @@ int rxr_ep_post_internal_rx_pkt(struct rxr_ep *ep, uint64_t flags, enum rxr_lowe
*
* @param[in] ep endpoint
* @param[in] nrecv number of receive buffers to post
* @param[in] lower_ep_type device type, can be SHM_EP or EFA_EP
* @return On success, return 0
* On failure, return negative libfabric error code
*/
static inline
ssize_t rxr_ep_bulk_post_internal_rx_pkts(struct rxr_ep *ep, int nrecv,
enum rxr_lower_ep_type lower_ep_type)
ssize_t rxr_ep_bulk_post_internal_rx_pkts(struct rxr_ep *ep, int nrecv)
{
int i;
ssize_t err;
Expand All @@ -334,7 +331,7 @@ ssize_t rxr_ep_bulk_post_internal_rx_pkts(struct rxr_ep *ep, int nrecv,
if (i == nrecv - 1)
flags = 0;

err = rxr_ep_post_internal_rx_pkt(ep, flags, lower_ep_type);
err = rxr_ep_post_internal_rx_pkt(ep, flags);
if (OFI_UNLIKELY(err))
return err;
}
Expand Down Expand Up @@ -1561,7 +1558,7 @@ void rxr_ep_progress_post_internal_rx_pkts(struct rxr_ep *ep)
}
}

err = rxr_ep_bulk_post_internal_rx_pkts(ep, ep->efa_rx_pkts_to_post, EFA_EP);
err = rxr_ep_bulk_post_internal_rx_pkts(ep, ep->efa_rx_pkts_to_post);
if (err)
goto err_exit;

Expand Down Expand Up @@ -1784,7 +1781,7 @@ static inline void rdm_ep_poll_ibv_cq_ex(struct rxr_ep *ep, size_t cqe_to_proces

pkt_entry->pkt_size = ibv_wc_read_byte_len(ep->ibv_cq_ex);
assert(pkt_entry->pkt_size > 0);
rxr_pkt_handle_recv_completion(ep, pkt_entry, EFA_EP);
rxr_pkt_handle_recv_completion(ep, pkt_entry);
#if ENABLE_DEBUG
ep->recv_comps++;
#endif
Expand Down Expand Up @@ -2409,67 +2406,6 @@ void rxr_ep_record_tx_op_completed(struct rxr_ep *ep, struct rxr_pkt_entry *pkt_
}
}

/**
 * @brief handle two types of completion entries from the shm provider
 *
 * This function handles the following two scenarios:
 * 1. RMA writes with immediate data at the remote endpoint,
 * 2. atomic completion on the requester.
 * For both cases, this function reports the completion to the user.
 *
 * @param[in] ep endpoint
 * @param[in] cq_entry CQ entry from the shm provider
 * @param[in] src_addr source address
 */
void rxr_ep_handle_misc_shm_completion(struct rxr_ep *ep,
struct fi_cq_data_entry *cq_entry,
fi_addr_t src_addr)
{
struct util_cq *target_cq;
int ret;

/* An atomic completion belongs to the requester, so it is reported on
 * the TX CQ; otherwise this must be an RMA write with immediate data
 * at the remote side, which is reported on the RX CQ. */
if (cq_entry->flags & FI_ATOMIC) {
target_cq = ep->base_ep.util_ep.tx_cq;
} else {
assert(cq_entry->flags & FI_REMOTE_CQ_DATA);
target_cq = ep->base_ep.util_ep.rx_cq;
}

/* When the endpoint was opened with FI_SOURCE, the user expects the
 * source address in the completion, so use the _src variant. */
if (ep->base_ep.util_ep.caps & FI_SOURCE)
ret = ofi_cq_write_src(target_cq,
cq_entry->op_context,
cq_entry->flags,
cq_entry->len,
cq_entry->buf,
cq_entry->data,
0,
src_addr);
else
ret = ofi_cq_write(target_cq,
cq_entry->op_context,
cq_entry->flags,
cq_entry->len,
cq_entry->buf,
cq_entry->data,
0);

rxr_rm_rx_cq_check(ep, target_cq);

/* If the CQ entry could not be written, surface the failure to the
 * application through the EQ as an FI_EIO error. */
if (OFI_UNLIKELY(ret)) {
EFA_WARN(FI_LOG_CQ,
"Unable to write a cq entry for shm operation: %s\n",
fi_strerror(-ret));
efa_eq_write_error(&ep->base_ep.util_ep, FI_EIO, FI_EFA_ERR_WRITE_SHM_CQ_ENTRY);
}

/* Update the matching counter: TX for atomics (requester side),
 * RX for remote RMA writes with immediate data. */
if (cq_entry->flags & FI_ATOMIC) {
efa_cntr_report_tx_completion(&ep->base_ep.util_ep, cq_entry->flags);
} else {
assert(cq_entry->flags & FI_REMOTE_CQ_DATA);
efa_cntr_report_rx_completion(&ep->base_ep.util_ep, cq_entry->flags);
}
}

/**
* @brief handle RX completion due to FI_REMOTE_CQ_DATA via RECV_RDMA_WITH_IMM
*
Expand Down
9 changes: 0 additions & 9 deletions prov/efa/src/rdm/rxr_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ enum ibv_cq_ex_type {
EFADV_CQ
};

enum rxr_lower_ep_type {
EFA_EP = 1,
SHM_EP,
};

/** @brief Information of a queued copy.
*
* This struct is used when receiving buffer is on device.
Expand Down Expand Up @@ -321,10 +316,6 @@ void rxr_ep_queue_rnr_pkt(struct rxr_ep *ep,
struct dlist_entry *list,
struct rxr_pkt_entry *pkt_entry);

void rxr_ep_handle_misc_shm_completion(struct rxr_ep *ep,
struct fi_cq_data_entry *cq_entry,
fi_addr_t src_addr);

static inline
struct efa_domain *rxr_ep_domain(struct rxr_ep *ep)
{
Expand Down
13 changes: 3 additions & 10 deletions prov/efa/src/rdm/rxr_pkt_cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1009,12 +1009,10 @@ fi_addr_t rxr_pkt_determine_addr(struct rxr_ep *ep, struct rxr_pkt_entry *pkt_en
*
* @param ep[in,out] endpoint
* @param pkt_entry[in,out] received packet, will be released by this function
* @param lower_ep_type[in] indicates which type of lower device this packet was received from.
* Possible values are SHM_EP and EFA_EP.
*/
void rxr_pkt_handle_recv_completion(struct rxr_ep *ep,
struct rxr_pkt_entry *pkt_entry,
enum rxr_lower_ep_type lower_ep_type)
struct rxr_pkt_entry *pkt_entry)
{
int pkt_type;
struct efa_rdm_peer *peer;
Expand Down Expand Up @@ -1061,7 +1059,7 @@ void rxr_pkt_handle_recv_completion(struct rxr_ep *ep,
#endif
peer = rxr_ep_get_peer(ep, pkt_entry->addr);
assert(peer);
if (peer->is_local && lower_ep_type == EFA_EP) {
if (peer->is_local) {
/*
* This happens when the peer is on same instance, but chose to
* use EFA device to communicate with me. In this case, we respect
Expand All @@ -1073,12 +1071,7 @@ void rxr_pkt_handle_recv_completion(struct rxr_ep *ep,

rxr_pkt_post_handshake_or_queue(ep, peer);

if (lower_ep_type == SHM_EP) {
ep->shm_rx_pkts_posted--;
} else {
assert(lower_ep_type == EFA_EP);
ep->efa_rx_pkts_posted--;
}
ep->efa_rx_pkts_posted--;

if (pkt_entry->alloc_type == RXR_PKT_FROM_USER_BUFFER) {
assert(pkt_entry->x_entry);
Expand Down
3 changes: 1 addition & 2 deletions prov/efa/src/rdm/rxr_pkt_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ void rxr_pkt_handle_recv_error(struct rxr_ep *ep,
int err, int prov_errno);

void rxr_pkt_handle_recv_completion(struct rxr_ep *ep,
struct rxr_pkt_entry *pkt_entry,
enum rxr_lower_ep_type lower_ep_type);
struct rxr_pkt_entry *pkt_entry);

ssize_t rxr_pkt_trigger_handshake(struct rxr_ep *ep,
fi_addr_t addr, struct efa_rdm_peer *peer);
Expand Down
53 changes: 12 additions & 41 deletions prov/efa/src/rdm/rxr_read.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,24 +190,19 @@ ssize_t rxr_read_mr_reg(struct rxr_ep *ep, struct rxr_read_entry *read_entry)
* to the lower provider type. It also handles the case where the application
* does not provide descriptors.
*
* @param lower_ep_type[in] lower efa type, can be EFA_EP or SHM_EP.
* @param numdesc[in] number of descriptors in the array
* @param desc_in[in] descriptors provided by application
* @param desc_out[out] descriptors for lower provider.
*/
static inline
void rxr_read_copy_desc(enum rxr_lower_ep_type lower_ep_type,
int numdesc, void **desc_in, void **desc_out)
void rxr_read_copy_desc(int numdesc, void **desc_in, void **desc_out)
{
if (!desc_in) {
memset(desc_out, 0, numdesc * sizeof(void *));
return;
}

memcpy(desc_out, desc_in, numdesc * sizeof(void *));
if (lower_ep_type == SHM_EP) {
rxr_convert_desc_for_shm(numdesc, desc_out);
}
}

/* rxr_read_alloc_entry allocates a read entry.
Expand All @@ -216,16 +211,13 @@ void rxr_read_copy_desc(enum rxr_lower_ep_type lower_ep_type,
* x_entry: can be a tx_entry or an rx_entry.
* If x_entry is tx_entry, application called fi_read().
* If x_entry is rx_entry, read message protocol is being used.
* lower_ep_type: EFA_EP or SHM_EP
* Return:
* On success, return the pointer of allocated read_entry
* Otherwise, return NULL
*/
struct rxr_read_entry *rxr_read_alloc_entry(struct rxr_ep *ep, struct rxr_op_entry *op_entry,
enum rxr_lower_ep_type lower_ep_type)
struct rxr_read_entry *rxr_read_alloc_entry(struct rxr_ep *ep, struct rxr_op_entry *op_entry)
{
struct rxr_read_entry *read_entry;
int i;
size_t total_iov_len, total_rma_iov_len;

read_entry = ofi_buf_alloc(ep->read_entry_pool);
Expand All @@ -251,11 +243,10 @@ struct rxr_read_entry *rxr_read_alloc_entry(struct rxr_ep *ep, struct rxr_op_ent
total_rma_iov_len = ofi_total_rma_iov_len(op_entry->rma_iov, op_entry->rma_iov_count);
read_entry->total_len = MIN(total_iov_len, total_rma_iov_len);

rxr_read_copy_desc(lower_ep_type, read_entry->iov_count, op_entry->desc, read_entry->mr_desc);
rxr_read_copy_desc(read_entry->iov_count, op_entry->desc, read_entry->mr_desc);

read_entry->context = op_entry;
read_entry->addr = op_entry->addr;
read_entry->lower_ep_type = lower_ep_type;

if (op_entry->type == RXR_TX_ENTRY) {
assert(op_entry->op == ofi_op_read_req);
Expand All @@ -273,15 +264,6 @@ struct rxr_read_entry *rxr_read_alloc_entry(struct rxr_ep *ep, struct rxr_op_ent

memset(read_entry->mr, 0, read_entry->iov_count * sizeof(struct fid_mr *));

if (lower_ep_type == SHM_EP) {
assert(lower_ep_type == SHM_EP);
/* FI_MR_VIRT_ADDR is not being set, use 0-based offset instead. */
if (!(rxr_ep_domain(ep)->shm_info->domain_attr->mr_mode & FI_MR_VIRT_ADDR)) {
for (i = 0; i < read_entry->rma_iov_count; ++i)
read_entry->rma_iov[i].addr = 0;
}
}

return read_entry;
}

Expand Down Expand Up @@ -342,7 +324,6 @@ int rxr_read_post_local_read_or_queue(struct rxr_ep *ep,

read_entry->type = RXR_READ_ENTRY;
read_entry->read_id = ofi_buf_index(read_entry);
read_entry->lower_ep_type = EFA_EP;
read_entry->context_type = RXR_READ_CONTEXT_PKT_ENTRY;
read_entry->context = pkt_entry;
read_entry->state = RXR_RDMA_ENTRY_CREATED;
Expand All @@ -363,7 +344,7 @@ int rxr_read_post_local_read_or_queue(struct rxr_ep *ep,
read_entry->iov_count = rx_entry->iov_count;
memset(read_entry->mr, 0, sizeof(*read_entry->mr) * read_entry->iov_count);
memcpy(read_entry->iov, rx_entry->iov, rx_entry->iov_count * sizeof(struct iovec));
rxr_read_copy_desc(EFA_EP, rx_entry->iov_count, rx_entry->desc, read_entry->mr_desc);
rxr_read_copy_desc(rx_entry->iov_count, rx_entry->desc, read_entry->mr_desc);
ofi_consume_iov_desc(read_entry->iov, read_entry->mr_desc, &read_entry->iov_count, data_offset);
if (read_entry->iov_count == 0) {
EFA_WARN(FI_LOG_CQ,
Expand Down Expand Up @@ -418,10 +399,7 @@ int rxr_read_post(struct rxr_ep *ep, struct rxr_read_entry *read_entry)
/* because fi_send uses a pkt_entry as context
* we had to use a pkt_entry as context too
*/
if (read_entry->lower_ep_type == SHM_EP)
pkt_entry = rxr_pkt_entry_alloc(ep, ep->shm_tx_pkt_pool, RXR_PKT_FROM_SHM_TX_POOL);
else
pkt_entry = rxr_pkt_entry_alloc(ep, ep->efa_tx_pkt_pool, RXR_PKT_FROM_EFA_TX_POOL);
pkt_entry = rxr_pkt_entry_alloc(ep, ep->efa_tx_pkt_pool, RXR_PKT_FROM_EFA_TX_POOL);

if (OFI_UNLIKELY(!pkt_entry))
return -FI_EAGAIN;
Expand All @@ -440,21 +418,17 @@ int rxr_read_post(struct rxr_ep *ep, struct rxr_read_entry *read_entry)

assert(read_entry->bytes_submitted < read_entry->total_len);
if (read_entry->context_type == RXR_READ_CONTEXT_PKT_ENTRY) {
assert(read_entry->lower_ep_type == EFA_EP);
ret = rxr_read_prepare_pkt_entry_mr(ep, read_entry);
if (ret)
return ret;
}

if (read_entry->lower_ep_type == EFA_EP) {
ret = rxr_read_mr_reg(ep, read_entry);
if (ret)
return ret;
}
ret = rxr_read_mr_reg(ep, read_entry);
if (ret)
return ret;

max_read_once_len = (read_entry->lower_ep_type == EFA_EP)
? MIN(rxr_env.efa_read_segment_size, rxr_ep_domain(ep)->device->max_rdma_size)
: SIZE_MAX;
max_read_once_len =
MIN(rxr_env.efa_read_segment_size, rxr_ep_domain(ep)->device->max_rdma_size);
assert(max_read_once_len > 0);

ret = rxr_locate_iov_pos(read_entry->iov, read_entry->iov_count,
Expand All @@ -478,7 +452,7 @@ int rxr_read_post(struct rxr_ep *ep, struct rxr_read_entry *read_entry)

while (read_entry->bytes_submitted < read_entry->total_len) {

if (read_entry->lower_ep_type == EFA_EP && ep->efa_outstanding_tx_ops == ep->efa_max_outstanding_tx_ops)
if (ep->efa_outstanding_tx_ops == ep->efa_max_outstanding_tx_ops)
return -FI_EAGAIN;

assert(iov_idx < read_entry->iov_count);
Expand All @@ -490,10 +464,7 @@ int rxr_read_post(struct rxr_ep *ep, struct rxr_read_entry *read_entry)
read_entry->rma_iov[rma_iov_idx].len - rma_iov_offset);
read_once_len = MIN(read_once_len, max_read_once_len);

if (read_entry->lower_ep_type == SHM_EP)
pkt_entry = rxr_pkt_entry_alloc(ep, ep->shm_tx_pkt_pool, RXR_PKT_FROM_SHM_TX_POOL);
else
pkt_entry = rxr_pkt_entry_alloc(ep, ep->efa_tx_pkt_pool, RXR_PKT_FROM_EFA_TX_POOL);
pkt_entry = rxr_pkt_entry_alloc(ep, ep->efa_tx_pkt_pool, RXR_PKT_FROM_EFA_TX_POOL);

if (OFI_UNLIKELY(!pkt_entry))
return -FI_EAGAIN;
Expand Down
4 changes: 1 addition & 3 deletions prov/efa/src/rdm/rxr_read.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ enum rxr_read_entry_state {
struct rxr_read_entry {
enum rxr_x_entry_type type;
int read_id;
enum rxr_lower_ep_type lower_ep_type;

void *context;
enum rxr_read_context_type context_type;
Expand All @@ -102,8 +101,7 @@ struct rxr_read_entry {
struct dlist_entry pending_entry;
};

struct rxr_read_entry *rxr_read_alloc_entry(struct rxr_ep *ep, struct rxr_op_entry *x_entry,
enum rxr_lower_ep_type lower_ep_type);
struct rxr_read_entry *rxr_read_alloc_entry(struct rxr_ep *ep, struct rxr_op_entry *x_entry);

void rxr_read_release_entry(struct rxr_ep *ep, struct rxr_read_entry *read_entry);

Expand Down