Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 9 additions & 137 deletions ydb/core/kqp/ut/federated_query/s3/s3_recipe_ut_helpers.cpp
Original file line number Diff line number Diff line change
@@ -1,146 +1,18 @@
#include "s3_recipe_ut_helpers.h"

#include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h>
#include <library/cpp/testing/hook/hook.h>

#include <util/string/builder.h>

#include <aws/core/Aws.h>

Y_TEST_HOOK_BEFORE_RUN(InitAwsAPI) {
Aws::InitAPI(Aws::SDKOptions());
}

Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) {
Aws::ShutdownAPI(Aws::SDKOptions());
}

namespace NTestUtils {

extern const TString TEST_SCHEMA = R"(["StructType";[["key";["DataType";"Utf8";];];["value";["DataType";"Utf8";];];];])";

extern const TString TEST_SCHEMA_IDS = R"(["StructType";[["key";["DataType";"Utf8";];];];])";

std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig, const TString& domainRoot) {
return NKikimr::NKqp::NFederatedQueryTest::MakeKikimrRunner(true, nullptr, nullptr, appConfig, NYql::NDq::CreateS3ActorsFactory(), domainRoot);
}

Aws::S3::S3Client MakeS3Client() {
Aws::Client::ClientConfiguration s3ClientConfig;
s3ClientConfig.endpointOverride = GetEnv("S3_ENDPOINT");
s3ClientConfig.scheme = Aws::Http::Scheme::HTTP;
return Aws::S3::S3Client(
std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(),
s3ClientConfig,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
/*useVirtualAddressing=*/true);
}

void CreateBucket(const TString& bucket, Aws::S3::S3Client& s3Client) {
Aws::S3::Model::CreateBucketRequest req;
req.SetBucket(bucket);
req.SetACL(Aws::S3::Model::BucketCannedACL::public_read_write);
const Aws::S3::Model::CreateBucketOutcome result = s3Client.CreateBucket(req);
UNIT_ASSERT_C(result.IsSuccess(), "Error creating bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage());
}

void CreateBucket(const TString& bucket) {
Aws::S3::S3Client s3Client = MakeS3Client();

CreateBucket(bucket, s3Client);
}

void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) {
Aws::S3::Model::PutObjectRequest req;
req.WithBucket(bucket).WithKey(object);

auto inputStream = std::make_shared<std::stringstream>();
*inputStream << content;
req.SetBody(inputStream);
const Aws::S3::Model::PutObjectOutcome result = s3Client.PutObject(req);
UNIT_ASSERT_C(result.IsSuccess(), "Error uploading object \"" << object << "\" to a bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage());
}

void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content) {
Aws::S3::S3Client s3Client = MakeS3Client();

UploadObject(bucket, object, content, s3Client);
}
using std::shared_ptr;
using namespace NKikimr::NKqp;

void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) {
CreateBucket(bucket, s3Client);
UploadObject(bucket, object, content, s3Client);
}
const TString TEST_SCHEMA = R"(["StructType";[["key";["DataType";"Utf8";];];["value";["DataType";"Utf8";];];];])";
const TString TEST_SCHEMA_IDS = R"(["StructType";[["key";["DataType";"Utf8";];];];])";

void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content) {
Aws::S3::S3Client s3Client = MakeS3Client();

CreateBucketWithObject(bucket, object, content, s3Client);
}

TString GetObject(const TString& bucket, const TString& object, Aws::S3::S3Client& s3Client) {
Aws::S3::Model::GetObjectRequest req;
req.WithBucket(bucket).WithKey(object);

Aws::S3::Model::GetObjectOutcome outcome = s3Client.GetObject(req);
UNIT_ASSERT(outcome.IsSuccess());
Aws::S3::Model::GetObjectResult& result = outcome.GetResult();
std::istreambuf_iterator<char> eos;
std::string objContent(std::istreambuf_iterator<char>(result.GetBody()), eos);
Cerr << "Got object content from \"" << bucket << "." << object << "\"\n"
<< objContent << Endl;
return objContent;
}

TString GetObject(const TString& bucket, const TString& object) {
Aws::S3::S3Client s3Client = MakeS3Client();

return GetObject(bucket, object, s3Client);
}

std::vector<TString> GetObjectKeys(const TString& bucket, Aws::S3::S3Client& s3Client) {
Aws::S3::Model::ListObjectsRequest listReq;
listReq.WithBucket(bucket);

Aws::S3::Model::ListObjectsOutcome outcome = s3Client.ListObjects(listReq);
UNIT_ASSERT(outcome.IsSuccess());

std::vector<TString> keys;
for (auto& obj : outcome.GetResult().GetContents()) {
keys.push_back(TString(obj.GetKey()));
Cerr << "Found S3 object: \"" << obj.GetKey() << "\"" << Endl;
}
return keys;
}

std::vector<TString> GetObjectKeys(const TString& bucket) {
Aws::S3::S3Client s3Client = MakeS3Client();

return GetObjectKeys(bucket, s3Client);
}

TString GetAllObjects(const TString& bucket, TStringBuf separator, Aws::S3::S3Client& s3Client) {
std::vector<TString> keys = GetObjectKeys(bucket, s3Client);
TString result;
bool firstObject = true;
for (const TString& key : keys) {
result += GetObject(bucket, key, s3Client);
if (!firstObject) {
result += separator;
}
firstObject = false;
}
return result;
}

TString GetAllObjects(const TString& bucket, TStringBuf separator) {
Aws::S3::S3Client s3Client = MakeS3Client();

return GetAllObjects(bucket, separator, s3Client);
}

TString GetBucketLocation(const TStringBuf bucket) {
return TStringBuilder() << GetEnv("S3_ENDPOINT") << '/' << bucket << '/';
}
shared_ptr<TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig, const TString& domainRoot)
{
return NFederatedQueryTest::MakeKikimrRunner(true, nullptr, nullptr, appConfig, NYql::NDq::CreateS3ActorsFactory(), domainRoot);
}

} // namespace NTestUtils
}
54 changes: 10 additions & 44 deletions ydb/core/kqp/ut/federated_query/s3/s3_recipe_ut_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,21 @@

#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/ut/federated_query/common/common.h>
#include <library/cpp/testing/unittest/registar.h>

#include <util/generic/strbuf.h>
#include <util/generic/string.h>
#include <util/system/env.h>

#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/core/Aws.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/S3Client.h>
#include <ydb/library/testlib/s3_recipe_helper/s3_recipe_helper.h>

namespace NTestUtils {

constexpr TStringBuf TEST_CONTENT =
R"({"key": "1", "value": "trololo"}
{"key": "2", "value": "hello world"})"sv;

constexpr TStringBuf TEST_CONTENT_KEYS =
R"({"key": "1"}
{"key": "3"})"sv;

extern const TString TEST_SCHEMA;
extern const TString TEST_SCHEMA_IDS;

std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt, const TString& domainRoot = "Root");

Aws::S3::S3Client MakeS3Client();

void CreateBucket(const TString& bucket, Aws::S3::S3Client& s3Client);
void CreateBucket(const TString& bucket);

void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client);
void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content);

void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client);
void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content);

TString GetObject(const TString& bucket, const TString& object, Aws::S3::S3Client& s3Client);
TString GetObject(const TString& bucket, const TString& object);
constexpr TStringBuf TEST_CONTENT =
R"({"key": "1", "value": "trololo"}
{"key": "2", "value": "hello world"})"sv;

std::vector<TString> GetObjectKeys(const TString& bucket, Aws::S3::S3Client& s3Client);
std::vector<TString> GetObjectKeys(const TString& bucket);
constexpr TStringBuf TEST_CONTENT_KEYS =
R"({"key": "1"}
{"key": "3"})"sv;

TString GetAllObjects(const TString& bucket, TStringBuf separator, Aws::S3::S3Client& s3Client);
TString GetAllObjects(const TString& bucket, TStringBuf separator = {});
extern const TString TEST_SCHEMA;
extern const TString TEST_SCHEMA_IDS;

TString GetBucketLocation(const TStringBuf bucket);
std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt, const TString& domainRoot = "Root");

} // namespace NTestUtils
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/federated_query/s3/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ PEERDIR(
ydb/core/kqp/ut/federated_query/common
ydb/library/yql/providers/s3/actors
ydb/library/yql/sql/pg_dummy
ydb/library/testlib/s3_recipe_helper
ydb/public/sdk/cpp/client/ydb_types/operation
)

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/wrappers/s3_storage_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,15 @@ IExternalStorageOperator::TPtr TS3ExternalStorageConfig::DoConstructStorageOpera
TS3ExternalStorageConfig::TS3ExternalStorageConfig(const Ydb::Import::ImportFromS3Settings& settings)
: Config(ConfigFromSettings(settings))
, Credentials(CredentialsFromSettings(settings))
, UseVirtualAddressing(!settings.disable_virtual_addressing())
{
Bucket = settings.bucket();
}

TS3ExternalStorageConfig::TS3ExternalStorageConfig(const Ydb::Export::ExportToS3Settings& settings)
: Config(ConfigFromSettings(settings))
, Credentials(CredentialsFromSettings(settings))
, UseVirtualAddressing(!settings.disable_virtual_addressing())
{
Bucket = settings.bucket();
}
Expand Down
137 changes: 137 additions & 0 deletions ydb/library/testlib/s3_recipe_helper/s3_recipe_helper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#include "s3_recipe_helper.h"

#include <library/cpp/testing/hook/hook.h>

#include <util/string/builder.h>

#include <aws/core/Aws.h>

Y_TEST_HOOK_BEFORE_RUN(InitAwsAPI) {
Aws::InitAPI(Aws::SDKOptions());
}

Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) {
Aws::ShutdownAPI(Aws::SDKOptions());
}

namespace NTestUtils {

Aws::S3::S3Client MakeS3Client() {
Aws::Client::ClientConfiguration s3ClientConfig;
s3ClientConfig.endpointOverride = GetEnv("S3_ENDPOINT");
s3ClientConfig.scheme = Aws::Http::Scheme::HTTP;
return Aws::S3::S3Client(
std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(),
s3ClientConfig,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
/*useVirtualAddressing=*/true);
}

void CreateBucket(const TString& bucket, Aws::S3::S3Client& s3Client) {
Aws::S3::Model::CreateBucketRequest req;
req.SetBucket(bucket);
req.SetACL(Aws::S3::Model::BucketCannedACL::public_read_write);
const Aws::S3::Model::CreateBucketOutcome result = s3Client.CreateBucket(req);
UNIT_ASSERT_C(result.IsSuccess(), "Error creating bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage());
}

void CreateBucket(const TString& bucket) {
Aws::S3::S3Client s3Client = MakeS3Client();

CreateBucket(bucket, s3Client);
}

void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) {
Aws::S3::Model::PutObjectRequest req;
req.WithBucket(bucket).WithKey(object);

auto inputStream = std::make_shared<std::stringstream>();
*inputStream << content;
req.SetBody(inputStream);
const Aws::S3::Model::PutObjectOutcome result = s3Client.PutObject(req);
UNIT_ASSERT_C(result.IsSuccess(), "Error uploading object \"" << object << "\" to a bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage());
}

void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content) {
Aws::S3::S3Client s3Client = MakeS3Client();

UploadObject(bucket, object, content, s3Client);
}

void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) {
CreateBucket(bucket, s3Client);
UploadObject(bucket, object, content, s3Client);
}

void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content) {
Aws::S3::S3Client s3Client = MakeS3Client();

CreateBucketWithObject(bucket, object, content, s3Client);
}

TString GetObject(const TString& bucket, const TString& object, Aws::S3::S3Client& s3Client) {
Aws::S3::Model::GetObjectRequest req;
req.WithBucket(bucket).WithKey(object);

Aws::S3::Model::GetObjectOutcome outcome = s3Client.GetObject(req);
UNIT_ASSERT(outcome.IsSuccess());
Aws::S3::Model::GetObjectResult& result = outcome.GetResult();
std::istreambuf_iterator<char> eos;
std::string objContent(std::istreambuf_iterator<char>(result.GetBody()), eos);
Cerr << "Got object content from \"" << bucket << "." << object << "\"\n"
<< objContent << Endl;
return objContent;
}

TString GetObject(const TString& bucket, const TString& object) {
Aws::S3::S3Client s3Client = MakeS3Client();

return GetObject(bucket, object, s3Client);
}

std::vector<TString> GetObjectKeys(const TString& bucket, Aws::S3::S3Client& s3Client) {
Aws::S3::Model::ListObjectsRequest listReq;
listReq.WithBucket(bucket);

Aws::S3::Model::ListObjectsOutcome outcome = s3Client.ListObjects(listReq);
UNIT_ASSERT(outcome.IsSuccess());

std::vector<TString> keys;
for (auto& obj : outcome.GetResult().GetContents()) {
keys.push_back(TString(obj.GetKey()));
Cerr << "Found S3 object: \"" << obj.GetKey() << "\"" << Endl;
}
return keys;
}

std::vector<TString> GetObjectKeys(const TString& bucket) {
Aws::S3::S3Client s3Client = MakeS3Client();

return GetObjectKeys(bucket, s3Client);
}

TString GetAllObjects(const TString& bucket, TStringBuf separator, Aws::S3::S3Client& s3Client) {
std::vector<TString> keys = GetObjectKeys(bucket, s3Client);
TString result;
bool firstObject = true;
for (const TString& key : keys) {
result += GetObject(bucket, key, s3Client);
if (!firstObject) {
result += separator;
}
firstObject = false;
}
return result;
}

TString GetAllObjects(const TString& bucket, TStringBuf separator) {
Aws::S3::S3Client s3Client = MakeS3Client();

return GetAllObjects(bucket, separator, s3Client);
}

TString GetBucketLocation(const TStringBuf bucket) {
return TStringBuilder() << GetEnv("S3_ENDPOINT") << '/' << bucket << '/';
}

} // namespace NTestUtils
Loading