Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 70 additions & 19 deletions source/adios2/toolkit/sst/dp/mpi_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include <mpi.h>

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand All @@ -54,15 +55,15 @@ typedef struct _MpiWriterContactInfo
{
char ContactString[MPI_DP_CONTACT_STRING_LEN];
void *StreamWPR;
int PID;
long taskID;
} * MpiWriterContactInfo;

/* Base Stream class, used implicitly */
typedef struct _MpiStream
{
void *CP_Stream;
int Rank;
int PID;
long taskID;
} MpiStream;

/* Link Stream class, used implicitly */
Expand Down Expand Up @@ -231,7 +232,8 @@ static FMField MpiWriterContactList[] = {
sizeof(char), FMOffset(MpiWriterContactInfo, ContactString)},
{"writer_ID", "integer", sizeof(void *),
FMOffset(MpiWriterContactInfo, StreamWPR)},
{"PID", "integer", sizeof(int), FMOffset(MpiWriterContactInfo, PID)},
{"taskID", "integer", sizeof(long),
FMOffset(MpiWriterContactInfo, taskID)},
{NULL, NULL, 0, 0}};

static FMStructDescRec MpiWriterContactStructs[] = {
Expand All @@ -241,6 +243,16 @@ static FMStructDescRec MpiWriterContactStructs[] = {

/*****Internal functions*****************************************************/

/**
* Return an unique process ID (Task ID) for the current process. We do this by
* combining the PID of the process and the hostid (as return the same output
* as `hostid` or the content of /etc/machine-id in modern UNIX-like systems).
*/
static uint64_t GetUniqueTaskId()
{
return ((uint32_t)getpid() * (1ll << 32ll)) | (uint32_t)gethostid();
}

static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v,
void *client_Data, attr_list attrs);

Expand All @@ -256,9 +268,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v,
* the reader side. It should do whatever is necessary to initialize a new
* reader-side data plane. A pointer to per-reader-rank contact information
* should be placed in *ReaderContactInfoPtr. The structure of that
* information should be described by DPInterface.ReaderContactFormats. (This
* is an FFS format description. See
* https://www.cc.gatech.edu/systems/projects/FFS/.)
* information should be described by DPInterface.ReaderContactFormats.
*/
static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream,
void **ReaderContactInfoPtr,
Expand All @@ -271,7 +281,7 @@ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream,
CMFormat F;

Stream->Stream.CP_Stream = CP_Stream;
Stream->Stream.PID = getpid();
Stream->Stream.taskID = GetUniqueTaskId();
Stream->Link.Stats = Stats;

SMPI_Comm_rank(comm, &Stream->Stream.Rank);
Expand Down Expand Up @@ -322,7 +332,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream,
SMPI_Comm_rank(comm, &Stream->Stream.Rank);

Stream->Stream.CP_Stream = CP_Stream;
Stream->Stream.PID = getpid();
Stream->Stream.taskID = GetUniqueTaskId();
STAILQ_INIT(&Stream->TimeSteps);
TAILQ_INIT(&Stream->Readers);

Expand All @@ -347,8 +357,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream,
* on the connecting peer in InitReader) and should create its own
* per-writer-rank contact information and place it in *writerContactInfoPtr.
* The structure of that information should be described by
* DPInterface.WriterContactFormats. (This is an FFS format description. See
* https://www.cc.gatech.edu/systems/projects/FFS/.)
* DPInterface.WriterContactFormats.
*/
static DP_WSR_Stream
MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v,
Expand Down Expand Up @@ -392,7 +401,7 @@ MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v,
"Writer Rank %d, test contact", Rank);

StreamWPR->MyContactInfo.StreamWPR = StreamWPR;
StreamWPR->MyContactInfo.PID = StreamWR->Stream.PID;
StreamWPR->MyContactInfo.taskID = StreamWR->Stream.taskID;
*WriterContactInfoPtr = &StreamWPR->MyContactInfo;

return StreamWPR;
Expand Down Expand Up @@ -503,9 +512,9 @@ static void *MpiReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v,
ret->cm = cm;
ret->CPStream = Stream->Stream.CP_Stream;
ret->DestinationRank = Rank;
ret->CommType = (TargetContact->PID == Stream->Stream.PID) ? MPI_DP_LOCAL
: MPI_DP_REMOTE;

ret->CommType =
(TargetContact->taskID == Stream->Stream.taskID) ? MPI_DP_LOCAL
: MPI_DP_REMOTE;
if (ret->CommType == MPI_DP_REMOTE)
{
CMCondition_set_client_data(cm, ReadRequestMsg.NotifyCondition, ret);
Expand Down Expand Up @@ -576,7 +585,7 @@ static int MpiWaitForCompletion(CP_Services Svcs, void *Handle_v)
else
{
Svcs->verbose(
Handle->CPStream, DPTraceVerbose,
Handle->CPStream, DPCriticalVerbose,
"Remote memory read to rank %d with condition %d has FAILED"
"because of "
"writer failure\n",
Expand Down Expand Up @@ -615,7 +624,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v,
if (!RequestedData)
{
PERFSTUBS_TIMER_STOP_FUNC(timer);
Svcs->verbose(StreamWR->Stream.CP_Stream, DPPerStepVerbose,
Svcs->verbose(StreamWR->Stream.CP_Stream, DPCriticalVerbose,
"Failed to read TimeStep %ld, not found\n",
ReadRequestMsg->TimeStep);
return;
Expand Down Expand Up @@ -850,11 +859,42 @@ static void MpiNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v,
FailedPeerRank);
}

/** MpiDisconnectWriterPerReader.
*
* This is called whenever a reader disconnect from a writer. This function
* simply disconnect the mpi communicator, it does not frees any data
* structure. We must do it in this way since:
*
* - There is the possibility of the failed peer to re-enter in the network.
* - We must disconnect the MPI port for that particular mpi reader task since
* otherwise it the reader task might hung in mpi_finalize, in the case the
* the failure leads to a application graceful exit.
*/
static void MpiDisconnectWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v)
{
MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v;
MpiStreamWR StreamWR = StreamWPR->StreamWR;

const int CohortSize = StreamWPR->Link.CohortSize;

Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
"MpiDisconnectWriterPerReader invoked [rank:%d;cohortSize:%d]\n", CohortSize,
StreamWR->Stream.Rank);

for (int i = 0; i < CohortSize; i++)
{
if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
{
MPI_Comm_disconnect(&StreamWPR->CohortMpiComms[i]);
}
}
}

/**
* MpiDestroyWriterPerReader.
*
* This is called whenever a reader disconnect from a writer. This function
* also removes the StreamWPR from its own StreamWR.
* This is called by the MpiDestroyWriter function. This function will free any resource
* allocated to the particulare WriterPerReader instance (StreamWPR).
*/
static void MpiDestroyWriterPerReader(CP_Services Svcs,
DP_WSR_Stream WSR_Stream_v)
Expand All @@ -864,6 +904,10 @@ static void MpiDestroyWriterPerReader(CP_Services Svcs,

const int CohortSize = StreamWPR->Link.CohortSize;

Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
"MpiDestroyWriterPerReader invoked [rank:%d;cohortSize:%d]", CohortSize,
StreamWR->Stream.Rank);

for (int i = 0; i < CohortSize; i++)
{
if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
Expand All @@ -889,6 +933,9 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
{
MpiStreamWR StreamWR = (MpiStreamWR)WS_Stream_v;

Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
"MpiDestroyWriter invoked [rank:%d]\n", StreamWR->Stream.Rank);

pthread_mutex_lock(&StreamWR->MutexReaders);
while (!TAILQ_EMPTY(&StreamWR->Readers))
{
Expand Down Expand Up @@ -918,6 +965,10 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
static void MpiDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v)
{
MpiStreamRD StreamRS = (MpiStreamRD)RS_Stream_v;

Svcs->verbose(StreamRS->Stream.CP_Stream, DPTraceVerbose,
"MpiDestroyReader invoked [rank:%d]\n", StreamRS->Stream.Rank);

const int CohortSize = StreamRS->Link.CohortSize;

for (int i = 0; i < CohortSize; i++)
Expand Down Expand Up @@ -948,7 +999,7 @@ extern CP_DP_Interface LoadMpiDP()
.getPriority = MpiGetPriority,
.destroyReader = MpiDestroyReader,
.destroyWriter = MpiDestroyWriter,
.destroyWriterPerReader = MpiDestroyWriterPerReader,
.destroyWriterPerReader = MpiDisconnectWriterPerReader,
.notifyConnFailure = MpiNotifyConnFailure,
};

Expand Down