Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ endif()

if (ADIOS2_HAVE_SST)
# EVPath-enabled remote file transport
target_sources(adios2_core PRIVATE toolkit/remote/remote_common.cpp toolkit/transport/file/FileRemote.cpp)
target_sources(adios2_core PRIVATE toolkit/remote/remote_common.cpp toolkit/transport/file/FileRemote.cpp toolkit/remote/EVPathRemote.cpp)
target_link_libraries(adios2_core PRIVATE adios2::thirdparty::EVPath)
add_subdirectory(toolkit/remote)
endif()
Expand Down
23 changes: 18 additions & 5 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "BP5Reader.tcc"

#include "adios2/helper/adiosMath.h" // SetWithinLimit
#include "adios2/toolkit/remote/EVPathRemote.h"
#include "adios2/toolkit/transport/file/FileFStream.h"
#include <adios2-perfstubs-interface.h>

Expand Down Expand Up @@ -283,15 +284,27 @@ void BP5Reader::PerformGets()
if (m_BP5Deserializer->PendingGetRequests.size() == 0)
return;

std::string RemoteName;
if (!m_Parameters.RemoteDataPath.empty())
{
m_Remote.Open("localhost", RemoteCommon::ServerPort, m_Parameters.RemoteDataPath,
m_OpenMode, RowMajorOrdering);
RemoteName = m_Parameters.RemoteDataPath;
}
else if (getenv("DoRemote"))
{
m_Remote.Open("localhost", RemoteCommon::ServerPort, m_Name, m_OpenMode,
RowMajorOrdering);
RemoteName = m_Name;
}
(void)RowMajorOrdering; // Use in case no remotes available
#ifdef ADIOS2_HAVE_SST
m_Remote = std::unique_ptr<EVPathRemote>(new EVPathRemote());
m_Remote->Open("localhost", EVPathRemoteCommon::ServerPort, RemoteName, m_OpenMode,
RowMajorOrdering);
#endif
if (m_Remote == nullptr)
{
helper::Throw<std::ios_base::failure>(
"Engine", "BP5Reader", "OpenFiles",
"Remote file " + m_Name +
" cannot be opened. Possible server or file specification error.");
}
if (!m_Remote)
{
Expand Down Expand Up @@ -324,7 +337,7 @@ void BP5Reader::PerformRemoteGets()
auto GetRequests = m_BP5Deserializer->PendingGetRequests;
for (auto &Req : GetRequests)
{
m_Remote.Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start, Req.Data);
m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start, Req.Data);
}
}

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class BP5Reader : public BP5Engine, public Engine
/* transport manager for managing the active flag file */
transportman::TransportMan m_ActiveFlagFileManager;
bool m_dataIsRemote = false;
Remote m_Remote;
std::unique_ptr<Remote> m_Remote;
bool m_WriterIsActive = true;
adios2::profiling::JSONProfiler m_JSONProfiler;

Expand Down
219 changes: 219 additions & 0 deletions source/adios2/toolkit/remote/EVPathRemote.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
*/
#include "EVPathRemote.h"
#include "Remote.h"
#include "adios2/core/ADIOS.h"
#include "adios2/helper/adiosLog.h"
#include "adios2/helper/adiosString.h"
#include "adios2/helper/adiosSystem.h"
#ifdef _MSC_VER
#define strdup(x) _strdup(x)
#endif

#define ThrowUp(x) \
helper::Throw<std::invalid_argument>("Core", "Engine", "ThrowUp", \
"Non-overridden function " + std::string(x) + \
" called in Remote")

namespace adios2
{

EVPathRemote::EVPathRemote() {}

#ifdef ADIOS2_HAVE_SST
EVPathRemote::~EVPathRemote()
{
if (m_conn)
CMConnection_close(m_conn);
}

void OpenResponseHandler(CManager cm, CMConnection conn, void *vevent, void *client_data,
attr_list attrs)
{
EVPathRemoteCommon::OpenResponseMsg open_response_msg =
static_cast<EVPathRemoteCommon::OpenResponseMsg>(vevent);

void *obj = CMCondition_get_client_data(cm, open_response_msg->OpenResponseCondition);
static_cast<EVPathRemote *>(obj)->m_ID = open_response_msg->FileHandle;
CMCondition_signal(cm, open_response_msg->OpenResponseCondition);
return;
};

void OpenSimpleResponseHandler(CManager cm, CMConnection conn, void *vevent, void *client_data,
attr_list attrs)
{
EVPathRemoteCommon::OpenSimpleResponseMsg open_response_msg =
static_cast<EVPathRemoteCommon::OpenSimpleResponseMsg>(vevent);

void *obj = CMCondition_get_client_data(cm, open_response_msg->OpenResponseCondition);
static_cast<EVPathRemote *>(obj)->m_ID = open_response_msg->FileHandle;
static_cast<EVPathRemote *>(obj)->m_Size = open_response_msg->FileSize;
CMCondition_signal(cm, open_response_msg->OpenResponseCondition);
return;
};

void ReadResponseHandler(CManager cm, CMConnection conn, void *vevent, void *client_data,
attr_list attrs)
{
EVPathRemoteCommon::ReadResponseMsg read_response_msg =
static_cast<EVPathRemoteCommon::ReadResponseMsg>(vevent);
memcpy(read_response_msg->Dest, read_response_msg->ReadData, read_response_msg->Size);
CMCondition_signal(cm, read_response_msg->ReadResponseCondition);
return;
};

CManagerSingleton &CManagerSingleton::Instance(EVPathRemoteCommon::Remote_evpath_state &ev_state)
{
std::mutex mtx;
const std::lock_guard<std::mutex> lock(mtx);
static CManagerSingleton instance;
ev_state = instance.internalEvState;
return instance;
}

void EVPathRemote::InitCMData()
{
(void)CManagerSingleton::Instance(ev_state);
static std::once_flag flag;
std::call_once(flag, [&]() {
CMregister_handler(ev_state.OpenResponseFormat, (CMHandlerFunc)OpenResponseHandler,
&ev_state);
CMregister_handler(ev_state.ReadResponseFormat, (CMHandlerFunc)ReadResponseHandler,
&ev_state);
CMregister_handler(ev_state.OpenSimpleResponseFormat,
(CMHandlerFunc)OpenSimpleResponseHandler, &ev_state);
CMregister_handler(ev_state.ReadResponseFormat, (CMHandlerFunc)ReadResponseHandler,
&ev_state);
});
}

void EVPathRemote::Open(const std::string hostname, const int32_t port, const std::string filename,
const Mode mode, bool RowMajorOrdering)
{

EVPathRemoteCommon::_OpenFileMsg open_msg;
InitCMData();
attr_list contact_list = create_attr_list();
atom_t CM_IP_PORT = -1;
atom_t CM_IP_HOSTNAME = -1;
CM_IP_HOSTNAME = attr_atom_from_string("IP_HOST");
CM_IP_PORT = attr_atom_from_string("IP_PORT");
add_attr(contact_list, CM_IP_HOSTNAME, Attr_String, (attr_value)strdup(hostname.c_str()));
add_attr(contact_list, CM_IP_PORT, Attr_Int4, (attr_value)port);
m_conn = CMinitiate_conn(ev_state.cm, contact_list);
free_attr_list(contact_list);
if (!m_conn)
return;

memset(&open_msg, 0, sizeof(open_msg));
open_msg.FileName = (char *)filename.c_str();
switch (mode)
{
case Mode::Read:
open_msg.Mode = EVPathRemoteCommon::RemoteFileMode::RemoteOpen;
break;
case Mode::ReadRandomAccess:
open_msg.Mode = EVPathRemoteCommon::RemoteFileMode::RemoteOpenRandomAccess;
break;
default:
break;
}
open_msg.OpenResponseCondition = CMCondition_get(ev_state.cm, m_conn);
open_msg.RowMajorOrder = RowMajorOrdering;
CMCondition_set_client_data(ev_state.cm, open_msg.OpenResponseCondition, (void *)this);
CMwrite(m_conn, ev_state.OpenFileFormat, &open_msg);
CMCondition_wait(ev_state.cm, open_msg.OpenResponseCondition);
m_Active = true;
}

void EVPathRemote::OpenSimpleFile(const std::string hostname, const int32_t port,
const std::string filename)
{

EVPathRemoteCommon::_OpenSimpleFileMsg open_msg;
InitCMData();
attr_list contact_list = create_attr_list();
atom_t CM_IP_PORT = -1;
atom_t CM_IP_HOSTNAME = -1;
CM_IP_HOSTNAME = attr_atom_from_string("IP_HOST");
CM_IP_PORT = attr_atom_from_string("IP_PORT");
add_attr(contact_list, CM_IP_HOSTNAME, Attr_String, (attr_value)strdup(hostname.c_str()));
add_attr(contact_list, CM_IP_PORT, Attr_Int4, (attr_value)port);
m_conn = CMinitiate_conn(ev_state.cm, contact_list);
free_attr_list(contact_list);
if (!m_conn)
return;

memset(&open_msg, 0, sizeof(open_msg));
open_msg.FileName = (char *)filename.c_str();
open_msg.OpenResponseCondition = CMCondition_get(ev_state.cm, m_conn);
CMCondition_set_client_data(ev_state.cm, open_msg.OpenResponseCondition, (void *)this);
CMwrite(m_conn, ev_state.OpenSimpleFileFormat, &open_msg);
CMCondition_wait(ev_state.cm, open_msg.OpenResponseCondition);
m_Active = true;
}

EVPathRemote::GetHandle EVPathRemote::Get(char *VarName, size_t Step, size_t BlockID, Dims &Count,
Dims &Start, void *dest)
{
EVPathRemoteCommon::_GetRequestMsg GetMsg;
memset(&GetMsg, 0, sizeof(GetMsg));
GetMsg.GetResponseCondition = CMCondition_get(ev_state.cm, m_conn);
GetMsg.FileHandle = m_ID;
GetMsg.VarName = VarName;
GetMsg.Step = Step;
GetMsg.BlockID = BlockID;
GetMsg.DimCount = (int)Count.size();
GetMsg.Count = Count.data();
GetMsg.Start = Start.data();
GetMsg.Dest = dest;
CMwrite(m_conn, ev_state.GetRequestFormat, &GetMsg);
CMCondition_wait(ev_state.cm, GetMsg.GetResponseCondition);
return GetMsg.GetResponseCondition;
}

EVPathRemote::GetHandle EVPathRemote::Read(size_t Start, size_t Size, void *Dest)
{
EVPathRemoteCommon::_ReadRequestMsg ReadMsg;
memset(&ReadMsg, 0, sizeof(ReadMsg));
ReadMsg.ReadResponseCondition = CMCondition_get(ev_state.cm, m_conn);
ReadMsg.FileHandle = m_ID;
ReadMsg.Offset = Start;
ReadMsg.Size = Size;
ReadMsg.Dest = Dest;
CMwrite(m_conn, ev_state.ReadRequestFormat, &ReadMsg);
CMCondition_wait(ev_state.cm, ReadMsg.ReadResponseCondition);
return ReadMsg.ReadResponseCondition;
}

bool EVPathRemote::WaitForGet(GetHandle handle)
{
return CMCondition_wait(ev_state.cm, (int)handle);
}
#else

void EVPathRemote::Open(const std::string hostname, const int32_t port, const std::string filename,
const Mode mode, bool RowMajorOrdering){};

void EVPathRemote::OpenSimpleFile(const std::string hostname, const int32_t port,
const std::string filename){};

EVPathRemote::GetHandle EVPathRemote::Get(char *VarName, size_t Step, size_t BlockID, Dims &Count,
Dims &Start, void *dest)
{
return static_cast<GetHandle>(0);
};

bool EVPathRemote::WaitForGet(GetHandle handle) { return false; }

EVPathRemote::GetHandle EVPathRemote::Read(size_t Start, size_t Size, void *Dest)
{
return static_cast<GetHandle>(0);
};
EVPathRemote::~EVPathRemote() {}
#endif

} // end namespace adios2
89 changes: 89 additions & 0 deletions source/adios2/toolkit/remote/EVPathRemote.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*/

#ifndef ADIOS2_TOOLKIT_REMOTE_EVPATHREMOTE_H_
#define ADIOS2_TOOLKIT_REMOTE_EVPATHREMOTE_H_

/// \cond EXCLUDE_FROM_DOXYGEN
#include <mutex>
#include <string>
#include <vector>
/// \endcond

#include "adios2/toolkit/profiling/iochrono/IOChrono.h"

#include "Remote.h"
#include "adios2/common/ADIOSConfig.h"

#include "remote_common.h"

namespace adios2
{

class EVPathRemote : public Remote
{

public:
profiling::IOChrono m_Profiler; ///< profiles Open, Write/Read, Close

/**
* Base constructor that all derived classes pass
* @param type from derived class
* @param comm passed to m_Comm
*/
EVPathRemote();
~EVPathRemote();

explicit operator bool() const { return m_Active; }

void Open(const std::string hostname, const int32_t port, const std::string filename,
const Mode mode, bool RowMajorOrdering);

void OpenSimpleFile(const std::string hostname, const int32_t port, const std::string filename);

typedef int GetHandle;

GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start, void *dest);

bool WaitForGet(GetHandle handle);

GetHandle Read(size_t Start, size_t Size, void *Dest);

int64_t m_ID;

private:
#ifdef ADIOS2_HAVE_SST
void InitCMData();
EVPathRemoteCommon::Remote_evpath_state ev_state;
CMConnection m_conn = NULL;
std::mutex m_CMInitMutex;
#endif
bool m_Active = false;
};

#ifdef ADIOS2_HAVE_SST
class CManagerSingleton
{
public:
static CManagerSingleton &Instance(EVPathRemoteCommon::Remote_evpath_state &ev_state);

private:
CManager m_cm = NULL;
EVPathRemoteCommon::Remote_evpath_state internalEvState;
CManagerSingleton()
{
m_cm = CManager_create();
internalEvState.cm = m_cm;
RegisterFormats(internalEvState);
CMfork_comm_thread(internalEvState.cm);
}

~CManagerSingleton() { CManager_close(m_cm); }
};
#endif

} // end namespace adios2

#endif /* ADIOS2_TOOLKIT_EVPATHREMOTE_REMOTE_H_ */
Loading