Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 72 additions & 5 deletions source/adios2/toolkit/remote/EVPathRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@
#define strdup(x) _strdup(x)
#endif

#define ThrowUp(x) \
helper::Throw<std::invalid_argument>("Core", "Engine", "ThrowUp", \
"Non-overridden function " + std::string(x) + \
" called in Remote")

namespace adios2
{

Expand Down Expand Up @@ -57,10 +52,27 @@ void OpenSimpleResponseHandler(CManager cm, CMConnection conn, void *vevent, voi
void *obj = CMCondition_get_client_data(cm, open_response_msg->OpenResponseCondition);
static_cast<EVPathRemote *>(obj)->m_ID = open_response_msg->FileHandle;
static_cast<EVPathRemote *>(obj)->m_Size = open_response_msg->FileSize;
std::vector<char> *Tmp = static_cast<EVPathRemote *>(obj)->m_TmpContentVector;
if (Tmp && open_response_msg->FileContents)
{
Tmp->resize(open_response_msg->FileSize);
memcpy(Tmp->data(), open_response_msg->FileContents, open_response_msg->FileSize);
}

CMCondition_signal(cm, open_response_msg->OpenResponseCondition);
return;
};

void CloseResponseHandler(CManager cm, CMConnection conn, void *vevent, void *client_data,
attr_list attrs)
{
EVPathRemoteCommon::CloseFileResponseMsg close_response_msg =
static_cast<EVPathRemoteCommon::CloseFileResponseMsg>(vevent);

CMCondition_signal(cm, close_response_msg->CloseResponseCondition);
return;
};

void ReadResponseHandler(CManager cm, CMConnection conn, void *vevent, void *client_data,
attr_list attrs)
{
Expand Down Expand Up @@ -118,6 +130,8 @@ void EVPathRemote::InitCMData()
(CMHandlerFunc)OpenSimpleResponseHandler, &ev_state);
CMregister_handler(ev_state.ReadResponseFormat, (CMHandlerFunc)ReadResponseHandler,
&ev_state);
CMregister_handler(ev_state.CloseResponseFormat, (CMHandlerFunc)CloseResponseHandler,
&ev_state);
});
}

Expand Down Expand Up @@ -187,17 +201,67 @@ void EVPathRemote::OpenSimpleFile(const std::string hostname, const int32_t port
memset(&open_msg, 0, sizeof(open_msg));
open_msg.FileName = (char *)filename.c_str();
open_msg.OpenResponseCondition = CMCondition_get(ev_state.cm, m_conn);
open_msg.ReadContents = 0;
CMCondition_set_client_data(ev_state.cm, open_msg.OpenResponseCondition, (void *)this);
CMwrite(m_conn, ev_state.OpenSimpleFileFormat, &open_msg);
CMCondition_wait(ev_state.cm, open_msg.OpenResponseCondition);
m_Active = true;
}

void EVPathRemote::OpenReadSimpleFile(const std::string hostname, const int32_t port,
const std::string filename, std::vector<char> &contents)
{

EVPathRemoteCommon::_OpenSimpleFileMsg open_msg;
InitCMData();
attr_list contact_list = create_attr_list();
atom_t CM_IP_PORT = -1;
atom_t CM_IP_HOSTNAME = -1;
CM_IP_HOSTNAME = attr_atom_from_string("IP_HOST");
CM_IP_PORT = attr_atom_from_string("IP_PORT");
add_attr(contact_list, CM_IP_HOSTNAME, Attr_String, (attr_value)strdup(hostname.c_str()));
add_attr(contact_list, CM_IP_PORT, Attr_Int4, (attr_value)port);
m_conn = CMinitiate_conn(ev_state.cm, contact_list);
free_attr_list(contact_list);
if (!m_conn)
return;

memset(&open_msg, 0, sizeof(open_msg));
open_msg.FileName = (char *)filename.c_str();
open_msg.OpenResponseCondition = CMCondition_get(ev_state.cm, m_conn);
open_msg.ReadContents = 1;
CMCondition_set_client_data(ev_state.cm, open_msg.OpenResponseCondition, (void *)this);
m_TmpContentVector = &contents; // this will be accessed in the handler
CMwrite(m_conn, ev_state.OpenSimpleFileFormat, &open_msg);
CMCondition_wait(ev_state.cm, open_msg.OpenResponseCondition);
// file does not remain open after OpenReadSimpleFile
m_TmpContentVector = nullptr;
m_Active = false;
}

void EVPathRemote::Close()
{

EVPathRemoteCommon::_CloseFileMsg CloseMsg;
memset(&CloseMsg, 0, sizeof(CloseMsg));
CloseMsg.CloseResponseCondition = CMCondition_get(ev_state.cm, m_conn);
CloseMsg.FileHandle = m_ID;
CMCondition_set_client_data(ev_state.cm, CloseMsg.CloseResponseCondition, (void *)this);
CMwrite(m_conn, ev_state.CloseFileFormat, &CloseMsg);
CMCondition_wait(ev_state.cm, CloseMsg.CloseResponseCondition);
m_Active = false;
m_ID = 0;
}

EVPathRemote::GetHandle EVPathRemote::Get(const char *VarName, size_t Step, size_t StepCount,
size_t BlockID, Dims &Count, Dims &Start,
Accuracy &accuracy, void *dest)
{
EVPathRemoteCommon::_GetRequestMsg GetMsg;
if (!m_Active)
helper::Throw<std::invalid_argument>("Remote", "EVPathRemoteFile", "FileNotOpen",
"Attempted a Get on an unopened file\n");

memset(&GetMsg, 0, sizeof(GetMsg));
GetMsg.GetResponseCondition = CMCondition_get(ev_state.cm, m_conn);
GetMsg.FileHandle = m_ID;
Expand All @@ -219,6 +283,9 @@ EVPathRemote::GetHandle EVPathRemote::Get(const char *VarName, size_t Step, size
EVPathRemote::GetHandle EVPathRemote::Read(size_t Start, size_t Size, void *Dest)
{
EVPathRemoteCommon::_ReadRequestMsg ReadMsg;
if (!m_Active)
helper::Throw<std::invalid_argument>("Remote", "EVPathRemoteFile", "FileNotOpen",
"Attempted a Read on an unopened file\n");
memset(&ReadMsg, 0, sizeof(ReadMsg));
ReadMsg.ReadResponseCondition = CMCondition_get(ev_state.cm, m_conn);
ReadMsg.FileHandle = m_ID;
Expand Down
32 changes: 32 additions & 0 deletions source/adios2/toolkit/remote/EVPathRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,52 @@ class EVPathRemote : public Remote

explicit operator bool() const { return m_Active; }

/*
* Open() and OpenSimpleFile() are synchronous calls that
* internally return a unique ID of an open file on the server.
* Barring an explicit Close() operation, that file will remain
* open until the connection between the client and server closes.
* Note that because of connection sharing and other network-level
* considerations, this might not happen upon destruction of the
* EVPathRemote object.
*/
void Open(const std::string hostname, const int32_t port, const std::string filename,
const Mode mode, bool RowMajorOrdering);

void OpenSimpleFile(const std::string hostname, const int32_t port, const std::string filename);

/*
* OpenReadSimpleFile() is a synchronous call that returns the
* full contents of a remote simple file as a char array. It does
* this with a single round-trip to the server and does not leave
* an open file on the server.
*/
void OpenReadSimpleFile(const std::string hostname, const int32_t port,
const std::string filename, std::vector<char> &contents);

GetHandle Get(const char *VarName, size_t Step, size_t StepCount, size_t BlockID, Dims &Count,
Dims &Start, Accuracy &accuracy, void *dest);

bool WaitForGet(GetHandle handle);

GetHandle Read(size_t Start, size_t Size, void *Dest);

/*
* EVPathRemote::Close is an active synchronous operation that
* involves a round-trip to the server, waiting for a response to
* ensure that the file is closed on the server. This should
* likely not be performed in any destructors because of the
* delays involved. However server files are also closed after
* the network connection from which they were opened goes away.
* This is a passive asynchronous operation that will happen
* sometime after the EVPathRemote object is destroyed.
*/
void Close();

int64_t m_ID;

std::vector<char> *m_TmpContentVector;

private:
#ifdef ADIOS2_HAVE_SST
void InitCMData();
Expand Down
9 changes: 9 additions & 0 deletions source/adios2/toolkit/remote/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ void Remote::OpenSimpleFile(const std::string hostname, const int32_t port,
ThrowUp("RemoteSimpleOpen");
};

void Remote::OpenReadSimpleFile(const std::string hostname, const int32_t port,
const std::string filename, std::vector<char> &contents)
{
ThrowUp("RemoteSimpleOpenRead");
};

Remote::GetHandle Remote::Get(const char *VarName, size_t Step, size_t StepCount, size_t BlockID,
Dims &Count, Dims &Start, Accuracy &accuracy, void *dest)
{
Expand All @@ -53,6 +59,9 @@ Remote::GetHandle Remote::Read(size_t Start, size_t Size, void *Dest)
ThrowUp("RemoteRead");
return (Remote::GetHandle)0;
};

void Remote::Close() { ThrowUp("RemoteClose"); };

Remote::~Remote() {}
Remote::Remote(const adios2::HostOptions &hostOptions)
: m_HostOptions(std::make_shared<adios2::HostOptions>(hostOptions))
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/toolkit/remote/Remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class Remote
virtual void OpenSimpleFile(const std::string hostname, const int32_t port,
const std::string filename);

virtual void OpenReadSimpleFile(const std::string hostname, const int32_t port,
const std::string filename, std::vector<char> &contents);

typedef void *GetHandle;

virtual GetHandle Get(const char *VarName, size_t Step, size_t StepCount, size_t BlockID,
Expand All @@ -50,6 +53,8 @@ class Remote

virtual GetHandle Read(size_t Start, size_t Size, void *Dest);

virtual void Close();

size_t m_Size;

private:
Expand Down
28 changes: 21 additions & 7 deletions source/adios2/toolkit/remote/remote_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ namespace EVPathRemoteCommon
{

FMField OpenFileList[] = {
{"OpenResponseCondition", "integer", sizeof(long),
FMOffset(OpenFileMsg, OpenResponseCondition)},
{"OpenResponseCondition", "integer", sizeof(int), FMOffset(OpenFileMsg, OpenResponseCondition)},
{"FileName", "string", sizeof(char *), FMOffset(OpenFileMsg, FileName)},
{"Mode", "integer", sizeof(RemoteFileMode), FMOffset(OpenFileMsg, Mode)},
{"RowMajorOrder", "integer", sizeof(int), FMOffset(OpenFileMsg, RowMajorOrder)},
Expand All @@ -20,17 +19,18 @@ FMStructDescRec OpenFileStructs[] = {{"OpenFile", OpenFileList, sizeof(struct _O
{NULL, NULL, 0, NULL}};

FMField OpenSimpleFileList[] = {
{"OpenResponseCondition", "integer", sizeof(long),
{"OpenResponseCondition", "integer", sizeof(int),
FMOffset(OpenSimpleFileMsg, OpenResponseCondition)},
{"FileName", "string", sizeof(char *), FMOffset(OpenSimpleFileMsg, FileName)},
{"ReadContents(0)", "integer", sizeof(long), FMOffset(OpenSimpleFileMsg, ReadContents)},
{NULL, NULL, 0, 0}};

FMStructDescRec OpenSimpleFileStructs[] = {
{"OpenSimpleFile", OpenSimpleFileList, sizeof(struct _OpenSimpleFileMsg), NULL},
{NULL, NULL, 0, NULL}};

FMField OpenResponseList[] = {
{"OpenResponseCondition", "integer", sizeof(long),
{"OpenResponseCondition", "integer", sizeof(int),
FMOffset(OpenResponseMsg, OpenResponseCondition)},
{"FileHandle", "integer", sizeof(intptr_t), FMOffset(OpenResponseMsg, FileHandle)},
{NULL, NULL, 0, 0}};
Expand All @@ -40,7 +40,7 @@ FMStructDescRec OpenResponseStructs[] = {
{NULL, NULL, 0, NULL}};

FMField OpenSimpleResponseList[] = {
{"OpenResponseCondition", "integer", sizeof(long),
{"OpenResponseCondition", "integer", sizeof(int),
FMOffset(OpenSimpleResponseMsg, OpenResponseCondition)},
{"FileHandle", "integer", sizeof(intptr_t), FMOffset(OpenSimpleResponseMsg, FileHandle)},
{"FileSize", "integer", sizeof(size_t), FMOffset(OpenSimpleResponseMsg, FileSize)},
Expand Down Expand Up @@ -72,7 +72,7 @@ FMStructDescRec GetRequestStructs[] = {{"Get", GetRequestList, sizeof(struct _Ge
{NULL, NULL, 0, NULL}};

FMField ReadRequestList[] = {
{"ReadResponseCondition", "integer", sizeof(long),
{"ReadResponseCondition", "integer", sizeof(int),
FMOffset(ReadRequestMsg, ReadResponseCondition)},
{"FileHandle", "integer", sizeof(intptr_t), FMOffset(ReadRequestMsg, FileHandle)},
{"Offset", "integer", sizeof(size_t), FMOffset(ReadRequestMsg, Offset)},
Expand All @@ -84,7 +84,7 @@ FMStructDescRec ReadRequestStructs[] = {
{"Read", ReadRequestList, sizeof(struct _ReadRequestMsg), NULL}, {NULL, NULL, 0, NULL}};

FMField ReadResponseList[] = {
{"ReadResponseCondition", "integer", sizeof(long),
{"ReadResponseCondition", "integer", sizeof(int),
FMOffset(ReadResponseMsg, ReadResponseCondition)},
{"Dest", "integer", sizeof(void *), FMOffset(ReadResponseMsg, Dest)},
{"OperatorType", "integer", sizeof(uint8_t), FMOffset(ReadResponseMsg, OperatorType)},
Expand All @@ -98,11 +98,23 @@ FMStructDescRec ReadResponseStructs[] = {

FMField CloseFileList[] = {
{"FileHandle", "integer", sizeof(intptr_t), FMOffset(CloseFileMsg, FileHandle)},
{"CloseResponseCondition", "integer", sizeof(int),
FMOffset(CloseFileMsg, CloseResponseCondition)},
{NULL, NULL, 0, 0}};

FMStructDescRec CloseFileStructs[] = {{"Close", CloseFileList, sizeof(struct _CloseFileMsg), NULL},
{NULL, NULL, 0, NULL}};

FMField CloseResponseList[] = {
{"CloseResponseCondition", "integer", sizeof(int),
FMOffset(CloseFileResponseMsg, CloseResponseCondition)},
{"Status", "integer", sizeof(intptr_t), FMOffset(CloseFileResponseMsg, Status)},
{NULL, NULL, 0, 0}};

FMStructDescRec CloseResponseStructs[] = {
{"CloseResponse", CloseResponseList, sizeof(struct _CloseFileResponseMsg), NULL},
{NULL, NULL, 0, NULL}};

FMField KillServerList[] = {{"KillResponseCondition", "integer", sizeof(long),
FMOffset(KillServerMsg, KillResponseCondition)},
{NULL, NULL, 0, 0}};
Expand Down Expand Up @@ -155,6 +167,8 @@ void RegisterFormats(EVPathRemoteCommon::Remote_evpath_state &ev_state)
ev_state.ReadResponseFormat =
CMregister_format(ev_state.cm, EVPathRemoteCommon::ReadResponseStructs);
ev_state.CloseFileFormat = CMregister_format(ev_state.cm, EVPathRemoteCommon::CloseFileStructs);
ev_state.CloseResponseFormat =
CMregister_format(ev_state.cm, EVPathRemoteCommon::CloseResponseStructs);
ev_state.KillServerFormat =
CMregister_format(ev_state.cm, EVPathRemoteCommon::KillServerStructs);
ev_state.KillResponseFormat =
Expand Down
13 changes: 12 additions & 1 deletion source/adios2/toolkit/remote/remote_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ typedef struct _OpenSimpleFileMsg
{
int OpenResponseCondition;
char *FileName;
int ReadContents;
} *OpenSimpleFileMsg;

typedef struct _OpenSimpleResponseMsg
Expand Down Expand Up @@ -94,9 +95,18 @@ typedef struct _ReadResponseMsg
*/
typedef struct _CloseFileMsg
{
void *FileHandle;
int CloseResponseCondition;
int64_t FileHandle;
} *CloseFileMsg;

/*
*/
typedef struct _CloseFileResponseMsg
{
int CloseResponseCondition;
int Status;
} *CloseFileResponseMsg;

typedef struct _KillServerMsg
{
int KillResponseCondition;
Expand Down Expand Up @@ -145,6 +155,7 @@ struct Remote_evpath_state
CMFormat ReadRequestFormat;
CMFormat ReadResponseFormat;
CMFormat CloseFileFormat;
CMFormat CloseResponseFormat;
CMFormat KillServerFormat;
CMFormat KillResponseFormat;
CMFormat StatusServerFormat;
Expand Down
Loading
Loading