Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 119 additions & 92 deletions source/adios2/CMakeLists.txt

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions source/adios2/common/ADIOSTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ struct MinBlockInfo
{
int WriterID = 0;
size_t BlockID = 0;
size_t *Start;
size_t *Count;
const size_t *Start;
const size_t *Count;
MinMaxStruct MinMax;
void *BufferP = NULL;
};
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/core/Variable.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Dims Variable<T>::DoCount() const

if (!MVI->WasLocalValue)
{
size_t *DimsPtr = (MVI->BlocksInfo)[m_BlockID].Count;
const size_t *DimsPtr = (MVI->BlocksInfo)[m_BlockID].Count;
Dims D;
D.resize(MVI->Dims);
for (int i = 0; i < MVI->Dims; i++)
Expand Down
191 changes: 147 additions & 44 deletions source/adios2/engine/campaign/CampaignData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "adios2/helper/adiosLog.h"
#include "adios2/helper/adiosString.h"

#include <cstring>
#include <fstream>
#include <iostream>
#include <stdio.h>
Expand Down Expand Up @@ -95,10 +96,10 @@ static int sqlcb_directory(void *p, int argc, char **argv, char **azColName)
return 0;
};

static int sqlcb_bpdataset(void *p, int argc, char **argv, char **azColName)
static int sqlcb_dataset(void *p, int argc, char **argv, char **azColName)
{
CampaignData *cdp = reinterpret_cast<CampaignData *>(p);
CampaignBPDataset cds;
CampaignDataset cds;
size_t dsid = helper::StringToSizeT(std::string(argv[0]), "SQL callback convert text to int");
size_t hostid = helper::StringToSizeT(std::string(argv[1]), "SQL callback convert text to int");
size_t dirid = helper::StringToSizeT(std::string(argv[2]), "SQL callback convert text to int");
Expand All @@ -119,16 +120,24 @@ static int sqlcb_bpdataset(void *p, int argc, char **argv, char **azColName)
{
cds.uuid = std::string(argv[5]);
}
cdp->bpdatasets[dsid] = cds;
if (cdp->version.version >= 0.4)
{
cds.format = FileFormat(std::string(argv[6]));
}
else
{
cds.format = FileFormat::ADIOS;
}
cdp->datasets[dsid] = cds;
return 0;
};

static int sqlcb_bpfile(void *p, int argc, char **argv, char **azColName)
static int sqlcb_file(void *p, int argc, char **argv, char **azColName)
{
CampaignData *cdp = reinterpret_cast<CampaignData *>(p);
CampaignBPFile cf;
CampaignFile cf;
size_t dsid = helper::StringToSizeT(std::string(argv[0]), "SQL callback convert text to int");
cf.bpDatasetIdx = dsid;
cf.datasetIdx = dsid;
cf.name = std::string(argv[1]);
int comp = helper::StringTo<int>(std::string(argv[2]), "SQL callback convert text to int");
cf.compressed = (bool)comp;
Expand All @@ -138,7 +147,7 @@ static int sqlcb_bpfile(void *p, int argc, char **argv, char **azColName)
helper::StringToSizeT(std::string(argv[4]), "SQL callback convert text to int");
cf.ctime = helper::StringTo<int64_t>(std::string(argv[5]), "SQL callback convert ctime to int");

CampaignBPDataset &cds = cdp->bpdatasets[cf.bpDatasetIdx];
CampaignDataset &cds = cdp->datasets[cf.datasetIdx];
cds.files.push_back(cf);
return 0;
};
Expand Down Expand Up @@ -196,7 +205,11 @@ void ReadCampaignData(sqlite3 *db, CampaignData &cd)
sqlite3_free(zErrMsg);
}

if (cd.version.version >= 0.3)
if (cd.version.version >= 0.4)
{
sqlcmd = "SELECT rowid, hostid, dirid, name, keyid, uuid, fileformat FROM dataset";
}
else if (cd.version.version >= 0.3)
{
sqlcmd = "SELECT rowid, hostid, dirid, name, keyid, uuid FROM bpdataset";
}
Expand All @@ -208,19 +221,27 @@ void ReadCampaignData(sqlite3 *db, CampaignData &cd)
{
sqlcmd = "SELECT rowid, hostid, dirid, name FROM bpdataset";
}
rc = sqlite3_exec(db, sqlcmd.c_str(), sqlcb_bpdataset, &cd, &zErrMsg);
rc = sqlite3_exec(db, sqlcmd.c_str(), sqlcb_dataset, &cd, &zErrMsg);
if (rc != SQLITE_OK)
{
std::cout << "SQL error: " << zErrMsg << std::endl;
std::string m(zErrMsg);
helper::Throw<std::invalid_argument>("Engine", "CampaignReader", "ReadCampaignData",
"SQL error on reading bpdataset records:" + m);
"SQL error on reading dataset records:" + m);
sqlite3_free(zErrMsg);
}

sqlcmd = "SELECT bpdatasetid, name, compression, lenorig, lencompressed, ctime "
"FROM bpfile";
rc = sqlite3_exec(db, sqlcmd.c_str(), sqlcb_bpfile, &cd, &zErrMsg);
if (cd.version.version >= 0.4)
{
sqlcmd = "SELECT datasetid, name, compression, lenorig, lencompressed, ctime "
"FROM file";
}
else
{
sqlcmd = "SELECT bpdatasetid, name, compression, lenorig, lencompressed, ctime "
"FROM bpfile";
}
rc = sqlite3_exec(db, sqlcmd.c_str(), sqlcb_file, &cd, &zErrMsg);
if (rc != SQLITE_OK)
{
std::cout << "SQL error: " << zErrMsg << std::endl;
Expand All @@ -238,7 +259,8 @@ void ReadCampaignData(sqlite3 *db, CampaignData &cd)
the version of the library linked do not match, or Z_ERRNO if there
is an error reading or writing the files.
http://www.zlib.net/zlib_how.html */
int inflateToFile(const unsigned char *source, const size_t blobsize, std::ofstream *dest)
int inflateToFileOrMemory(const unsigned char *source, const size_t blobsize,
std::ofstream *destFile, char *destData = nullptr)
{
constexpr size_t CHUNK = 16777216;
int ret;
Expand Down Expand Up @@ -284,11 +306,19 @@ int inflateToFile(const unsigned char *source, const size_t blobsize, std::ofstr
return ret;
}
have = CHUNK - strm.avail_out;
dest->write(reinterpret_cast<char *>(out.data()), have);
if (dest->bad())
if (destData)
{
memcpy(destData, out.data(), have);
destData += have;
}
else
{
helper::Throw<std::runtime_error>("Core", "Campaign", "Inflate",
"error writing file ");
destFile->write(reinterpret_cast<char *>(out.data()), have);
if (destFile->bad())
{
helper::Throw<std::runtime_error>("Core", "Campaign", "Inflate",
"error writing file ");
}
}

} while (strm.avail_out == 0);
Expand Down Expand Up @@ -352,7 +382,7 @@ static bool isFileNewer(const std::string path, int64_t ctime)

#ifdef ADIOS2_HAVE_SODIUM
void DecryptData(const unsigned char *encryptedData, size_t lenEncrypted, size_t lenDecrypted,
const CampaignBPFile &bpfile, std::string keystr, unsigned char *decryptedData)
const CampaignFile &bpfile, std::string keystr, unsigned char *decryptedData)
{
if (sodium_init() < 0)
{
Expand Down Expand Up @@ -400,48 +430,57 @@ void DecryptData(const unsigned char *encryptedData, size_t lenEncrypted, size_t
}
#endif

void SaveToFile(sqlite3 *db, const std::string &path, const CampaignBPFile &bpfile,
std::string &keyHex)
void DumpToFileOrMemory(sqlite3 *db, const CampaignFile &file, std::string &keyHex,
const std::string &path, char *data, const CampaignData &cd)
{
if (isFileNewer(path, bpfile.ctime))
if (!path.empty() && isFileNewer(path, file.ctime))
{
return;
}

int rc;
char *zErrMsg = 0;
std::string sqlcmd;
std::string id = std::to_string(bpfile.bpDatasetIdx);
std::string id = std::to_string(file.datasetIdx);

sqlite3_stmt *statement;
sqlcmd =
"SELECT data FROM bpfile WHERE bpdatasetid = " + id + " AND name = '" + bpfile.name + "'";
if (cd.version.version >= 0.4)
{
sqlcmd =
"SELECT data FROM file WHERE datasetid = " + id + " AND name = '" + file.name + "'";
}
else
{
sqlcmd =
"SELECT data FROM bpfile WHERE bpdatasetid = " + id + " AND name = '" + file.name + "'";
}
// std::cout << "SQL statement: " << sqlcmd << "\n";
rc = sqlite3_prepare_v2(db, sqlcmd.c_str(), static_cast<int>(sqlcmd.size()), &statement, NULL);
if (rc != SQLITE_OK)
{
const char *zErrMsg = sqlite3_errmsg(db);
std::cout << "SQL error: " << zErrMsg << std::endl;
std::string m(zErrMsg);
helper::Throw<std::invalid_argument>("Engine", "CampaignReader", "SaveToFIle",
"SQL error on reading info records:" + m);
sqlite3_free(zErrMsg);
}

int result = 0;
result = sqlite3_step(statement);
if (result != SQLITE_ROW)
{
helper::Throw<std::invalid_argument>("Engine", "CampaignReader", "SaveToFIle",
"Did not find record for :" + bpfile.name);
"Did not find record for :" + file.name);
}

int iBlobsize = sqlite3_column_bytes(statement, 0);
const void *blob = sqlite3_column_blob(statement, 0);

/*std::cout << "-- Retrieved from DB data of " << bpfile.name << " size = " << iBlobsize
<< " compressed = " << bpfile.compressed << " encryption key = " << keyHex
<< " compressed size = " << bpfile.lengthCompressed
<< " original size = " << bpfile.lengthOriginal << " blob = " << blob << "\n";*/
/*
std::cout << "-- Retrieved from DB data of " << file.name << " size = " << iBlobsize
<< " compressed = " << file.compressed << " encryption key = " << keyHex
<< " compressed size = " << file.lengthCompressed
<< " original size = " << file.lengthOriginal << " blob = " << blob << "\n";
*/

size_t blobsize = static_cast<size_t>(iBlobsize);
void *q = const_cast<void *>(blob);
Expand All @@ -450,34 +489,98 @@ void SaveToFile(sqlite3 *db, const std::string &path, const CampaignBPFile &bpfi
#ifdef ADIOS2_HAVE_SODIUM
if (!keyHex.empty())
{
size_t decryptedSize =
(bpfile.compressed ? bpfile.lengthCompressed : bpfile.lengthOriginal);
size_t decryptedSize = (file.compressed ? file.lengthCompressed : file.lengthOriginal);
q = malloc(decryptedSize);
free_q = true;
DecryptData(static_cast<const unsigned char *>(blob), blobsize, decryptedSize, bpfile,
keyHex, static_cast<unsigned char *>(q));
DecryptData(static_cast<const unsigned char *>(blob), blobsize, decryptedSize, file, keyHex,
static_cast<unsigned char *>(q));
}
#endif

std::ofstream f;
f.rdbuf()->pubsetbuf(0, 0);
f.open(path, std::ios::out | std::ios::binary);
if (bpfile.compressed)
if (path.empty())
{
const unsigned char *ptr = static_cast<const unsigned char *>(q);
inflateToFile(ptr, blobsize, &f);
// read into memory
if (file.compressed)
{
const unsigned char *ptr = static_cast<const unsigned char *>(q);
inflateToFileOrMemory(ptr, blobsize, nullptr, data);
}
else
{
memcpy(data, q, blobsize);
}
}
else
{
f.write(static_cast<const char *>(q), blobsize);
// Save to File
std::ofstream f;
f.rdbuf()->pubsetbuf(0, 0);
f.open(path, std::ios::out | std::ios::binary);
if (file.compressed)
{
const unsigned char *ptr = static_cast<const unsigned char *>(q);
inflateToFileOrMemory(ptr, blobsize, &f, nullptr);
}
else
{
f.write(static_cast<const char *>(q), blobsize);
}
f.close();
}
f.close();

if (free_q)
{
free(q);
}
}

void SaveToFile(sqlite3 *db, const std::string &path, const CampaignFile &file, std::string &keyHex,
const CampaignData &cd)
{
DumpToFileOrMemory(db, file, keyHex, path, nullptr, cd);
}

void ReadToMemory(sqlite3 *db, char *data, const CampaignFile &file, std::string &keyHex,
const CampaignData &cd)
{
DumpToFileOrMemory(db, file, keyHex, "", data, cd);
}

std::string FileFormat::ToString()
{
switch (value)
{
case FileFormat::ADIOS:
return "ADIOS";
case FileFormat::HDF5:
return "HDF5";
case FileFormat::TEXT:
return "TEXT";
default:
return "Unknown";
}
};

FileFormat::FileFormat(const std::string &fmtstr)
{
if (fmtstr == "ADIOS")
{
value = FileFormat::ADIOS;
}
else if (fmtstr == "HDF5")
{
value = FileFormat::HDF5;
}
else if (fmtstr == "TEXT")
{
value = FileFormat::TEXT;
}
else
{
value = FileFormat::Unknown;
}
}

} // end namespace engine
} // end namespace core
} // end namespace adios2
Loading
Loading