11 changes: 11 additions & 0 deletions prov/efa/src/efa_shm.c
@@ -107,6 +107,17 @@ void efa_shm_info_create(const struct fi_info *app_info, struct fi_info **shm_in
 	shm_hints->domain_attr->caps |= FI_LOCAL_COMM;
 	shm_hints->tx_attr->msg_order = FI_ORDER_SAS;
 	shm_hints->rx_attr->msg_order = FI_ORDER_SAS;
+	/*
+	 * Unlike efa, shm does not have FI_COMPLETION in tx/rx_op_flags unless the user requests
+	 * it via hints. That means if the user does not request FI_COMPLETION in the hints and binds
+	 * the shm cq to the shm ep with the FI_SELECTIVE_COMPLETION flag,
+	 * shm will not write a cqe for fi_send* (fi_sendmsg is an exception, as the user can specify flags),
+	 * and similarly for the recv ops. It is common for an application like ompi to
+	 * bind the cq with FI_SELECTIVE_COMPLETION and call fi_senddata expecting libfabric to
+	 * write a cqe. We should follow this pattern and request FI_COMPLETION from shm as the default tx/rx_op_flags.
+	 */
+	shm_hints->tx_attr->op_flags = FI_COMPLETION;
+	shm_hints->rx_attr->op_flags = FI_COMPLETION;
 	shm_hints->fabric_attr->name = strdup("shm");
 	shm_hints->fabric_attr->prov_name = strdup("shm");
 	shm_hints->ep_attr->type = FI_EP_RDM;
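The comment above refers to the fi_ep_bind() completion semantics: with FI_SELECTIVE_COMPLETION, an operation only generates a CQE if FI_COMPLETION is set, either per operation (possible with fi_sendmsg) or in the endpoint's default tx/rx op_flags. A minimal sketch of the application pattern being described, assuming the endpoint, CQ, and destination address are created elsewhere (the helper name and the cq data value are illustrative, not part of the patch):

```c
#include <rdma/fabric.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_eq.h>
#include <rdma/fi_errno.h>

/* Hypothetical helper; ep, cq, and dest_addr are assumed to have been
 * created and resolved elsewhere. */
static int senddata_with_selective_completion(struct fid_ep *ep,
					      struct fid_cq *cq,
					      fi_addr_t dest_addr,
					      const void *buf, size_t len)
{
	struct fi_cq_data_entry comp;
	struct fi_context ctx;
	ssize_t ret;

	/* With FI_SELECTIVE_COMPLETION, plain fi_send*() only generates a
	 * CQE if FI_COMPLETION is in the endpoint's default tx op_flags. */
	ret = fi_ep_bind(ep, &cq->fid, FI_TRANSMIT | FI_SELECTIVE_COMPLETION);
	if (ret)
		return (int)ret;
	ret = fi_enable(ep);
	if (ret)
		return (int)ret;

	/* fi_senddata() cannot pass per-op flags, so the CQE below arrives
	 * only because the provider defaults op_flags to FI_COMPLETION,
	 * which is what the hunk above now requests from shm. */
	ret = fi_senddata(ep, buf, len, NULL, 0x1234 /* remote cq data */,
			  dest_addr, &ctx);
	if (ret)
		return (int)ret;

	do {
		ret = fi_cq_read(cq, &comp, 1);
	} while (ret == -FI_EAGAIN);

	return ret < 0 ? (int)ret : 0;
}
```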
65 changes: 56 additions & 9 deletions prov/efa/src/rdm/efa_rdm_cq.c
@@ -51,15 +51,26 @@ const char *efa_rdm_cq_strerror(struct fid_cq *cq_fid, int prov_errno,
 static
 int rxr_cq_close(struct fid *fid)
 {
-	int ret;
-	efa_rdm_cq *cq;
+	int ret, retv;
+	struct efa_rdm_cq *cq;
 
+	retv = 0;
+
+	cq = container_of(fid, struct efa_rdm_cq, util_cq.cq_fid.fid);
+
+	if (cq->shm_cq) {
+		ret = fi_close(&cq->shm_cq->fid);
+		if (ret) {
+			EFA_WARN(FI_LOG_CQ, "Unable to close shm cq: %s\n", fi_strerror(-ret));
+			retv = ret;
+		}
+	}
+
-	cq = container_of(fid, efa_rdm_cq, cq_fid.fid);
-	ret = ofi_cq_cleanup(cq);
+	ret = ofi_cq_cleanup(&cq->util_cq);
 	if (ret)
 		return ret;
 	free(cq);
-	return 0;
+	return retv;
 }
 
 static struct fi_ops efa_rdm_cq_fi_ops = {
@@ -70,10 +81,28 @@ static struct fi_ops efa_rdm_cq_fi_ops = {
 	.ops_open = fi_no_ops_open,
 };
 
+static ssize_t efa_rdm_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t count, fi_addr_t *src_addr)
+{
+	struct efa_rdm_cq *cq;
+	ssize_t ret;
+
+	cq = container_of(cq_fid, struct efa_rdm_cq, util_cq.cq_fid.fid);
+
+	if (cq->shm_cq)
+		fi_cq_read(cq->shm_cq, NULL, 0);
+
+	ret = ofi_cq_read_entries(&cq->util_cq, buf, count, src_addr);
+
+	if (ret > 0)
+		return ret;
+
+	return ofi_cq_readfrom(&cq->util_cq.cq_fid, buf, count, src_addr);
+}
+
 static struct fi_ops_cq efa_rdm_cq_ops = {
 	.size = sizeof(struct fi_ops_cq),
 	.read = ofi_cq_read,
-	.readfrom = ofi_cq_readfrom,
+	.readfrom = efa_rdm_cq_readfrom,
 	.readerr = ofi_cq_readerr,
 	.sread = fi_no_cq_sread,
 	.sreadfrom = fi_no_cq_sreadfrom,
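This readfrom path is the owner side of the peer-CQ arrangement: the zero-count fi_cq_read() on the shm CQ drives shm's progress engine without consuming anything, and any completions shm produces are written straight into the EFA util CQ through the peer-CQ ops. If ofi_cq_read_entries() then finds nothing buffered, the call falls back to ofi_cq_readfrom(), which runs util progress before reading again. The application only ever polls the one EFA CQ; a hypothetical user-side loop (not part of the patch):

```c
#include <rdma/fi_eq.h>
#include <rdma/fi_errno.h>

/* Hypothetical poll loop: only the EFA CQ is read. fi_cq_readfrom()
 * dispatches to efa_rdm_cq_readfrom() above, which pokes the shm CQ
 * first, so shm traffic completes into this same queue. */
static ssize_t poll_efa_cq(struct fid_cq *efa_cq)
{
	struct fi_cq_data_entry comp[8];
	fi_addr_t src[8];
	ssize_t ret;

	do {
		ret = fi_cq_readfrom(efa_cq, comp, 8, src);
	} while (ret == -FI_EAGAIN);

	return ret;	/* number of completions read, or a negative error */
}
```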
@@ -98,8 +127,10 @@ int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
 		    struct fid_cq **cq_fid, void *context)
 {
 	int ret;
-	efa_rdm_cq *cq;
+	struct efa_rdm_cq *cq;
 	struct efa_domain *efa_domain;
+	struct fi_cq_attr shm_cq_attr = {0};
+	struct fi_peer_cq_context peer_cq_context = {0};
 
 	if (attr->wait_obj != FI_WAIT_NONE)
 		return -FI_ENOSYS;
@@ -113,15 +144,31 @@ int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
 	/* Override user cq size if it's less than recommended cq size */
 	attr->size = MAX(efa_domain->rdm_cq_size, attr->size);
 
-	ret = ofi_cq_init(&efa_prov, domain, attr, cq,
+	ret = ofi_cq_init(&efa_prov, domain, attr, &cq->util_cq,
 			  &ofi_cq_progress, context);
 
 	if (ret)
 		goto free;
 
-	*cq_fid = &cq->cq_fid;
+	*cq_fid = &cq->util_cq.cq_fid;
 	(*cq_fid)->fid.ops = &efa_rdm_cq_fi_ops;
 	(*cq_fid)->ops = &efa_rdm_cq_ops;
+
+	/* open shm cq as peer cq */
+	if (efa_domain->shm_domain) {
+		memcpy(&shm_cq_attr, attr, sizeof(*attr));
+		/* Bind ep with shm provider's cq */
+		shm_cq_attr.flags |= FI_PEER;
+		peer_cq_context.size = sizeof(peer_cq_context);
+		peer_cq_context.cq = cq->util_cq.peer_cq;
+		ret = fi_cq_open(efa_domain->shm_domain, &shm_cq_attr,
+				 &cq->shm_cq, &peer_cq_context);
+		if (ret) {
+			EFA_WARN(FI_LOG_CQ, "Unable to open shm cq: %s\n", fi_strerror(-ret));
+			goto free;
+		}
+	}
+
 	return 0;
 free:
 	free(cq);
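The last hunk is the import step of the libfabric peer API (rdma/fi_ext.h): passing FI_PEER in the CQ attributes plus a struct fi_peer_cq_context tells the shm provider not to allocate its own completion buffer and instead report events into the owner CQ it was handed. A sketch of the general pattern, under the assumption that the caller already holds the owner's peer-CQ handle; open_peer_cq is a hypothetical helper, not code from this patch:

```c
#include <rdma/fi_domain.h>
#include <rdma/fi_ext.h>	/* FI_PEER, struct fi_peer_cq_context */

/* Hypothetical helper showing the peer-CQ import pattern: peer_domain
 * is the imported provider's domain (here, shm) and owner_cq is the
 * owner provider's peer-CQ handle (here, the util CQ embedded in
 * struct efa_rdm_cq). */
static int open_peer_cq(struct fid_domain *peer_domain,
			struct fi_cq_attr *attr,
			struct fid_peer_cq *owner_cq,
			struct fid_cq **peer_cq_out)
{
	struct fi_cq_attr peer_attr = *attr;
	struct fi_peer_cq_context peer_ctx = {
		.size = sizeof(peer_ctx),
		.cq = owner_cq,	/* completions land in the owner's CQ */
	};

	/* FI_PEER makes the provider route completions through the
	 * owner CQ's ops instead of buffering them itself. */
	peer_attr.flags |= FI_PEER;
	return fi_cq_open(peer_domain, &peer_attr, peer_cq_out, &peer_ctx);
}
```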
5 changes: 4 additions & 1 deletion prov/efa/src/rdm/efa_rdm_cq.h
@@ -36,7 +36,10 @@
 
 #include <ofi_util.h>
 
-typedef struct util_cq efa_rdm_cq;
+struct efa_rdm_cq {
+	struct util_cq util_cq;
+	struct fid_cq *shm_cq;
+};
 
 /*
  * Control header with completion data. CQ data length is static.
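With the typedef replaced by a wrapper struct, the provider recovers its efa_rdm_cq from the fid that the util code hands back by walking member offsets; this is the container_of pattern that rxr_cq_close() and efa_rdm_cq_readfrom() above rely on. A minimal illustration, where efa_rdm_cq_from_fid is a hypothetical helper and not part of the patch:

```c
#include <ofi_util.h>	/* struct util_cq, container_of() */

/* Hypothetical helper: container_of() subtracts the offset of
 * util_cq.cq_fid.fid within struct efa_rdm_cq, so the wrapper is
 * recovered regardless of where the util_cq member sits. */
static struct efa_rdm_cq *efa_rdm_cq_from_fid(struct fid *fid)
{
	return container_of(fid, struct efa_rdm_cq, util_cq.cq_fid.fid);
}
```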
1 change: 0 additions & 1 deletion prov/efa/src/rdm/efa_rdm_peer.h
@@ -69,7 +69,6 @@ struct efa_rdm_peer {
 	uint32_t nextra_p3;	/**< number of members in extra_info plus 3 (See protocol v4 document section 2.1) */
 	uint64_t extra_info[RXR_MAX_NUM_EXINFO];	/**< the feature/request flag for each version (See protocol v4 document section 2.1)*/
 	size_t efa_outstanding_tx_ops;	/**< tracks outstanding tx ops (send/read) to this peer on EFA device */
-	size_t shm_outstanding_tx_ops;	/**< tracks outstanding tx ops (send/read/write/atomic) to this peer on SHM */
 	struct dlist_entry outstanding_tx_pkts;	/**< a list of outstanding pkts sent to the peer */
 	uint64_t rnr_backoff_begin_ts;	/**< timestamp for RNR backoff period begin */
 	uint64_t rnr_backoff_wait_time;	/**< how long the RNR backoff period last */