Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bindings/CXX11/adios2/cxx11/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,12 @@ class Engine
void Put(Variable<T> variable, U const &data, const Mode launch = Mode::Deferred)
{
auto bufferView = static_cast<AdiosView<U>>(data);
#ifdef ADIOS2_HAVE_GPU_SUPPORT
auto bufferMem = bufferView.memory_space();
#ifdef ADIOS2_HAVE_GPU_SUPPORT
auto variableMem = variable.GetMemorySpace();
CheckMemorySpace(variableMem, bufferMem);
#endif
variable.SetMemorySpace(bufferMem);
Put(variable, bufferView.data(), launch);
}

Expand Down
3 changes: 3 additions & 0 deletions examples/hello/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ if(ADIOS2_HAVE_Kokkos)
if(ADIOS2_HAVE_SST)
add_subdirectory(sstKokkos)
endif()
if(ADIOS2_HAVE_DataMan)
add_subdirectory(datamanKokkos)
endif()
endif()

add_subdirectory(bpThreadWrite)
Expand Down
33 changes: 33 additions & 0 deletions examples/hello/datamanKokkos/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#------------------------------------------------------------------------------#
# Distributed under the OSI-approved Apache License, Version 2.0. See
# accompanying file Copyright.txt for details.
#------------------------------------------------------------------------------#

cmake_minimum_required(VERSION 3.12)
project(ADIOS2HelloDataManKokkosExample)

if(NOT TARGET adios2_core)
set(_components CXX)

find_package(MPI COMPONENTS C)
if(MPI_FOUND)
# Workaround for various MPI implementations forcing the link of C++ bindings
add_definitions(-DOMPI_SKIP_MPICXX -DMPICH_SKIP_MPICXX)

list(APPEND _components MPI)
endif()

find_package(ZeroMQ 4.1 QUIET)

find_package(ADIOS2 REQUIRED COMPONENTS ${_components})
endif()

if(ADIOS2_HAVE_MPI AND ADIOS2_HAVE_DataMan)
add_executable(adios2_hello_datamanWriterKokkos dataManWriterKokkos.cpp)
target_link_libraries(adios2_hello_datamanWriterKokkos adios2::cxx11_mpi MPI::MPI_C Kokkos::kokkos)
install(TARGETS adios2_hello_datamanWriterKokkos RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})

add_executable(adios2_hello_datamanReaderKokkos dataManReaderKokkos.cpp)
target_link_libraries(adios2_hello_datamanReaderKokkos adios2::cxx11_mpi MPI::MPI_C Kokkos::kokkos)
install(TARGETS adios2_hello_datamanReaderKokkos RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
endif()
76 changes: 76 additions & 0 deletions examples/hello/datamanKokkos/dataManReaderKokkos.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include <adios2.h>
#include <chrono>
#include <iostream>
#include <mpi.h>
#include <numeric>
#include <thread>
#include <vector>

#include <adios2/cxx11/KokkosView.h>

#include <Kokkos_Core.hpp>

int mpiRank, mpiSize;

template <class T, class MemSpace>
void PrintData(Kokkos::View<T *, MemSpace> &gpuData, const size_t step)
{
auto data = Kokkos::create_mirror_view_and_copy(Kokkos::HostSpace{}, gpuData);
std::cout << "Rank: " << mpiRank << " Step: " << step << " [";
for (int i = 0; i < data.extent_int(0); ++i)
{
std::cout << data(i) << " ";
}
std::cout << "]" << std::endl;
}

int main(int argc, char *argv[])
{
// initialize MPI
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);

// initialize adios2
adios2::ADIOS adios(MPI_COMM_WORLD);
adios2::IO dataManIO = adios.DeclareIO("whatever");
dataManIO.SetEngine("DataMan");
dataManIO.SetParameters({{"IPAddress", "127.0.0.1"}, {"Port", "12306"}, {"Timeout", "5"}});

// open stream
adios2::Engine dataManReader = dataManIO.Open("HelloDataMan", adios2::Mode::Read);

// define variable
adios2::Variable<float> floatArrayVar;

Kokkos::DefaultExecutionSpace exe_space;
std::cout << "Read on memory space: " << exe_space.name() << std::endl;
// read data
while (true)
{
auto status = dataManReader.BeginStep();
if (status == adios2::StepStatus::OK)
{
floatArrayVar = dataManIO.InquireVariable<float>("FloatArray");
auto shape = floatArrayVar.Shape();
size_t datasize =
std::accumulate(shape.begin(), shape.end(), 1, std::multiplies<size_t>());
Kokkos::View<float *, Kokkos::DefaultExecutionSpace::memory_space> floatVector(
"simBuffer", datasize);
dataManReader.Get<float>(floatArrayVar, floatVector, adios2::Mode::Sync);
dataManReader.EndStep();
PrintData(floatVector, dataManReader.CurrentStep());
}
else if (status == adios2::StepStatus::EndOfStream)
{
std::cout << "End of stream" << std::endl;
break;
}
}

// clean up
dataManReader.Close();
MPI_Finalize();

return 0;
}
97 changes: 97 additions & 0 deletions examples/hello/datamanKokkos/dataManWriterKokkos.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* datamanWriterKokkos.cpp Simple example of writing multiple steps of a 2D float Kokkos::View
* through ADIOS2 DataMan
*/
#include <adios2.h>
#include <adios2/cxx11/KokkosView.h>
#include <iostream>
#include <mpi.h>
#include <numeric>
#include <thread>
#include <vector>

#include <Kokkos_Core.hpp>

size_t Nx = 10;
size_t Ny = 10;
size_t steps = 2;
adios2::Dims shape;
adios2::Dims start;
adios2::Dims count;

int mpiRank, mpiSize;

template <class T, class MemSpace>
void PrintData(Kokkos::View<T **, MemSpace> &gpuData, const size_t step)
{
auto data = Kokkos::create_mirror_view_and_copy(Kokkos::HostSpace{}, gpuData);
std::cout << "Rank: " << mpiRank << " Step: " << step << " [";
for (int i = 0; i < data.extent_int(0); ++i)
for (int j = 0; j < data.extent_int(1); ++j)
std::cout << data(i, j) << " ";
std::cout << "]" << std::endl;
}

template <class T, class MemSpace, class ExecSpace>
Kokkos::View<T **, MemSpace> GenerateData(const size_t step, const size_t Ny, const size_t mpiRank)
{
Kokkos::View<T **, MemSpace> gpuSimData("simBuffer", Nx, Ny);
static_assert(Kokkos::SpaceAccessibility<ExecSpace, MemSpace>::accessible, "");
Kokkos::parallel_for(
"initBuffer", Kokkos::RangePolicy<ExecSpace>(0, Nx), KOKKOS_LAMBDA(int i) {
for (int j = 0; j < Ny; j++)
gpuSimData(i, j) = static_cast<float>(i * Ny + j) + mpiRank * 10000 + step;
});
Kokkos::fence();
ExecSpace exe_space;
std::cout << "Create data for step " << step << " on memory space: " << exe_space.name()
<< std::endl;
return gpuSimData;
}

int main(int argc, char *argv[])
{
// initialize MPI
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);

// initialize data dimensions
count = {Nx, Ny};
start = {mpiRank * Nx, 0};
shape = {mpiSize * Nx, Ny};

// initialize adios2
adios2::ADIOS adios(MPI_COMM_WORLD);
adios2::IO dataManIO = adios.DeclareIO("whatever");
dataManIO.SetEngine("DataMan");
dataManIO.SetParameters({{"IPAddress", "127.0.0.1"},
{"Port", "12306"},
{"Timeout", "5"},
{"RendezvousReaderCount", "1"}});

// open stream
adios2::Engine dataManWriter = dataManIO.Open("HelloDataMan", adios2::Mode::Write);

// define variable
auto floatArrayVar = dataManIO.DefineVariable<float>("FloatArray", shape, start, count);

// write data
for (size_t i = 0; i < steps; ++i)
{
auto floatVector = GenerateData<float, Kokkos::DefaultExecutionSpace::memory_space,
Kokkos::DefaultExecutionSpace>(i, Ny, mpiRank);
dataManWriter.BeginStep();
dataManWriter.Put(floatArrayVar, floatVector, adios2::Mode::Sync);
PrintData(floatVector, dataManWriter.CurrentStep());
dataManWriter.EndStep();
}

dataManWriter.Close();
MPI_Finalize();

return 0;
}
8 changes: 4 additions & 4 deletions source/adios2/engine/dataman/DataManReader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ void DataManReader::GetDeferredCommon(Variable<T> &variable, T *data)
{
while (true)
{
int ret =
m_Serializer.GetData(data, variable.m_Name, variable.m_Start, variable.m_Count,
m_CurrentStep, variable.m_MemoryStart, variable.m_MemoryCount);
int ret = m_Serializer.GetData(data, variable.m_Name, variable.m_Start,
variable.m_Count, m_CurrentStep, variable.m_MemSpace,
variable.m_MemoryStart, variable.m_MemoryCount);
if (ret == 0)
{
break;
Expand All @@ -57,7 +57,7 @@ void DataManReader::GetDeferredCommon(Variable<T> &variable, T *data)
while (true)
{
int ret = m_Serializer.GetData(data, variable.m_Name, start, count, m_CurrentStep,
memstart, memcount);
variable.m_MemSpace, memstart, memcount);
if (ret == 0)
{
break;
Expand Down
3 changes: 2 additions & 1 deletion source/adios2/engine/dataman/DataManWriter.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ void DataManWriter::PutDeferredCommon(Variable<T> &variable, const T *values)
std::reverse(memstart.begin(), memstart.end());
std::reverse(memcount.begin(), memcount.end());
m_Serializer.PutData(variable.m_Data, variable.m_Name, shape, start, count, memstart,
memcount, m_Name, CurrentStep(), m_MpiRank, "", variable.m_Operations);
memcount, variable.m_MemSpace, m_Name, CurrentStep(), m_MpiRank, "",
variable.m_Operations);
}

if (m_MonitorActive)
Expand Down
17 changes: 12 additions & 5 deletions source/adios2/toolkit/format/dataman/DataManSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,8 @@ void DataManSerializer::Log(const int level, const std::string &message, const b
void DataManSerializer::PutData(const std::string *inputData, const std::string &varName,
const Dims &varShape, const Dims &varStart, const Dims &varCount,
const Dims &varMemStart, const Dims &varMemCount,
const std::string &doid, const size_t step, const int rank,
const std::string &address,
const MemorySpace varMemSpace, const std::string &doid,
const size_t step, const int rank, const std::string &address,
const std::vector<std::shared_ptr<core::Operator>> &ops,
VecPtr localBuffer, JsonPtr metadataJson)
{
Expand Down Expand Up @@ -646,8 +646,14 @@ void DataManSerializer::PutData(const std::string *inputData, const std::string

localBuffer->resize(localBuffer->size() + inputData->size());

std::memcpy(localBuffer->data() + localBuffer->size() - inputData->size(), inputData->data(),
inputData->size());
#ifdef ADIOS2_HAVE_GPU_SUPPORT
if (varMemSpace == MemorySpace::GPU)
helper::CopyFromGPUToBuffer(localBuffer->data(), localBuffer->size() - inputData->size(),
inputData->data(), varMemSpace, inputData->size());
#endif
if (varMemSpace == MemorySpace::Host)
std::memcpy(localBuffer->data() + localBuffer->size() - inputData->size(),
inputData->data(), inputData->size());

if (metadataJson == nullptr)
{
Expand All @@ -665,7 +671,8 @@ void DataManSerializer::PutData(const std::string *inputData, const std::string
template <>
int DataManSerializer::GetData(std::string *outputData, const std::string &varName,
const Dims &varStart, const Dims &varCount, const size_t step,
const Dims &varMemStart, const Dims &varMemCount)
const MemorySpace varMemSpace, const Dims &varMemStart,
const Dims &varMemCount)
{
PERFSTUBS_SCOPED_TIMER_FUNC();

Expand Down
15 changes: 8 additions & 7 deletions source/adios2/toolkit/format/dataman/DataManSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,16 @@ class DataManSerializer
// put a variable for writer
void PutData(const std::string *inputData, const std::string &varName, const Dims &varShape,
const Dims &varStart, const Dims &varCount, const Dims &varMemStart,
const Dims &varMemCount, const std::string &doid, const size_t step,
const int rank, const std::string &address,
const Dims &varMemCount, const MemorySpace varMemSpace, const std::string &doid,
const size_t step, const int rank, const std::string &address,
const std::vector<std::shared_ptr<core::Operator>> &ops,
VecPtr localBuffer = nullptr, JsonPtr metadataJson = nullptr);

template <class T>
void PutData(const T *inputData, const std::string &varName, const Dims &varShape,
const Dims &varStart, const Dims &varCount, const Dims &varMemStart,
const Dims &varMemCount, const std::string &doid, const size_t step,
const int rank, const std::string &address,
const Dims &varMemCount, const MemorySpace varMemSpace, const std::string &doid,
const size_t step, const int rank, const std::string &address,
const std::vector<std::shared_ptr<core::Operator>> &ops,
VecPtr localBuffer = nullptr, JsonPtr metadataJson = nullptr);

Expand Down Expand Up @@ -134,8 +134,8 @@ class DataManSerializer

template <class T>
int GetData(T *output_data, const std::string &varName, const Dims &varStart,
const Dims &varCount, const size_t step, const Dims &varMemStart = Dims(),
const Dims &varMemCount = Dims());
const Dims &varCount, const size_t step, const MemorySpace varMemSpace,
const Dims &varMemStart = Dims(), const Dims &varMemCount = Dims());

void Erase(const size_t step, const bool allPreviousSteps = false);

Expand Down Expand Up @@ -166,7 +166,8 @@ class DataManSerializer
nlohmann::json DeserializeJson(const char *start, size_t size);

template <typename T>
void CalculateMinMax(const T *data, const Dims &count, nlohmann::json &metaj);
void CalculateMinMax(const T *data, const Dims &count, const MemorySpace varMemSpace,
nlohmann::json &metaj);

bool StepHasMinimumBlocks(const size_t step, const int requireMinimumBlocks);

Expand Down
Loading