Skip to content

Commit 4b38b6b

Browse files
committed
Fix multiple issues with the collective requests.
This patch addresses most (if not all) @derbeyn concerns expressed on #1015. I added checks for the requests allocation in all functions, ompi_coll_base_free_reqs is called with the right number of requests, I removed the unnecessary basic_module_comm_t and use the base_module_comm_t instead, I remove all uses of the COLL_BASE_BCAST_USE_BLOCKING define, and other minor fixes.
1 parent dec23f3 commit 4b38b6b

21 files changed

+168
-169
lines changed

ompi/mca/coll/base/coll_base_alltoall.c

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
44
* University Research and Technology
55
* Corporation. All rights reserved.
6-
* Copyright (c) 2004-2015 The University of Tennessee and The University
6+
* Copyright (c) 2004-2016 The University of Tennessee and The University
77
* of Tennessee Research Foundation. All rights
88
* reserved.
99
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -44,7 +44,7 @@ mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
4444
mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
4545
int i, j, size, rank, err = MPI_SUCCESS, line;
4646
OPAL_PTRDIFF_TYPE ext, gap;
47-
MPI_Request *preq;
47+
ompi_request_t **preq, **reqs;
4848
char *tmp_buffer;
4949
size_t max_size;
5050

@@ -62,19 +62,20 @@ mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
6262
ompi_datatype_type_extent (rdtype, &ext);
6363
max_size = opal_datatype_span(&rdtype->super, rcount, &gap);
6464

65+
/* Initiate all send/recv to/from others. */
66+
reqs = coll_base_comm_get_reqs(base_module->base_data, 2);
67+
if( NULL == reqs ) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }
68+
6569
/* Allocate a temporary buffer */
6670
tmp_buffer = calloc (max_size, 1);
67-
if (NULL == tmp_buffer) {
68-
return OMPI_ERR_OUT_OF_RESOURCE;
69-
}
71+
if (NULL == tmp_buffer) { return OMPI_ERR_OUT_OF_RESOURCE; }
7072
tmp_buffer -= gap;
7173
max_size = ext * rcount;
7274

7375
/* in-place alltoall slow algorithm (but works) */
7476
for (i = 0 ; i < size ; ++i) {
7577
for (j = i+1 ; j < size ; ++j) {
76-
/* Initiate all send/recv to/from others. */
77-
preq = coll_base_comm_get_reqs(base_module->base_data, size * 2);
78+
preq = reqs;
7879

7980
if (i == rank) {
8081
/* Copy the data into the temporary buffer */
@@ -111,7 +112,7 @@ mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
111112
}
112113

113114
/* Wait for the requests to complete */
114-
err = ompi_request_wait_all (2, base_module->base_data->mcct_reqs, MPI_STATUSES_IGNORE);
115+
err = ompi_request_wait_all (2, reqs, MPI_STATUSES_IGNORE);
115116
if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
116117
}
117118
}
@@ -125,7 +126,7 @@ mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
125126
"%s:%4d\tError occurred %d, rank %2d", __FILE__, line, err,
126127
rank));
127128
(void)line; // silence compiler warning
128-
ompi_coll_base_free_reqs(base_module->base_data->mcct_reqs, 2);
129+
ompi_coll_base_free_reqs(reqs, 2);
129130
}
130131

131132
/* All done */
@@ -400,20 +401,20 @@ int ompi_coll_base_alltoall_intra_linear_sync(const void *sbuf, int scount,
400401

401402
/* Post first batch or ireceive and isend requests */
402403
for (nreqs = 0, nrreqs = 0, ri = (rank + 1) % size; nreqs < total_reqs;
403-
ri = (ri + 1) % size, ++nreqs, ++nrreqs) {
404-
error =
405-
MCA_PML_CALL(irecv
406-
(prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
407-
MCA_COLL_BASE_TAG_ALLTOALL, comm, &reqs[nreqs]));
404+
ri = (ri + 1) % size, ++nrreqs) {
405+
nreqs++;
406+
error = MCA_PML_CALL(irecv
407+
(prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
408+
MCA_COLL_BASE_TAG_ALLTOALL, comm, &reqs[nreqs]));
408409
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
409410
}
410-
for ( nsreqs = 0, si = (rank + size - 1) % size; nreqs < 2 * total_reqs;
411-
si = (si + size - 1) % size, ++nreqs, ++nsreqs) {
412-
error =
413-
MCA_PML_CALL(isend
414-
(psnd + (ptrdiff_t)si * sext, scount, sdtype, si,
415-
MCA_COLL_BASE_TAG_ALLTOALL,
416-
MCA_PML_BASE_SEND_STANDARD, comm, &reqs[nreqs]));
411+
for (nsreqs = 0, si = (rank + size - 1) % size; nreqs < 2 * total_reqs;
412+
si = (si + size - 1) % size, ++nsreqs) {
413+
nreqs++;
414+
error = MCA_PML_CALL(isend
415+
(psnd + (ptrdiff_t)si * sext, scount, sdtype, si,
416+
MCA_COLL_BASE_TAG_ALLTOALL,
417+
MCA_PML_BASE_SEND_STANDARD, comm, &reqs[nreqs]));
417418
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
418419
}
419420

@@ -441,11 +442,10 @@ int ompi_coll_base_alltoall_intra_linear_sync(const void *sbuf, int scount,
441442
ncreqs++;
442443
if (completed < total_reqs) {
443444
if (nrreqs < (size - 1)) {
444-
error =
445-
MCA_PML_CALL(irecv
446-
(prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
447-
MCA_COLL_BASE_TAG_ALLTOALL, comm,
448-
&reqs[completed]));
445+
error = MCA_PML_CALL(irecv
446+
(prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
447+
MCA_COLL_BASE_TAG_ALLTOALL, comm,
448+
&reqs[completed]));
449449
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
450450
++nrreqs;
451451
ri = (ri + 1) % size;
@@ -457,6 +457,7 @@ int ompi_coll_base_alltoall_intra_linear_sync(const void *sbuf, int scount,
457457
MCA_COLL_BASE_TAG_ALLTOALL,
458458
MCA_PML_BASE_SEND_STANDARD, comm,
459459
&reqs[completed]));
460+
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
460461
++nsreqs;
461462
si = (si + size - 1) % size;
462463
}
@@ -472,7 +473,7 @@ int ompi_coll_base_alltoall_intra_linear_sync(const void *sbuf, int scount,
472473
"%s:%4d\tError occurred %d, rank %2d", __FILE__, line, error,
473474
rank));
474475
(void)line; // silence compiler warning
475-
ompi_coll_base_free_reqs(reqs, 2 * total_reqs);
476+
ompi_coll_base_free_reqs(reqs, nreqs);
476477
return error;
477478
}
478479

@@ -610,14 +611,16 @@ int ompi_coll_base_alltoall_intra_basic_linear(const void *sbuf, int scount,
610611
/* Initiate all send/recv to/from others. */
611612

612613
req = rreq = coll_base_comm_get_reqs(data, (size - 1) * 2);
614+
if (NULL == req) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto err_hndl; }
613615

614616
prcv = (char *) rbuf;
615617
psnd = (char *) sbuf;
616618

617619
/* Post all receives first -- a simple optimization */
618620

619621
for (nreqs = 0, i = (rank + 1) % size; i != rank;
620-
i = (i + 1) % size, ++rreq, ++nreqs) {
622+
i = (i + 1) % size, ++rreq) {
623+
nreqs++;
621624
err = MCA_PML_CALL(irecv_init
622625
(prcv + (ptrdiff_t)i * rcvinc, rcount, rdtype, i,
623626
MCA_COLL_BASE_TAG_ALLTOALL, comm, rreq));
@@ -630,7 +633,8 @@ int ompi_coll_base_alltoall_intra_basic_linear(const void *sbuf, int scount,
630633
*/
631634
sreq = rreq;
632635
for (i = (rank + size - 1) % size; i != rank;
633-
i = (i + size - 1) % size, ++sreq, ++nreqs) {
636+
i = (i + size - 1) % size, ++sreq) {
637+
nreqs++;
634638
err = MCA_PML_CALL(isend_init
635639
(psnd + (ptrdiff_t)i * sndinc, scount, sdtype, i,
636640
MCA_COLL_BASE_TAG_ALLTOALL,

ompi/mca/coll/base/coll_base_alltoallv.c

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
44
* University Research and Technology
55
* Corporation. All rights reserved.
6-
* Copyright (c) 2004-2015 The University of Tennessee and The University
6+
* Copyright (c) 2004-2016 The University of Tennessee and The University
77
* of Tennessee Research Foundation. All rights
88
* reserved.
99
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -44,7 +44,7 @@ mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts
4444
{
4545
mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
4646
int i, j, size, rank, err=MPI_SUCCESS;
47-
MPI_Request *preq;
47+
ompi_request_t **preq, **reqs;
4848
char *tmp_buffer;
4949
size_t max_size, rdtype_size;
5050
OPAL_PTRDIFF_TYPE ext, gap = 0;
@@ -75,11 +75,14 @@ mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts
7575
}
7676
tmp_buffer += gap;
7777

78+
/* Initiate all send/recv to/from others. */
79+
reqs = preq = coll_base_comm_get_reqs(base_module->base_data, 2);
80+
if( NULL == reqs ) { err = OMPI_ERR_OUT_OF_RESOURCE; goto error_hndl; }
81+
7882
/* in-place alltoallv slow algorithm (but works) */
7983
for (i = 0 ; i < size ; ++i) {
8084
for (j = i+1 ; j < size ; ++j) {
81-
/* Initiate all send/recv to/from others. */
82-
preq = coll_base_comm_get_reqs(base_module->base_data, 2);
85+
preq = reqs;
8386

8487
if (i == rank && rcounts[j]) {
8588
/* Copy the data into the temporary buffer */
@@ -116,7 +119,7 @@ mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts
116119
}
117120

118121
/* Wait for the requests to complete */
119-
err = ompi_request_wait_all (2, base_module->base_data->mcct_reqs, MPI_STATUSES_IGNORE);
122+
err = ompi_request_wait_all (2, reqs, MPI_STATUSES_IGNORE);
120123
if (MPI_SUCCESS != err) { goto error_hndl; }
121124
}
122125
}
@@ -125,7 +128,7 @@ mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts
125128
/* Free the temporary buffer */
126129
free (tmp_buffer);
127130
if( MPI_SUCCESS != err ) {
128-
ompi_coll_base_free_reqs(base_module->base_data->mcct_reqs, 2 );
131+
ompi_coll_base_free_reqs(reqs, 2 );
129132
}
130133

131134
/* All done */
@@ -207,7 +210,7 @@ ompi_coll_base_alltoallv_intra_basic_linear(const void *sbuf, const int *scounts
207210
int i, size, rank, err, nreqs;
208211
char *psnd, *prcv;
209212
ptrdiff_t sext, rext;
210-
MPI_Request *preq;
213+
ompi_request_t **preq, **reqs;
211214
mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
212215
mca_coll_base_comm_t *data = base_module->base_data;
213216

@@ -243,22 +246,21 @@ ompi_coll_base_alltoallv_intra_basic_linear(const void *sbuf, const int *scounts
243246

244247
/* Now, initiate all send/recv to/from others. */
245248
nreqs = 0;
246-
preq = coll_base_comm_get_reqs(data, 2 * size);
249+
reqs = preq = coll_base_comm_get_reqs(data, 2 * size);
250+
if( NULL == reqs ) { err = OMPI_ERR_OUT_OF_RESOURCE; goto err_hndl; }
247251

248252
/* Post all receives first */
249253
for (i = 0; i < size; ++i) {
250254
if (i == rank || 0 == rcounts[i]) {
251255
continue;
252256
}
253257

258+
++nreqs;
254259
prcv = ((char *) rbuf) + (ptrdiff_t)rdisps[i] * rext;
255260
err = MCA_PML_CALL(irecv_init(prcv, rcounts[i], rdtype,
256261
i, MCA_COLL_BASE_TAG_ALLTOALLV, comm,
257262
preq++));
258-
++nreqs;
259-
if (MPI_SUCCESS != err) {
260-
goto err_hndl;
261-
}
263+
if (MPI_SUCCESS != err) { goto err_hndl; }
262264
}
263265

264266
/* Now post all sends */
@@ -267,31 +269,30 @@ ompi_coll_base_alltoallv_intra_basic_linear(const void *sbuf, const int *scounts
267269
continue;
268270
}
269271

272+
++nreqs;
270273
psnd = ((char *) sbuf) + (ptrdiff_t)sdisps[i] * sext;
271274
err = MCA_PML_CALL(isend_init(psnd, scounts[i], sdtype,
272275
i, MCA_COLL_BASE_TAG_ALLTOALLV,
273276
MCA_PML_BASE_SEND_STANDARD, comm,
274277
preq++));
275-
++nreqs;
276-
if (MPI_SUCCESS != err) {
277-
goto err_hndl;
278-
}
278+
if (MPI_SUCCESS != err) { goto err_hndl; }
279279
}
280280

281281
/* Start your engines. This will never return an error. */
282-
MCA_PML_CALL(start(nreqs, data->mcct_reqs));
282+
MCA_PML_CALL(start(nreqs, reqs));
283283

284284
/* Wait for them all. If there's an error, note that we don't care
285285
* what the error was -- just that there *was* an error. The PML
286286
* will finish all requests, even if one or more of them fail.
287287
* i.e., by the end of this call, all the requests are free-able.
288288
* So free them anyway -- even if there was an error, and return the
289289
* error after we free everything. */
290-
err = ompi_request_wait_all(nreqs, data->mcct_reqs,
291-
MPI_STATUSES_IGNORE);
290+
err = ompi_request_wait_all(nreqs, reqs, MPI_STATUSES_IGNORE);
291+
if( MPI_SUCCESS == err )
292+
return MPI_SUCCESS;
292293
err_hndl:
293294
/* Free the requests in all cases as they are persistent */
294-
ompi_coll_base_free_reqs(data->mcct_reqs, nreqs);
295+
ompi_coll_base_free_reqs(reqs, nreqs);
295296

296297
return err;
297298
}

ompi/mca/coll/base/coll_base_barrier.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,13 +352,16 @@ int ompi_coll_base_barrier_intra_basic_linear(struct ompi_communicator_t *comm,
352352

353353
else {
354354
requests = coll_base_comm_get_reqs(module->base_data, size);
355+
if( NULL == requests ) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto err_hndl; }
356+
355357
for (i = 1; i < size; ++i) {
356358
err = MCA_PML_CALL(irecv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE,
357359
MCA_COLL_BASE_TAG_BARRIER, comm,
358360
&(requests[i])));
359361
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
360362
}
361-
ompi_request_wait_all( size-1, requests+1, MPI_STATUSES_IGNORE );
363+
err = ompi_request_wait_all( size-1, requests+1, MPI_STATUSES_IGNORE );
364+
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
362365
requests = NULL; /* we're done the requests array is clean */
363366

364367
for (i = 1; i < size; ++i) {
@@ -376,7 +379,7 @@ int ompi_coll_base_barrier_intra_basic_linear(struct ompi_communicator_t *comm,
376379
__FILE__, line, err, rank) );
377380
(void)line; // silence compiler warning
378381
if( NULL != requests )
379-
ompi_coll_base_free_reqs(requests, size-1);
382+
ompi_coll_base_free_reqs(requests, size);
380383
return err;
381384
}
382385
/* copied function (with appropriate renaming) ends here */

0 commit comments

Comments
 (0)