Commit c77df61

Merge pull request #3588 from vicentebolea/fix-mpi-dp
Fix MPI Data plane cohort handling
2 parents 2054006 + 4e2f6eb commit c77df61

File tree: 1 file changed, +67 -17 lines

source/adios2/toolkit/sst/dp/mpi_dp.c

Lines changed: 67 additions & 17 deletions
@@ -30,6 +30,7 @@
 #include <mpi.h>
+#include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -54,15 +55,15 @@ typedef struct _MpiWriterContactInfo
 {
     char ContactString[MPI_DP_CONTACT_STRING_LEN];
     void *StreamWPR;
-    int PID;
+    long taskID;
 } *MpiWriterContactInfo;
 
 /* Base Stream class, used implicitly */
 typedef struct _MpiStream
 {
     void *CP_Stream;
     int Rank;
-    int PID;
+    long taskID;
 } MpiStream;
 
 /* Link Stream class, used implicitly */
@@ -218,7 +219,7 @@ static FMField MpiWriterContactList[] = {
     {"ContactString", "char[" MACRO_TO_STR(MPI_DP_CONTACT_STRING_LEN) "]", sizeof(char),
      FMOffset(MpiWriterContactInfo, ContactString)},
     {"writer_ID", "integer", sizeof(void *), FMOffset(MpiWriterContactInfo, StreamWPR)},
-    {"PID", "integer", sizeof(int), FMOffset(MpiWriterContactInfo, PID)},
+    {"taskID", "integer", sizeof(long), FMOffset(MpiWriterContactInfo, taskID)},
     {NULL, NULL, 0, 0}};
 
 static FMStructDescRec MpiWriterContactStructs[] = {
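Note: the contact-info change above rides on FFS marshaling, where each FMField entry names a struct member, its wire-format type string, its in-memory size, and its offset. A minimal sketch of that pattern (hypothetical Example struct; standard offsetof stands in for FFS's FMOffset macro, and the ffs.h header name is assumed from the SST build):

    #include <stddef.h> /* offsetof */
    #include <ffs.h>    /* FMField; header name assumed */

    typedef struct _Example
    {
        long taskID;
    } Example;

    /* Each entry: field name, FFS type string, in-memory size, struct offset.
     * The commit's "taskID" entry follows exactly this shape. */
    static FMField ExampleFieldList[] = {
        {"taskID", "integer", sizeof(long), offsetof(Example, taskID)},
        {NULL, NULL, 0, 0} /* terminator */
    };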
@@ -227,6 +228,16 @@ static FMStructDescRec MpiWriterContactStructs[] = {
 
 /*****Internal functions*****************************************************/
 
+/**
+ * Return a unique process ID (task ID) for the current process, built by
+ * combining the PID with the hostid (the same value returned by `hostid`
+ * or stored in /etc/machine-id on modern UNIX-like systems).
+ */
+static uint64_t GetUniqueTaskId()
+{
+    return ((uint32_t)getpid() * (1ll << 32ll)) | (uint32_t)gethostid();
+}
+
 static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, void *client_Data,
                                 attr_list attrs);

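Note: a standalone sketch (not part of the commit) of the task-ID packing, using an explicit 64-bit shift that is equivalent to the patch's multiply by 2^32:

    #include <stdint.h>
    #include <stdio.h>
    #include <unistd.h> /* getpid(), gethostid() */

    /* PID in the high 32 bits, hostid in the low 32 bits, so the ID is
     * unique per process per host. */
    static uint64_t GetUniqueTaskId(void)
    {
        return ((uint64_t)(uint32_t)getpid() << 32) | (uint32_t)gethostid();
    }

    int main(void)
    {
        printf("taskID: 0x%016llx\n", (unsigned long long)GetUniqueTaskId());
        return 0;
    }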
@@ -242,9 +253,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, v
  * the reader side. It should do whatever is necessary to initialize a new
  * reader-side data plane. A pointer to per-reader-rank contact information
  * should be placed in *ReaderContactInfoPtr. The structure of that
- * information should be described by DPInterface.ReaderContactFormats. (This
- * is an FFS format description. See
- * https://www.cc.gatech.edu/systems/projects/FFS/.)
+ * information should be described by DPInterface.ReaderContactFormats.
  */
 static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, void **ReaderContactInfoPtr,
                                   struct _SstParams *Params, attr_list WriterContact,
@@ -256,7 +265,7 @@ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, void **Read
     CMFormat F;
 
     Stream->Stream.CP_Stream = CP_Stream;
-    Stream->Stream.PID = getpid();
+    Stream->Stream.taskID = GetUniqueTaskId();
     Stream->Link.Stats = Stats;
 
     SMPI_Comm_rank(comm, &Stream->Stream.Rank);
@@ -305,7 +314,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream, struct _Sst
     SMPI_Comm_rank(comm, &Stream->Stream.Rank);
 
     Stream->Stream.CP_Stream = CP_Stream;
-    Stream->Stream.PID = getpid();
+    Stream->Stream.taskID = GetUniqueTaskId();
     STAILQ_INIT(&Stream->TimeSteps);
     TAILQ_INIT(&Stream->Readers);
 
@@ -329,8 +338,7 @@
  * on the connecting peer in InitReader) and should create its own
  * per-writer-rank contact information and place it in *writerContactInfoPtr.
  * The structure of that information should be described by
- * DPInterface.WriterContactFormats. (This is an FFS format description. See
- * https://www.cc.gatech.edu/systems/projects/FFS/.)
+ * DPInterface.WriterContactFormats.
  */
 static DP_WSR_Stream MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v,
                                             int readerCohortSize, CP_PeerCohort PeerCohort,
@@ -372,7 +380,7 @@ static DP_WSR_Stream MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_St
                   "Writer Rank %d, test contact", Rank);
 
     StreamWPR->MyContactInfo.StreamWPR = StreamWPR;
-    StreamWPR->MyContactInfo.PID = StreamWR->Stream.PID;
+    StreamWPR->MyContactInfo.taskID = StreamWR->Stream.taskID;
     *WriterContactInfoPtr = &StreamWPR->MyContactInfo;
 
     return StreamWPR;
@@ -474,7 +482,7 @@ static void *MpiReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, int Ra
     ret->cm = cm;
     ret->CPStream = Stream->Stream.CP_Stream;
     ret->DestinationRank = Rank;
-    ret->CommType = (TargetContact->PID == Stream->Stream.PID) ? MPI_DP_LOCAL : MPI_DP_REMOTE;
+    ret->CommType = (TargetContact->taskID == Stream->Stream.taskID) ? MPI_DP_LOCAL : MPI_DP_REMOTE;
 
     if (ret->CommType == MPI_DP_REMOTE)
     {
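Note: the local/remote decision now keys on the host-qualified task ID rather than the bare PID, so two ranks that happen to share a PID on different nodes are no longer misclassified as local. A standalone sketch (hypothetical values, not from the commit) of the failure mode the old comparison allowed:

    #include <stdint.h>
    #include <stdio.h>

    int main(void)
    {
        uint32_t pidA = 4242, pidB = 4242;               /* same PID...        */
        uint32_t hostA = 0x11AA22BB, hostB = 0x33CC44DD; /* ...different hosts */

        /* Old check: PIDs match, so the peer is wrongly treated as local. */
        printf("PID check says local: %d\n", pidA == pidB);

        /* New check: host-qualified task IDs differ, peer is correctly remote. */
        uint64_t taskA = ((uint64_t)pidA << 32) | hostA;
        uint64_t taskB = ((uint64_t)pidB << 32) | hostB;
        printf("taskID check says local: %d\n", taskA == taskB);
        return 0;
    }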
@@ -541,7 +549,7 @@ static int MpiWaitForCompletion(CP_Services Svcs, void *Handle_v)
     }
     else
     {
-        Svcs->verbose(Handle->CPStream, DPTraceVerbose,
+        Svcs->verbose(Handle->CPStream, DPCriticalVerbose,
                       "Remote memory read to rank %d with condition %d has FAILED"
                       "because of "
                       "writer failure\n",
@@ -580,7 +588,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, v
     if (!RequestedData)
     {
         PERFSTUBS_TIMER_STOP_FUNC(timer);
-        Svcs->verbose(StreamWR->Stream.CP_Stream, DPPerStepVerbose,
+        Svcs->verbose(StreamWR->Stream.CP_Stream, DPCriticalVerbose,
                       "Failed to read TimeStep %ld, not found\n", ReadRequestMsg->TimeStep);
         return;
     }
@@ -799,11 +807,42 @@ static void MpiNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, int Fa
                   FailedPeerRank);
 }
 
+/** MpiDisconnectWriterPerReader.
+ *
+ * This is called whenever a reader disconnects from a writer. This function
+ * only disconnects the MPI communicators; it does not free any data
+ * structures. We must do it this way since:
+ *
+ * - The failed peer may re-enter the network later.
+ * - We must disconnect the MPI port for that particular MPI reader task,
+ *   since otherwise the reader task might hang in MPI_Finalize when the
+ *   failure leads to a graceful application exit.
+ */
+static void MpiDisconnectWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v)
+{
+    MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v;
+    MpiStreamWR StreamWR = StreamWPR->StreamWR;
+
+    const int CohortSize = StreamWPR->Link.CohortSize;
+
+    Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
+                  "MpiDisconnectWriterPerReader invoked [rank:%d;cohortSize:%d]\n",
+                  StreamWR->Stream.Rank, CohortSize);
+
+    for (int i = 0; i < CohortSize; i++)
+    {
+        if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
+        {
+            MPI_Comm_disconnect(&StreamWPR->CohortMpiComms[i]);
+        }
+    }
+}
+
 /**
  * MpiDestroyWriterPerReader.
  *
- * This is called whenever a reader disconnect from a writer. This function
- * also removes the StreamWPR from its own StreamWR.
+ * This is called by the MpiDestroyWriter function. It frees any resources
+ * allocated to the particular WriterPerReader instance (StreamWPR).
  */
 static void MpiDestroyWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v)
 {
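Note: the fix splits disconnecting from destroying. Disconnecting a connected communicator is what lets the departing peer reach MPI_Finalize cleanly; freeing the surrounding state can happen later on the writer-wide destroy path. A sketch (not from the commit) of the disconnect loop in isolation:

    #include <mpi.h>

    /* Hypothetical helper: sever the link to every rank of a reader cohort. */
    void DisconnectCohort(MPI_Comm *Comms, int CohortSize)
    {
        for (int i = 0; i < CohortSize; i++)
        {
            if (Comms[i] != MPI_COMM_NULL)
            {
                /* Collective over the connected communicator; completes any
                 * pending communication, severs the inter-communicator link,
                 * and sets Comms[i] back to MPI_COMM_NULL. */
                MPI_Comm_disconnect(&Comms[i]);
            }
        }
    }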
@@ -812,6 +851,10 @@ static void MpiDestroyWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream
 
     const int CohortSize = StreamWPR->Link.CohortSize;
 
+    Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
+                  "MpiDestroyWriterPerReader invoked [rank:%d;cohortSize:%d]\n",
+                  StreamWR->Stream.Rank, CohortSize);
+
     for (int i = 0; i < CohortSize; i++)
     {
         if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
@@ -837,6 +880,9 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
 {
     MpiStreamWR StreamWR = (MpiStreamWR)WS_Stream_v;
 
+    Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
+                  "MpiDestroyWriter invoked [rank:%d]\n", StreamWR->Stream.Rank);
+
     pthread_mutex_lock(&StreamWR->MutexReaders);
     while (!TAILQ_EMPTY(&StreamWR->Readers))
     {
@@ -866,6 +912,10 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
 static void MpiDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v)
 {
     MpiStreamRD StreamRS = (MpiStreamRD)RS_Stream_v;
+
+    Svcs->verbose(StreamRS->Stream.CP_Stream, DPTraceVerbose,
+                  "MpiDestroyReader invoked [rank:%d]\n", StreamRS->Stream.Rank);
+
     const int CohortSize = StreamRS->Link.CohortSize;
 
     for (int i = 0; i < CohortSize; i++)
@@ -896,7 +946,7 @@ extern CP_DP_Interface LoadMpiDP()
         .getPriority = MpiGetPriority,
         .destroyReader = MpiDestroyReader,
         .destroyWriter = MpiDestroyWriter,
-        .destroyWriterPerReader = MpiDestroyWriterPerReader,
+        .destroyWriterPerReader = MpiDisconnectWriterPerReader,
         .notifyConnFailure = MpiNotifyConnFailure,
     };

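Note: the last hunk rewires the data-plane interface table so the per-reader teardown hook points at the disconnect-only function, while full resource release stays on the writer-wide destroy path (MpiDestroyWriter calls MpiDestroyWriterPerReader internally). A standalone sketch (hypothetical names, not the SST API) of that designated-initializer wiring:

    #include <stdio.h>

    typedef void (*TeardownFn)(void);

    static void DisconnectOnly(void) { puts("disconnect MPI comms, keep state"); }
    static void DestroyAll(void)     { puts("free per-reader and writer state"); }

    struct Interface
    {
        TeardownFn destroyWriterPerReader; /* called on reader disconnect */
        TeardownFn destroyWriter;          /* called on writer shutdown   */
    };

    int main(void)
    {
        struct Interface dp = {
            .destroyWriterPerReader = DisconnectOnly, /* was DestroyAll before the fix */
            .destroyWriter = DestroyAll,
        };
        dp.destroyWriterPerReader(); /* reader leaves: disconnect only */
        dp.destroyWriter();          /* writer exits: full cleanup     */
        return 0;
    }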