Skip to content

Commit 337e915

Browse files
pixccdcherednik
andauthored
24-3: Fix disabled use virtual addressing option for import from S3 (#9162) (#12915)
Co-authored-by: Daniil Cherednik <dcherednik@ydb.tech>
1 parent 4d0b0f0 commit 337e915

File tree

13 files changed

+356
-182
lines changed

13 files changed

+356
-182
lines changed
Lines changed: 9 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -1,146 +1,18 @@
11
#include "s3_recipe_ut_helpers.h"
22

33
#include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h>
4-
#include <library/cpp/testing/hook/hook.h>
5-
6-
#include <util/string/builder.h>
7-
8-
#include <aws/core/Aws.h>
9-
10-
Y_TEST_HOOK_BEFORE_RUN(InitAwsAPI) {
11-
Aws::InitAPI(Aws::SDKOptions());
12-
}
13-
14-
Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) {
15-
Aws::ShutdownAPI(Aws::SDKOptions());
16-
}
174

185
namespace NTestUtils {
196

20-
extern const TString TEST_SCHEMA = R"(["StructType";[["key";["DataType";"Utf8";];];["value";["DataType";"Utf8";];];];])";
21-
22-
extern const TString TEST_SCHEMA_IDS = R"(["StructType";[["key";["DataType";"Utf8";];];];])";
23-
24-
std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig, const TString& domainRoot) {
25-
return NKikimr::NKqp::NFederatedQueryTest::MakeKikimrRunner(true, nullptr, nullptr, appConfig, NYql::NDq::CreateS3ActorsFactory(), domainRoot);
26-
}
27-
28-
Aws::S3::S3Client MakeS3Client() {
29-
Aws::Client::ClientConfiguration s3ClientConfig;
30-
s3ClientConfig.endpointOverride = GetEnv("S3_ENDPOINT");
31-
s3ClientConfig.scheme = Aws::Http::Scheme::HTTP;
32-
return Aws::S3::S3Client(
33-
std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(),
34-
s3ClientConfig,
35-
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
36-
/*useVirtualAddressing=*/true);
37-
}
38-
39-
void CreateBucket(const TString& bucket, Aws::S3::S3Client& s3Client) {
40-
Aws::S3::Model::CreateBucketRequest req;
41-
req.SetBucket(bucket);
42-
req.SetACL(Aws::S3::Model::BucketCannedACL::public_read_write);
43-
const Aws::S3::Model::CreateBucketOutcome result = s3Client.CreateBucket(req);
44-
UNIT_ASSERT_C(result.IsSuccess(), "Error creating bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage());
45-
}
46-
47-
void CreateBucket(const TString& bucket) {
48-
Aws::S3::S3Client s3Client = MakeS3Client();
49-
50-
CreateBucket(bucket, s3Client);
51-
}
52-
53-
void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) {
54-
Aws::S3::Model::PutObjectRequest req;
55-
req.WithBucket(bucket).WithKey(object);
56-
57-
auto inputStream = std::make_shared<std::stringstream>();
58-
*inputStream << content;
59-
req.SetBody(inputStream);
60-
const Aws::S3::Model::PutObjectOutcome result = s3Client.PutObject(req);
61-
UNIT_ASSERT_C(result.IsSuccess(), "Error uploading object \"" << object << "\" to a bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage());
62-
}
63-
64-
void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content) {
65-
Aws::S3::S3Client s3Client = MakeS3Client();
66-
67-
UploadObject(bucket, object, content, s3Client);
68-
}
7+
using std::shared_ptr;
8+
using namespace NKikimr::NKqp;
699

70-
void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) {
71-
CreateBucket(bucket, s3Client);
72-
UploadObject(bucket, object, content, s3Client);
73-
}
10+
const TString TEST_SCHEMA = R"(["StructType";[["key";["DataType";"Utf8";];];["value";["DataType";"Utf8";];];];])";
11+
const TString TEST_SCHEMA_IDS = R"(["StructType";[["key";["DataType";"Utf8";];];];])";
7412

75-
void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content) {
76-
Aws::S3::S3Client s3Client = MakeS3Client();
77-
78-
CreateBucketWithObject(bucket, object, content, s3Client);
79-
}
80-
81-
TString GetObject(const TString& bucket, const TString& object, Aws::S3::S3Client& s3Client) {
82-
Aws::S3::Model::GetObjectRequest req;
83-
req.WithBucket(bucket).WithKey(object);
84-
85-
Aws::S3::Model::GetObjectOutcome outcome = s3Client.GetObject(req);
86-
UNIT_ASSERT(outcome.IsSuccess());
87-
Aws::S3::Model::GetObjectResult& result = outcome.GetResult();
88-
std::istreambuf_iterator<char> eos;
89-
std::string objContent(std::istreambuf_iterator<char>(result.GetBody()), eos);
90-
Cerr << "Got object content from \"" << bucket << "." << object << "\"\n"
91-
<< objContent << Endl;
92-
return objContent;
93-
}
94-
95-
TString GetObject(const TString& bucket, const TString& object) {
96-
Aws::S3::S3Client s3Client = MakeS3Client();
97-
98-
return GetObject(bucket, object, s3Client);
99-
}
100-
101-
std::vector<TString> GetObjectKeys(const TString& bucket, Aws::S3::S3Client& s3Client) {
102-
Aws::S3::Model::ListObjectsRequest listReq;
103-
listReq.WithBucket(bucket);
104-
105-
Aws::S3::Model::ListObjectsOutcome outcome = s3Client.ListObjects(listReq);
106-
UNIT_ASSERT(outcome.IsSuccess());
107-
108-
std::vector<TString> keys;
109-
for (auto& obj : outcome.GetResult().GetContents()) {
110-
keys.push_back(TString(obj.GetKey()));
111-
Cerr << "Found S3 object: \"" << obj.GetKey() << "\"" << Endl;
112-
}
113-
return keys;
114-
}
115-
116-
std::vector<TString> GetObjectKeys(const TString& bucket) {
117-
Aws::S3::S3Client s3Client = MakeS3Client();
118-
119-
return GetObjectKeys(bucket, s3Client);
120-
}
121-
122-
TString GetAllObjects(const TString& bucket, TStringBuf separator, Aws::S3::S3Client& s3Client) {
123-
std::vector<TString> keys = GetObjectKeys(bucket, s3Client);
124-
TString result;
125-
bool firstObject = true;
126-
for (const TString& key : keys) {
127-
result += GetObject(bucket, key, s3Client);
128-
if (!firstObject) {
129-
result += separator;
130-
}
131-
firstObject = false;
132-
}
133-
return result;
134-
}
135-
136-
TString GetAllObjects(const TString& bucket, TStringBuf separator) {
137-
Aws::S3::S3Client s3Client = MakeS3Client();
138-
139-
return GetAllObjects(bucket, separator, s3Client);
140-
}
141-
142-
TString GetBucketLocation(const TStringBuf bucket) {
143-
return TStringBuilder() << GetEnv("S3_ENDPOINT") << '/' << bucket << '/';
144-
}
13+
shared_ptr<TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig, const TString& domainRoot)
14+
{
15+
return NFederatedQueryTest::MakeKikimrRunner(true, nullptr, nullptr, appConfig, NYql::NDq::CreateS3ActorsFactory(), domainRoot);
16+
}
14517

146-
} // namespace NTestUtils
18+
}

ydb/core/kqp/ut/federated_query/s3/s3_recipe_ut_helpers.h

Lines changed: 10 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,55 +2,21 @@
22

33
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
44
#include <ydb/core/kqp/ut/federated_query/common/common.h>
5-
#include <library/cpp/testing/unittest/registar.h>
6-
7-
#include <util/generic/strbuf.h>
8-
#include <util/generic/string.h>
9-
#include <util/system/env.h>
10-
11-
#include <aws/core/auth/AWSCredentialsProvider.h>
12-
#include <aws/core/Aws.h>
13-
#include <aws/s3/model/CreateBucketRequest.h>
14-
#include <aws/s3/model/GetObjectRequest.h>
15-
#include <aws/s3/model/ListObjectsRequest.h>
16-
#include <aws/s3/model/PutObjectRequest.h>
17-
#include <aws/s3/S3Client.h>
5+
#include <ydb/library/testlib/s3_recipe_helper/s3_recipe_helper.h>
186

197
namespace NTestUtils {
208

21-
constexpr TStringBuf TEST_CONTENT =
22-
R"({"key": "1", "value": "trololo"}
23-
{"key": "2", "value": "hello world"})"sv;
24-
25-
constexpr TStringBuf TEST_CONTENT_KEYS =
26-
R"({"key": "1"}
27-
{"key": "3"})"sv;
28-
29-
extern const TString TEST_SCHEMA;
30-
extern const TString TEST_SCHEMA_IDS;
31-
32-
std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt, const TString& domainRoot = "Root");
33-
34-
Aws::S3::S3Client MakeS3Client();
35-
36-
void CreateBucket(const TString& bucket, Aws::S3::S3Client& s3Client);
37-
void CreateBucket(const TString& bucket);
38-
39-
void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client);
40-
void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content);
41-
42-
void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client);
43-
void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content);
44-
45-
TString GetObject(const TString& bucket, const TString& object, Aws::S3::S3Client& s3Client);
46-
TString GetObject(const TString& bucket, const TString& object);
9+
constexpr TStringBuf TEST_CONTENT =
10+
R"({"key": "1", "value": "trololo"}
11+
{"key": "2", "value": "hello world"})"sv;
4712

48-
std::vector<TString> GetObjectKeys(const TString& bucket, Aws::S3::S3Client& s3Client);
49-
std::vector<TString> GetObjectKeys(const TString& bucket);
13+
constexpr TStringBuf TEST_CONTENT_KEYS =
14+
R"({"key": "1"}
15+
{"key": "3"})"sv;
5016

51-
TString GetAllObjects(const TString& bucket, TStringBuf separator, Aws::S3::S3Client& s3Client);
52-
TString GetAllObjects(const TString& bucket, TStringBuf separator = {});
17+
extern const TString TEST_SCHEMA;
18+
extern const TString TEST_SCHEMA_IDS;
5319

54-
TString GetBucketLocation(const TStringBuf bucket);
20+
std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt, const TString& domainRoot = "Root");
5521

5622
} // namespace NTestUtils

ydb/core/kqp/ut/federated_query/s3/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ PEERDIR(
2222
ydb/core/kqp/ut/federated_query/common
2323
ydb/library/yql/providers/s3/actors
2424
ydb/library/yql/sql/pg_dummy
25+
ydb/library/testlib/s3_recipe_helper
2526
ydb/public/sdk/cpp/client/ydb_types/operation
2627
)
2728

ydb/core/wrappers/s3_storage_config.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,13 +171,15 @@ IExternalStorageOperator::TPtr TS3ExternalStorageConfig::DoConstructStorageOpera
171171
TS3ExternalStorageConfig::TS3ExternalStorageConfig(const Ydb::Import::ImportFromS3Settings& settings)
172172
: Config(ConfigFromSettings(settings))
173173
, Credentials(CredentialsFromSettings(settings))
174+
, UseVirtualAddressing(!settings.disable_virtual_addressing())
174175
{
175176
Bucket = settings.bucket();
176177
}
177178

178179
TS3ExternalStorageConfig::TS3ExternalStorageConfig(const Ydb::Export::ExportToS3Settings& settings)
179180
: Config(ConfigFromSettings(settings))
180181
, Credentials(CredentialsFromSettings(settings))
182+
, UseVirtualAddressing(!settings.disable_virtual_addressing())
181183
{
182184
Bucket = settings.bucket();
183185
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
#include "s3_recipe_helper.h"
2+
3+
#include <library/cpp/testing/hook/hook.h>
4+
5+
#include <util/string/builder.h>
6+
7+
#include <aws/core/Aws.h>
8+
9+
Y_TEST_HOOK_BEFORE_RUN(InitAwsAPI) {
10+
Aws::InitAPI(Aws::SDKOptions());
11+
}
12+
13+
Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) {
14+
Aws::ShutdownAPI(Aws::SDKOptions());
15+
}
16+
17+
namespace NTestUtils {
18+
19+
Aws::S3::S3Client MakeS3Client() {
20+
Aws::Client::ClientConfiguration s3ClientConfig;
21+
s3ClientConfig.endpointOverride = GetEnv("S3_ENDPOINT");
22+
s3ClientConfig.scheme = Aws::Http::Scheme::HTTP;
23+
return Aws::S3::S3Client(
24+
std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(),
25+
s3ClientConfig,
26+
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
27+
/*useVirtualAddressing=*/true);
28+
}
29+
30+
void CreateBucket(const TString& bucket, Aws::S3::S3Client& s3Client) {
31+
Aws::S3::Model::CreateBucketRequest req;
32+
req.SetBucket(bucket);
33+
req.SetACL(Aws::S3::Model::BucketCannedACL::public_read_write);
34+
const Aws::S3::Model::CreateBucketOutcome result = s3Client.CreateBucket(req);
35+
UNIT_ASSERT_C(result.IsSuccess(), "Error creating bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage());
36+
}
37+
38+
void CreateBucket(const TString& bucket) {
39+
Aws::S3::S3Client s3Client = MakeS3Client();
40+
41+
CreateBucket(bucket, s3Client);
42+
}
43+
44+
void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) {
45+
Aws::S3::Model::PutObjectRequest req;
46+
req.WithBucket(bucket).WithKey(object);
47+
48+
auto inputStream = std::make_shared<std::stringstream>();
49+
*inputStream << content;
50+
req.SetBody(inputStream);
51+
const Aws::S3::Model::PutObjectOutcome result = s3Client.PutObject(req);
52+
UNIT_ASSERT_C(result.IsSuccess(), "Error uploading object \"" << object << "\" to a bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage());
53+
}
54+
55+
void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content) {
56+
Aws::S3::S3Client s3Client = MakeS3Client();
57+
58+
UploadObject(bucket, object, content, s3Client);
59+
}
60+
61+
void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) {
62+
CreateBucket(bucket, s3Client);
63+
UploadObject(bucket, object, content, s3Client);
64+
}
65+
66+
void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content) {
67+
Aws::S3::S3Client s3Client = MakeS3Client();
68+
69+
CreateBucketWithObject(bucket, object, content, s3Client);
70+
}
71+
72+
TString GetObject(const TString& bucket, const TString& object, Aws::S3::S3Client& s3Client) {
73+
Aws::S3::Model::GetObjectRequest req;
74+
req.WithBucket(bucket).WithKey(object);
75+
76+
Aws::S3::Model::GetObjectOutcome outcome = s3Client.GetObject(req);
77+
UNIT_ASSERT(outcome.IsSuccess());
78+
Aws::S3::Model::GetObjectResult& result = outcome.GetResult();
79+
std::istreambuf_iterator<char> eos;
80+
std::string objContent(std::istreambuf_iterator<char>(result.GetBody()), eos);
81+
Cerr << "Got object content from \"" << bucket << "." << object << "\"\n"
82+
<< objContent << Endl;
83+
return objContent;
84+
}
85+
86+
TString GetObject(const TString& bucket, const TString& object) {
87+
Aws::S3::S3Client s3Client = MakeS3Client();
88+
89+
return GetObject(bucket, object, s3Client);
90+
}
91+
92+
std::vector<TString> GetObjectKeys(const TString& bucket, Aws::S3::S3Client& s3Client) {
93+
Aws::S3::Model::ListObjectsRequest listReq;
94+
listReq.WithBucket(bucket);
95+
96+
Aws::S3::Model::ListObjectsOutcome outcome = s3Client.ListObjects(listReq);
97+
UNIT_ASSERT(outcome.IsSuccess());
98+
99+
std::vector<TString> keys;
100+
for (auto& obj : outcome.GetResult().GetContents()) {
101+
keys.push_back(TString(obj.GetKey()));
102+
Cerr << "Found S3 object: \"" << obj.GetKey() << "\"" << Endl;
103+
}
104+
return keys;
105+
}
106+
107+
std::vector<TString> GetObjectKeys(const TString& bucket) {
108+
Aws::S3::S3Client s3Client = MakeS3Client();
109+
110+
return GetObjectKeys(bucket, s3Client);
111+
}
112+
113+
TString GetAllObjects(const TString& bucket, TStringBuf separator, Aws::S3::S3Client& s3Client) {
114+
std::vector<TString> keys = GetObjectKeys(bucket, s3Client);
115+
TString result;
116+
bool firstObject = true;
117+
for (const TString& key : keys) {
118+
result += GetObject(bucket, key, s3Client);
119+
if (!firstObject) {
120+
result += separator;
121+
}
122+
firstObject = false;
123+
}
124+
return result;
125+
}
126+
127+
TString GetAllObjects(const TString& bucket, TStringBuf separator) {
128+
Aws::S3::S3Client s3Client = MakeS3Client();
129+
130+
return GetAllObjects(bucket, separator, s3Client);
131+
}
132+
133+
TString GetBucketLocation(const TStringBuf bucket) {
134+
return TStringBuilder() << GetEnv("S3_ENDPOINT") << '/' << bucket << '/';
135+
}
136+
137+
} // namespace NTestUtils

0 commit comments

Comments
 (0)