Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,17 @@ public static Properties getPropertiesWithPerformanceOptions(String testName) {
Properties properties = getProperties(testName);

// For Blob Storage
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, "5242880"); // 5MB
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_MAX_PARALLELISM, "4");
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_BYTES, "5242880"); // 5MB
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_MAX_CONCURRENCY, "4");
properties.setProperty(
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, "10485760"); // 10MB
properties.setProperty(BlobStorageConfig.REQUEST_TIMEOUT_IN_SECONDS, "30");
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES, "10485760"); // 10MB
properties.setProperty(BlobStorageConfig.REQUEST_TIMEOUT_SECS, "30");

// For S3
properties.setProperty(S3Config.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, "5242880"); // 5MB
properties.setProperty(S3Config.PARALLEL_UPLOAD_MAX_PARALLELISM, "4");
properties.setProperty(S3Config.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, "10485760"); // 10MB
properties.setProperty(S3Config.REQUEST_TIMEOUT_IN_SECONDS, "30");
properties.setProperty(S3Config.MULTIPART_UPLOAD_PART_SIZE_BYTES, "5242880"); // 5MB
properties.setProperty(S3Config.MULTIPART_UPLOAD_MAX_CONCURRENCY, "4");
properties.setProperty(S3Config.MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES, "10485760"); // 10MB
properties.setProperty(S3Config.REQUEST_TIMEOUT_SECS, "30");

return properties;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,38 +36,34 @@ public class ObjectStorageWrapperLargeObjectWriteIntegrationTest {
@BeforeAll
public void beforeAll() throws ObjectStorageWrapperException {
Properties properties = getProperties(TEST_NAME);
long parallelUploadThresholdInBytes;
long payloadSizeBytes;

if (ObjectStorageEnv.isBlobStorage()) {
// Minimum block size must be greater than or equal to 256KB for Blob Storage
Long parallelUploadUnit = 256 * 1024L; // 256KB
Long uploadUnit = 256 * 1024L; // 256KB
properties.setProperty(
BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES,
String.valueOf(parallelUploadUnit));
BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_BYTES, String.valueOf(uploadUnit));
properties.setProperty(
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES,
String.valueOf(parallelUploadUnit * 2));
parallelUploadThresholdInBytes = parallelUploadUnit * 2;
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES, String.valueOf(uploadUnit * 2));
payloadSizeBytes = uploadUnit * 2 + 1;
} else if (ObjectStorageEnv.isCloudStorage()) {
// Minimum block size must be greater than or equal to 256KB for Cloud Storage
Long parallelUploadUnit = 256 * 1024L; // 256KB
Long uploadUnit = 256 * 1024L; // 256KB
properties.setProperty(
CloudStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES,
String.valueOf(parallelUploadUnit));
parallelUploadThresholdInBytes = parallelUploadUnit * 2;
CloudStorageConfig.UPLOAD_CHUNK_SIZE_BYTES, String.valueOf(uploadUnit));
payloadSizeBytes = uploadUnit * 2 + 1;
} else if (ObjectStorageEnv.isS3()) {
// Minimum part size must be greater than or equal to 5MB for S3
Long parallelUploadUnit = 5 * 1024 * 1024L; // 5MB
Long uploadUnit = 5 * 1024 * 1024L; // 5MB
properties.setProperty(S3Config.MULTIPART_UPLOAD_PART_SIZE_BYTES, String.valueOf(uploadUnit));
properties.setProperty(
S3Config.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, String.valueOf(parallelUploadUnit));
properties.setProperty(
S3Config.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, String.valueOf(parallelUploadUnit * 2));
parallelUploadThresholdInBytes = parallelUploadUnit * 2;
S3Config.MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES, String.valueOf(uploadUnit * 2));
payloadSizeBytes = uploadUnit * 2 + 1;
} else {
throw new AssertionError();
}

char[] charArray = new char[(int) parallelUploadThresholdInBytes + 1];
char[] charArray = new char[(int) payloadSizeBytes];
Arrays.fill(charArray, 'a');
testObject1 = new String(charArray);
Arrays.fill(charArray, 'b');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@

public class BlobStorageConfig implements ObjectStorageConfig {
public static final String STORAGE_NAME = "blob-storage";
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + ".";
public static final String STORAGE_NAME_IN_PREFIX = "blob_storage";
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME_IN_PREFIX + ".";

public static final String PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES =
PREFIX + "parallel_upload_block_size_in_bytes";
public static final String PARALLEL_UPLOAD_MAX_PARALLELISM =
PREFIX + "parallel_upload_max_parallelism";
public static final String PARALLEL_UPLOAD_THRESHOLD_IN_BYTES =
PREFIX + "parallel_upload_threshold_in_bytes";
public static final String REQUEST_TIMEOUT_IN_SECONDS = PREFIX + "request_timeout_in_seconds";
public static final String PARALLEL_UPLOAD_BLOCK_SIZE_BYTES =
PREFIX + "parallel_upload_block_size_bytes";
public static final String PARALLEL_UPLOAD_MAX_CONCURRENCY =
PREFIX + "parallel_upload_max_concurrency";
public static final String PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES =
PREFIX + "parallel_upload_threshold_size_bytes";
public static final String REQUEST_TIMEOUT_SECS = PREFIX + "request_timeout_secs";

private static final Logger logger = LoggerFactory.getLogger(BlobStorageConfig.class);
private final String endpoint;
Expand All @@ -29,10 +30,10 @@ public class BlobStorageConfig implements ObjectStorageConfig {
private final String bucket;
private final String metadataNamespace;

private final Long parallelUploadBlockSizeInBytes;
private final Integer parallelUploadMaxParallelism;
private final Long parallelUploadThresholdInBytes;
private final Integer requestTimeoutInSeconds;
private final Long parallelUploadBlockSizeBytes;
private final Integer parallelUploadMaxConcurrency;
private final Long parallelUploadThresholdSizeBytes;
private final Integer requestTimeoutSecs;

public BlobStorageConfig(DatabaseConfig databaseConfig) {
String storage = databaseConfig.getStorage();
Expand Down Expand Up @@ -63,14 +64,13 @@ public BlobStorageConfig(DatabaseConfig databaseConfig) {
+ "\" is not applicable to Blob Storage and will be ignored.");
}

parallelUploadBlockSizeInBytes =
getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, null);
parallelUploadMaxParallelism =
getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_MAX_PARALLELISM, null);
parallelUploadThresholdInBytes =
getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, null);
requestTimeoutInSeconds =
getInt(databaseConfig.getProperties(), REQUEST_TIMEOUT_IN_SECONDS, null);
parallelUploadBlockSizeBytes =
getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_BYTES, null);
parallelUploadMaxConcurrency =
getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_MAX_CONCURRENCY, null);
parallelUploadThresholdSizeBytes =
getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES, null);
requestTimeoutSecs = getInt(databaseConfig.getProperties(), REQUEST_TIMEOUT_SECS, null);
}

@Override
Expand Down Expand Up @@ -101,19 +101,19 @@ public String getUsername() {
return username;
}

public Optional<Long> getParallelUploadBlockSizeInBytes() {
return Optional.ofNullable(parallelUploadBlockSizeInBytes);
public Optional<Long> getParallelUploadBlockSizeBytes() {
return Optional.ofNullable(parallelUploadBlockSizeBytes);
}

public Optional<Integer> getParallelUploadMaxParallelism() {
return Optional.ofNullable(parallelUploadMaxParallelism);
public Optional<Integer> getParallelUploadMaxConcurrency() {
return Optional.ofNullable(parallelUploadMaxConcurrency);
}

public Optional<Long> getParallelUploadThresholdInBytes() {
return Optional.ofNullable(parallelUploadThresholdInBytes);
public Optional<Long> getParallelUploadThresholdSizeBytes() {
return Optional.ofNullable(parallelUploadThresholdSizeBytes);
}

public Optional<Integer> getRequestTimeoutInSeconds() {
return Optional.ofNullable(requestTimeoutInSeconds);
public Optional<Integer> getRequestTimeoutSecs() {
return Optional.ofNullable(requestTimeoutSecs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
@ThreadSafe
public class BlobStorageWrapper implements ObjectStorageWrapper {
private final BlobContainerClient client;
private final Duration requestTimeoutInSeconds;
private final Duration requestTimeoutSecs;
private final ParallelTransferOptions parallelTransferOptions;

public BlobStorageWrapper(BlobStorageConfig config) {
Expand All @@ -37,18 +37,17 @@ public BlobStorageWrapper(BlobStorageConfig config) {
.credential(new StorageSharedKeyCredential(config.getUsername(), config.getPassword()))
.buildClient()
.getBlobContainerClient(config.getBucket());
this.requestTimeoutInSeconds =
config.getRequestTimeoutInSeconds().map(Duration::ofSeconds).orElse(null);
this.requestTimeoutSecs = config.getRequestTimeoutSecs().map(Duration::ofSeconds).orElse(null);
this.parallelTransferOptions = new ParallelTransferOptions();
if (config.getParallelUploadBlockSizeInBytes().isPresent()) {
parallelTransferOptions.setBlockSizeLong(config.getParallelUploadBlockSizeInBytes().get());
if (config.getParallelUploadBlockSizeBytes().isPresent()) {
parallelTransferOptions.setBlockSizeLong(config.getParallelUploadBlockSizeBytes().get());
}
if (config.getParallelUploadMaxParallelism().isPresent()) {
parallelTransferOptions.setMaxConcurrency(config.getParallelUploadMaxParallelism().get());
if (config.getParallelUploadMaxConcurrency().isPresent()) {
parallelTransferOptions.setMaxConcurrency(config.getParallelUploadMaxConcurrency().get());
}
if (config.getParallelUploadThresholdInBytes().isPresent()) {
if (config.getParallelUploadThresholdSizeBytes().isPresent()) {
parallelTransferOptions.setMaxSingleUploadSizeLong(
config.getParallelUploadThresholdInBytes().get());
config.getParallelUploadThresholdSizeBytes().get());
}
}

Expand All @@ -58,7 +57,7 @@ public Optional<ObjectStorageWrapperResponse> get(String key)
try {
BlobClient blobClient = client.getBlobClient(key);
BlobDownloadContentResponse response =
blobClient.downloadContentWithResponse(null, null, requestTimeoutInSeconds, null);
blobClient.downloadContentWithResponse(null, null, requestTimeoutSecs, null);
String data = response.getValue().toString();
String eTag = response.getHeaders().getValue(HttpHeaderName.ETAG);
return Optional.of(new ObjectStorageWrapperResponse(data, eTag));
Expand All @@ -77,8 +76,7 @@ public Optional<ObjectStorageWrapperResponse> get(String key)
@Override
public Set<String> getKeys(String prefix) throws ObjectStorageWrapperException {
try {
return client.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutInSeconds)
.stream()
return client.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutSecs).stream()
.map(BlobItem::getName)
.collect(Collectors.toSet());
} catch (Exception e) {
Expand All @@ -95,7 +93,7 @@ public void insert(String key, String object) throws ObjectStorageWrapperExcepti
new BlobParallelUploadOptions(BinaryData.fromString(object))
.setRequestConditions(new BlobRequestConditions().setIfNoneMatch("*"))
.setParallelTransferOptions(parallelTransferOptions);
blobClient.uploadWithResponse(options, requestTimeoutInSeconds, null);
blobClient.uploadWithResponse(options, requestTimeoutSecs, null);
} catch (BlobStorageException e) {
if (e.getErrorCode().equals(BlobErrorCode.BLOB_ALREADY_EXISTS)) {
throw new PreconditionFailedException(
Expand All @@ -120,7 +118,7 @@ public void update(String key, String object, String version)
new BlobParallelUploadOptions(BinaryData.fromString(object))
.setRequestConditions(new BlobRequestConditions().setIfMatch(version))
.setParallelTransferOptions(parallelTransferOptions);
blobClient.uploadWithResponse(options, requestTimeoutInSeconds, null);
blobClient.uploadWithResponse(options, requestTimeoutSecs, null);
} catch (BlobStorageException e) {
if (e.getErrorCode().equals(BlobErrorCode.CONDITION_NOT_MET)
|| e.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
Expand Down Expand Up @@ -162,7 +160,7 @@ public void delete(String key, String version) throws ObjectStorageWrapperExcept
try {
BlobClient blobClient = client.getBlobClient(key);
blobClient.deleteWithResponse(
null, new BlobRequestConditions().setIfMatch(version), requestTimeoutInSeconds, null);
null, new BlobRequestConditions().setIfMatch(version), requestTimeoutSecs, null);
} catch (BlobStorageException e) {
if (e.getErrorCode().equals(BlobErrorCode.CONDITION_NOT_MET)
|| e.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
Expand All @@ -183,7 +181,7 @@ public void delete(String key, String version) throws ObjectStorageWrapperExcept
public void deleteByPrefix(String prefix) throws ObjectStorageWrapperException {
try {
client
.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutInSeconds)
.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutSecs)
.forEach(
blobItem -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@

public class CloudStorageConfig implements ObjectStorageConfig {
public static final String STORAGE_NAME = "cloud-storage";
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + ".";
public static final String STORAGE_NAME_IN_PREFIX = "cloud_storage";
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME_IN_PREFIX + ".";

public static final String PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES =
PREFIX + "parallel_upload_block_size_in_bytes";
public static final String UPLOAD_CHUNK_SIZE_BYTES = PREFIX + "upload_chunk_size_bytes";

private static final Logger logger = LoggerFactory.getLogger(CloudStorageConfig.class);
private final String password;
private final String bucket;
private final String metadataNamespace;
private final String projectId;
private final Integer parallelUploadBlockSizeInBytes;
private final Integer uploadChunkSizeBytes;

public CloudStorageConfig(DatabaseConfig databaseConfig) {
String storage = databaseConfig.getStorage();
Expand All @@ -49,8 +49,7 @@ public CloudStorageConfig(DatabaseConfig databaseConfig) {
+ "\" is not applicable to Cloud Storage and will be ignored.");
}

parallelUploadBlockSizeInBytes =
getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, null);
uploadChunkSizeBytes = getInt(databaseConfig.getProperties(), UPLOAD_CHUNK_SIZE_BYTES, null);
}

@Override
Expand Down Expand Up @@ -92,7 +91,7 @@ public Credentials getCredentials() {
}
}

public Optional<Integer> getParallelUploadBlockSizeInBytes() {
return Optional.ofNullable(parallelUploadBlockSizeInBytes);
public Optional<Integer> getUploadChunkSizeBytes() {
return Optional.ofNullable(uploadChunkSizeBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class CloudStorageWrapper implements ObjectStorageWrapper {

private final Storage storage;
private final String bucket;
private final Integer parallelUploadBlockSizeInBytes;
private final Integer uploadChunkSizeBytes;

public CloudStorageWrapper(CloudStorageConfig config) {
storage =
Expand All @@ -41,15 +41,15 @@ public CloudStorageWrapper(CloudStorageConfig config) {
.build()
.getService();
bucket = config.getBucket();
parallelUploadBlockSizeInBytes = config.getParallelUploadBlockSizeInBytes().orElse(null);
uploadChunkSizeBytes = config.getUploadChunkSizeBytes().orElse(null);
}

@VisibleForTesting
@SuppressFBWarnings("EI_EXPOSE_REP2")
public CloudStorageWrapper(CloudStorageConfig config, Storage storage) {
this.storage = storage;
this.bucket = config.getBucket();
parallelUploadBlockSizeInBytes = config.getParallelUploadBlockSizeInBytes().orElse(null);
uploadChunkSizeBytes = config.getUploadChunkSizeBytes().orElse(null);
}

@Override
Expand Down Expand Up @@ -209,8 +209,8 @@ private void writeData(String key, String object, Storage.BlobWriteOption precon
BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucket, key)).build();

try (WriteChannel writer = storage.writer(blobInfo, precondition)) {
if (parallelUploadBlockSizeInBytes != null) {
writer.setChunkSize(parallelUploadBlockSizeInBytes);
if (uploadChunkSizeBytes != null) {
writer.setChunkSize(uploadChunkSizeBytes);
}
ByteBuffer buffer = ByteBuffer.wrap(data);
while (buffer.hasRemaining()) {
Expand Down
Loading