Skip to content

Commit 0a822f8

Browse files
authored
Merge pull request #4821 from nrspruit/OFI_mtl_multi_event_progress
MTL OFI: Added support for reading multiple CQ events in ofi progress
2 parents c0c70a8 + e7bff50 commit 0a822f8

File tree

3 files changed

+66
-14
lines changed

3 files changed

+66
-14
lines changed

ompi/mca/mtl/ofi/mtl_ofi.h

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ __opal_attribute_always_inline__ static inline int
6363
ompi_mtl_ofi_progress(void)
6464
{
6565
ssize_t ret;
66-
int count = 0;
67-
struct fi_cq_tagged_entry wc = { 0 };
66+
int count = 0, i, events_read;
6867
struct fi_cq_err_entry error = { 0 };
6968
ompi_mtl_ofi_request_t *ofi_req = NULL;
7069

@@ -74,19 +73,23 @@ ompi_mtl_ofi_progress(void)
7473
* Call the request's callback.
7574
*/
7675
while (true) {
77-
ret = fi_cq_read(ompi_mtl_ofi.cq, (void *)&wc, 1);
76+
ret = fi_cq_read(ompi_mtl_ofi.cq, ompi_mtl_ofi.progress_entries,
77+
ompi_mtl_ofi.ofi_progress_event_count);
7878
if (ret > 0) {
79-
count++;
80-
if (NULL != wc.op_context) {
81-
ofi_req = TO_OFI_REQ(wc.op_context);
82-
assert(ofi_req);
83-
ret = ofi_req->event_callback(&wc, ofi_req);
84-
if (OMPI_SUCCESS != ret) {
85-
opal_output(0, "%s:%d: Error returned by request event callback: %zd.\n"
86-
"*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
87-
__FILE__, __LINE__, ret);
88-
fflush(stderr);
89-
exit(1);
79+
count+= ret;
80+
events_read = ret;
81+
for (i = 0; i < events_read; i++) {
82+
if (NULL != ompi_mtl_ofi.progress_entries[i].op_context) {
83+
ofi_req = TO_OFI_REQ(ompi_mtl_ofi.progress_entries[i].op_context);
84+
assert(ofi_req);
85+
ret = ofi_req->event_callback(&ompi_mtl_ofi.progress_entries[i], ofi_req);
86+
if (OMPI_SUCCESS != ret) {
87+
opal_output(0, "%s:%d: Error returned by request event callback: %zd.\n"
88+
"*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
89+
__FILE__, __LINE__, ret);
90+
fflush(stderr);
91+
exit(1);
92+
}
9093
}
9194
}
9295
} else if (OPAL_UNLIKELY(ret == -FI_EAVAIL)) {

ompi/mca/mtl/ofi/mtl_ofi_component.c

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ ompi_mtl_ofi_component_register(void)
9898
{
9999
int ret;
100100
mca_base_var_enum_t *new_enum = NULL;
101+
char *desc;
101102

102103
param_priority = 25; /* for now give a lower priority than the psm mtl */
103104
mca_base_component_var_register(&mca_mtl_ofi_component.super.mtl_version,
@@ -125,6 +126,18 @@ ompi_mtl_ofi_component_register(void)
125126
MCA_BASE_VAR_SCOPE_READONLY,
126127
&prov_exclude);
127128

129+
ompi_mtl_ofi.ofi_progress_event_count = 100;
130+
asprintf(&desc, "Max number of events to read each call to OFI progress (default: %d events will be read per OFI progress call)", ompi_mtl_ofi.ofi_progress_event_count);
131+
mca_base_component_var_register(&mca_mtl_ofi_component.super.mtl_version,
132+
"progress_event_cnt",
133+
desc,
134+
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
135+
OPAL_INFO_LVL_6,
136+
MCA_BASE_VAR_SCOPE_READONLY,
137+
&ompi_mtl_ofi.ofi_progress_event_count);
138+
139+
free(desc);
140+
128141
ret = mca_base_var_enum_create ("control_prog_type", control_prog_type, &new_enum);
129142
if (OPAL_SUCCESS != ret) {
130143
return ret;
@@ -465,6 +478,19 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
465478
* - dynamic memory-spanning memory region
466479
*/
467480
cq_attr.format = FI_CQ_FORMAT_TAGGED;
481+
482+
/**
483+
* If a user has set an ofi_progress_event_count > the default, then
484+
* the CQ size hint is set to the user's desired value such that
485+
* the CQ created will have enough slots to store up to
486+
* ofi_progress_event_count events. If a user has not set the
487+
* ofi_progress_event_count, then the provider is trusted to set a
488+
* default high CQ size and the CQ size hint is left unspecified.
489+
*/
490+
if (ompi_mtl_ofi.ofi_progress_event_count > 100) {
491+
cq_attr.size = ompi_mtl_ofi.ofi_progress_event_count;
492+
}
493+
468494
ret = fi_cq_open(ompi_mtl_ofi.domain, &cq_attr, &ompi_mtl_ofi.cq, NULL);
469495
if (ret) {
470496
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
@@ -473,6 +499,17 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
473499
goto error;
474500
}
475501

502+
/**
503+
* Allocate memory for storing the CQ events read in OFI progress.
504+
*/
505+
ompi_mtl_ofi.progress_entries = calloc(ompi_mtl_ofi.ofi_progress_event_count, sizeof(struct fi_cq_tagged_entry));
506+
if (OPAL_UNLIKELY(!ompi_mtl_ofi.progress_entries)) {
507+
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
508+
"%s:%d: alloc of CQ event storage failed: %s\n",
509+
__FILE__, __LINE__, strerror(errno));
510+
goto error;
511+
}
512+
476513
/**
477514
* The remote fi_addr will be stored in the ofi_endpoint struct.
478515
*/
@@ -595,6 +632,10 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
595632
if (ompi_mtl_ofi.fabric) {
596633
(void) fi_close((fid_t)ompi_mtl_ofi.fabric);
597634
}
635+
if (ompi_mtl_ofi.progress_entries) {
636+
free(ompi_mtl_ofi.progress_entries);
637+
}
638+
598639
return NULL;
599640
}
600641

@@ -626,6 +667,8 @@ ompi_mtl_ofi_finalize(struct mca_mtl_base_module_t *mtl)
626667
goto finalize_err;
627668
}
628669

670+
free(ompi_mtl_ofi.progress_entries);
671+
629672
return OMPI_SUCCESS;
630673

631674
finalize_err:

ompi/mca/mtl/ofi/mtl_ofi_types.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ typedef struct mca_mtl_ofi_module_t {
4949
/** Maximum inject size */
5050
size_t max_inject_size;
5151

52+
/** Maximum number of CQ events to read in OFI Progress */
53+
int ofi_progress_event_count;
54+
55+
/** CQ event storage */
56+
struct fi_cq_tagged_entry *progress_entries;
57+
5258
} mca_mtl_ofi_module_t;
5359

5460
extern mca_mtl_ofi_module_t ompi_mtl_ofi;

0 commit comments

Comments
 (0)