diff --git a/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageEnv.java b/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageEnv.java index 85e3c8be21..07c5fedb33 100644 --- a/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageEnv.java +++ b/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageEnv.java @@ -53,17 +53,17 @@ public static Properties getPropertiesWithPerformanceOptions(String testName) { Properties properties = getProperties(testName); // For Blob Storage - properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, "5242880"); // 5MB - properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_MAX_PARALLELISM, "4"); + properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_BYTES, "5242880"); // 5MB + properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_MAX_CONCURRENCY, "4"); properties.setProperty( - BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, "10485760"); // 10MB - properties.setProperty(BlobStorageConfig.REQUEST_TIMEOUT_IN_SECONDS, "30"); + BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES, "10485760"); // 10MB + properties.setProperty(BlobStorageConfig.REQUEST_TIMEOUT_SECS, "30"); // For S3 - properties.setProperty(S3Config.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, "5242880"); // 5MB - properties.setProperty(S3Config.PARALLEL_UPLOAD_MAX_PARALLELISM, "4"); - properties.setProperty(S3Config.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, "10485760"); // 10MB - properties.setProperty(S3Config.REQUEST_TIMEOUT_IN_SECONDS, "30"); + properties.setProperty(S3Config.MULTIPART_UPLOAD_PART_SIZE_BYTES, "5242880"); // 5MB + properties.setProperty(S3Config.MULTIPART_UPLOAD_MAX_CONCURRENCY, "4"); + properties.setProperty(S3Config.MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES, "10485760"); // 10MB + properties.setProperty(S3Config.REQUEST_TIMEOUT_SECS, "30"); return properties; } diff --git a/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperLargeObjectWriteIntegrationTest.java b/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperLargeObjectWriteIntegrationTest.java index 2a80fc5047..d1cee596b1 100644 --- a/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperLargeObjectWriteIntegrationTest.java +++ b/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperLargeObjectWriteIntegrationTest.java @@ -36,38 +36,34 @@ public class ObjectStorageWrapperLargeObjectWriteIntegrationTest { @BeforeAll public void beforeAll() throws ObjectStorageWrapperException { Properties properties = getProperties(TEST_NAME); - long parallelUploadThresholdInBytes; + long payloadSizeBytes; if (ObjectStorageEnv.isBlobStorage()) { // Minimum block size must be greater than or equal to 256KB for Blob Storage - Long parallelUploadUnit = 256 * 1024L; // 256KB + Long uploadUnit = 256 * 1024L; // 256KB properties.setProperty( - BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, - String.valueOf(parallelUploadUnit)); + BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_BYTES, String.valueOf(uploadUnit)); properties.setProperty( - BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, - String.valueOf(parallelUploadUnit * 2)); - parallelUploadThresholdInBytes = parallelUploadUnit * 2; + BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES, String.valueOf(uploadUnit * 2)); + payloadSizeBytes = uploadUnit * 2 + 1; } else if (ObjectStorageEnv.isCloudStorage()) { // Minimum block size must be greater than or equal to 256KB for Cloud Storage - Long parallelUploadUnit = 256 * 1024L; // 256KB + Long uploadUnit = 256 * 1024L; // 256KB properties.setProperty( - CloudStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, - String.valueOf(parallelUploadUnit)); - parallelUploadThresholdInBytes = parallelUploadUnit * 2; + CloudStorageConfig.UPLOAD_CHUNK_SIZE_BYTES, String.valueOf(uploadUnit)); + payloadSizeBytes = uploadUnit * 2 + 1; } else if (ObjectStorageEnv.isS3()) { // Minimum part size must be greater than or equal to 5MB for S3 - Long parallelUploadUnit = 5 * 1024 * 1024L; // 5MB + Long uploadUnit = 5 * 1024 * 1024L; // 5MB + properties.setProperty(S3Config.MULTIPART_UPLOAD_PART_SIZE_BYTES, String.valueOf(uploadUnit)); properties.setProperty( - S3Config.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, String.valueOf(parallelUploadUnit)); - properties.setProperty( - S3Config.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, String.valueOf(parallelUploadUnit * 2)); - parallelUploadThresholdInBytes = parallelUploadUnit * 2; + S3Config.MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES, String.valueOf(uploadUnit * 2)); + payloadSizeBytes = uploadUnit * 2 + 1; } else { throw new AssertionError(); } - char[] charArray = new char[(int) parallelUploadThresholdInBytes + 1]; + char[] charArray = new char[(int) payloadSizeBytes]; Arrays.fill(charArray, 'a'); testObject1 = new String(charArray); Arrays.fill(charArray, 'b'); diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfig.java b/core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfig.java index a2839d9ec5..b6dac75bf1 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfig.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfig.java @@ -13,16 +13,17 @@ public class BlobStorageConfig implements ObjectStorageConfig { public static final String STORAGE_NAME = "blob-storage"; - public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + "."; + public static final String STORAGE_NAME_IN_PREFIX = "blob_storage"; + public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME_IN_PREFIX + "."; public static final String TABLE_METADATA_NAMESPACE = PREFIX + "table_metadata.namespace"; - public static final String PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES = - PREFIX + "parallel_upload_block_size_in_bytes"; - public static final String PARALLEL_UPLOAD_MAX_PARALLELISM = - PREFIX + "parallel_upload_max_parallelism"; - public static final String PARALLEL_UPLOAD_THRESHOLD_IN_BYTES = - PREFIX + "parallel_upload_threshold_in_bytes"; - public static final String REQUEST_TIMEOUT_IN_SECONDS = PREFIX + "request_timeout_in_seconds"; + public static final String PARALLEL_UPLOAD_BLOCK_SIZE_BYTES = + PREFIX + "parallel_upload_block_size_bytes"; + public static final String PARALLEL_UPLOAD_MAX_CONCURRENCY = + PREFIX + "parallel_upload_max_concurrency"; + public static final String PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES = + PREFIX + "parallel_upload_threshold_size_bytes"; + public static final String REQUEST_TIMEOUT_SECS = PREFIX + "request_timeout_secs"; private static final Logger logger = LoggerFactory.getLogger(BlobStorageConfig.class); private final String endpoint; @@ -31,10 +32,10 @@ public class BlobStorageConfig implements ObjectStorageConfig { private final String bucket; private final String metadataNamespace; - private final Long parallelUploadBlockSizeInBytes; - private final Integer parallelUploadMaxParallelism; - private final Long parallelUploadThresholdInBytes; - private final Integer requestTimeoutInSeconds; + private final Long parallelUploadBlockSizeBytes; + private final Integer parallelUploadMaxConcurrency; + private final Long parallelUploadThresholdSizeBytes; + private final Integer requestTimeoutSecs; public BlobStorageConfig(DatabaseConfig databaseConfig) { String storage = databaseConfig.getStorage(); @@ -69,14 +70,13 @@ public BlobStorageConfig(DatabaseConfig databaseConfig) { + "\" is not applicable to Blob Storage and will be ignored."); } - parallelUploadBlockSizeInBytes = - getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, null); - parallelUploadMaxParallelism = - getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_MAX_PARALLELISM, null); - parallelUploadThresholdInBytes = - getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, null); - requestTimeoutInSeconds = - getInt(databaseConfig.getProperties(), REQUEST_TIMEOUT_IN_SECONDS, null); + parallelUploadBlockSizeBytes = + getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_BYTES, null); + parallelUploadMaxConcurrency = + getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_MAX_CONCURRENCY, null); + parallelUploadThresholdSizeBytes = + getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES, null); + requestTimeoutSecs = getInt(databaseConfig.getProperties(), REQUEST_TIMEOUT_SECS, null); } @Override @@ -107,19 +107,19 @@ public String getUsername() { return username; } - public Optional getParallelUploadBlockSizeInBytes() { - return Optional.ofNullable(parallelUploadBlockSizeInBytes); + public Optional getParallelUploadBlockSizeBytes() { + return Optional.ofNullable(parallelUploadBlockSizeBytes); } - public Optional getParallelUploadMaxParallelism() { - return Optional.ofNullable(parallelUploadMaxParallelism); + public Optional getParallelUploadMaxConcurrency() { + return Optional.ofNullable(parallelUploadMaxConcurrency); } - public Optional getParallelUploadThresholdInBytes() { - return Optional.ofNullable(parallelUploadThresholdInBytes); + public Optional getParallelUploadThresholdSizeBytes() { + return Optional.ofNullable(parallelUploadThresholdSizeBytes); } - public Optional getRequestTimeoutInSeconds() { - return Optional.ofNullable(requestTimeoutInSeconds); + public Optional getRequestTimeoutSecs() { + return Optional.ofNullable(requestTimeoutSecs); } } diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageWrapper.java b/core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageWrapper.java index 044696dc17..a8e36a0cf4 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageWrapper.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageWrapper.java @@ -27,7 +27,7 @@ @ThreadSafe public class BlobStorageWrapper implements ObjectStorageWrapper { private final BlobContainerClient client; - private final Duration requestTimeoutInSeconds; + private final Duration requestTimeoutSecs; private final ParallelTransferOptions parallelTransferOptions; public BlobStorageWrapper(BlobStorageConfig config) { @@ -37,18 +37,17 @@ public BlobStorageWrapper(BlobStorageConfig config) { .credential(new StorageSharedKeyCredential(config.getUsername(), config.getPassword())) .buildClient() .getBlobContainerClient(config.getBucket()); - this.requestTimeoutInSeconds = - config.getRequestTimeoutInSeconds().map(Duration::ofSeconds).orElse(null); + this.requestTimeoutSecs = config.getRequestTimeoutSecs().map(Duration::ofSeconds).orElse(null); this.parallelTransferOptions = new ParallelTransferOptions(); - if (config.getParallelUploadBlockSizeInBytes().isPresent()) { - parallelTransferOptions.setBlockSizeLong(config.getParallelUploadBlockSizeInBytes().get()); + if (config.getParallelUploadBlockSizeBytes().isPresent()) { + parallelTransferOptions.setBlockSizeLong(config.getParallelUploadBlockSizeBytes().get()); } - if (config.getParallelUploadMaxParallelism().isPresent()) { - parallelTransferOptions.setMaxConcurrency(config.getParallelUploadMaxParallelism().get()); + if (config.getParallelUploadMaxConcurrency().isPresent()) { + parallelTransferOptions.setMaxConcurrency(config.getParallelUploadMaxConcurrency().get()); } - if (config.getParallelUploadThresholdInBytes().isPresent()) { + if (config.getParallelUploadThresholdSizeBytes().isPresent()) { parallelTransferOptions.setMaxSingleUploadSizeLong( - config.getParallelUploadThresholdInBytes().get()); + config.getParallelUploadThresholdSizeBytes().get()); } } @@ -58,7 +57,7 @@ public Optional get(String key) try { BlobClient blobClient = client.getBlobClient(key); BlobDownloadContentResponse response = - blobClient.downloadContentWithResponse(null, null, requestTimeoutInSeconds, null); + blobClient.downloadContentWithResponse(null, null, requestTimeoutSecs, null); String data = response.getValue().toString(); String eTag = response.getHeaders().getValue(HttpHeaderName.ETAG); return Optional.of(new ObjectStorageWrapperResponse(data, eTag)); @@ -77,8 +76,7 @@ public Optional get(String key) @Override public Set getKeys(String prefix) throws ObjectStorageWrapperException { try { - return client.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutInSeconds) - .stream() + return client.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutSecs).stream() .map(BlobItem::getName) .collect(Collectors.toSet()); } catch (Exception e) { @@ -95,7 +93,7 @@ public void insert(String key, String object) throws ObjectStorageWrapperExcepti new BlobParallelUploadOptions(BinaryData.fromString(object)) .setRequestConditions(new BlobRequestConditions().setIfNoneMatch("*")) .setParallelTransferOptions(parallelTransferOptions); - blobClient.uploadWithResponse(options, requestTimeoutInSeconds, null); + blobClient.uploadWithResponse(options, requestTimeoutSecs, null); } catch (BlobStorageException e) { if (e.getErrorCode().equals(BlobErrorCode.BLOB_ALREADY_EXISTS)) { throw new PreconditionFailedException( @@ -120,7 +118,7 @@ public void update(String key, String object, String version) new BlobParallelUploadOptions(BinaryData.fromString(object)) .setRequestConditions(new BlobRequestConditions().setIfMatch(version)) .setParallelTransferOptions(parallelTransferOptions); - blobClient.uploadWithResponse(options, requestTimeoutInSeconds, null); + blobClient.uploadWithResponse(options, requestTimeoutSecs, null); } catch (BlobStorageException e) { if (e.getErrorCode().equals(BlobErrorCode.CONDITION_NOT_MET) || e.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) { @@ -162,7 +160,7 @@ public void delete(String key, String version) throws ObjectStorageWrapperExcept try { BlobClient blobClient = client.getBlobClient(key); blobClient.deleteWithResponse( - null, new BlobRequestConditions().setIfMatch(version), requestTimeoutInSeconds, null); + null, new BlobRequestConditions().setIfMatch(version), requestTimeoutSecs, null); } catch (BlobStorageException e) { if (e.getErrorCode().equals(BlobErrorCode.CONDITION_NOT_MET) || e.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) { @@ -183,7 +181,7 @@ public void delete(String key, String version) throws ObjectStorageWrapperExcept public void deleteByPrefix(String prefix) throws ObjectStorageWrapperException { try { client - .listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutInSeconds) + .listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutSecs) .forEach( blobItem -> { try { diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java index ab80128269..372baa6fa0 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java @@ -17,18 +17,18 @@ public class CloudStorageConfig implements ObjectStorageConfig { public static final String STORAGE_NAME = "cloud-storage"; - public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + "."; + public static final String STORAGE_NAME_IN_PREFIX = "cloud_storage"; + public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME_IN_PREFIX + "."; public static final String TABLE_METADATA_NAMESPACE = PREFIX + "table_metadata.namespace"; - public static final String PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES = - PREFIX + "parallel_upload_block_size_in_bytes"; + public static final String UPLOAD_CHUNK_SIZE_BYTES = PREFIX + "upload_chunk_size_bytes"; private static final Logger logger = LoggerFactory.getLogger(CloudStorageConfig.class); private final String password; private final String bucket; private final String metadataNamespace; private final String projectId; - private final Integer parallelUploadBlockSizeInBytes; + private final Integer uploadChunkSizeBytes; public CloudStorageConfig(DatabaseConfig databaseConfig) { String storage = databaseConfig.getStorage(); @@ -55,8 +55,7 @@ public CloudStorageConfig(DatabaseConfig databaseConfig) { + "\" is not applicable to Cloud Storage and will be ignored."); } - parallelUploadBlockSizeInBytes = - getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, null); + uploadChunkSizeBytes = getInt(databaseConfig.getProperties(), UPLOAD_CHUNK_SIZE_BYTES, null); } @Override @@ -98,7 +97,7 @@ public Credentials getCredentials() { } } - public Optional getParallelUploadBlockSizeInBytes() { - return Optional.ofNullable(parallelUploadBlockSizeInBytes); + public Optional getUploadChunkSizeBytes() { + return Optional.ofNullable(uploadChunkSizeBytes); } } diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapper.java b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapper.java index ab89310ed7..d24d622464 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapper.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapper.java @@ -31,7 +31,7 @@ public class CloudStorageWrapper implements ObjectStorageWrapper { private final Storage storage; private final String bucket; - private final Integer parallelUploadBlockSizeInBytes; + private final Integer uploadChunkSizeBytes; public CloudStorageWrapper(CloudStorageConfig config) { storage = @@ -41,7 +41,7 @@ public CloudStorageWrapper(CloudStorageConfig config) { .build() .getService(); bucket = config.getBucket(); - parallelUploadBlockSizeInBytes = config.getParallelUploadBlockSizeInBytes().orElse(null); + uploadChunkSizeBytes = config.getUploadChunkSizeBytes().orElse(null); } @VisibleForTesting @@ -49,7 +49,7 @@ public CloudStorageWrapper(CloudStorageConfig config) { public CloudStorageWrapper(CloudStorageConfig config, Storage storage) { this.storage = storage; this.bucket = config.getBucket(); - parallelUploadBlockSizeInBytes = config.getParallelUploadBlockSizeInBytes().orElse(null); + uploadChunkSizeBytes = config.getUploadChunkSizeBytes().orElse(null); } @Override @@ -209,8 +209,8 @@ private void writeData(String key, String object, Storage.BlobWriteOption precon BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucket, key)).build(); try (WriteChannel writer = storage.writer(blobInfo, precondition)) { - if (parallelUploadBlockSizeInBytes != null) { - writer.setChunkSize(parallelUploadBlockSizeInBytes); + if (uploadChunkSizeBytes != null) { + writer.setChunkSize(uploadChunkSizeBytes); } ByteBuffer buffer = ByteBuffer.wrap(data); while (buffer.hasRemaining()) { diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Config.java b/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Config.java index 3312f92a22..89095dd5cc 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Config.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Config.java @@ -18,13 +18,13 @@ public class S3Config implements ObjectStorageConfig { public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + "."; public static final String TABLE_METADATA_NAMESPACE = PREFIX + "table_metadata.namespace"; - public static final String PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES = - PREFIX + "parallel_upload_block_size_in_bytes"; - public static final String PARALLEL_UPLOAD_MAX_PARALLELISM = - PREFIX + "parallel_upload_max_parallelism"; - public static final String PARALLEL_UPLOAD_THRESHOLD_IN_BYTES = - PREFIX + "parallel_upload_threshold_in_bytes"; - public static final String REQUEST_TIMEOUT_IN_SECONDS = PREFIX + "request_timeout_in_seconds"; + public static final String MULTIPART_UPLOAD_PART_SIZE_BYTES = + PREFIX + "multipart_upload_part_size_bytes"; + public static final String MULTIPART_UPLOAD_MAX_CONCURRENCY = + PREFIX + "multipart_upload_max_concurrency"; + public static final String MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES = + PREFIX + "multipart_upload_threshold_size_bytes"; + public static final String REQUEST_TIMEOUT_SECS = PREFIX + "request_timeout_secs"; private static final Logger logger = LoggerFactory.getLogger(S3Config.class); private final String username; @@ -33,10 +33,10 @@ public class S3Config implements ObjectStorageConfig { private final String metadataNamespace; private final String region; - private final Long parallelUploadBlockSizeInBytes; - private final Integer parallelUploadMaxParallelism; - private final Long parallelUploadThresholdInBytes; - private final Integer requestTimeoutInSeconds; + private final Long multipartUploadPartSizeBytes; + private final Integer multipartUploadMaxConcurrency; + private final Long multipartUploadThresholdSizeBytes; + private final Integer requestTimeoutSecs; public S3Config(DatabaseConfig databaseConfig) { String storage = databaseConfig.getStorage(); @@ -73,14 +73,13 @@ public S3Config(DatabaseConfig databaseConfig) { + "\" is not applicable to S3 and will be ignored."); } - parallelUploadBlockSizeInBytes = - getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, null); - parallelUploadMaxParallelism = - getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_MAX_PARALLELISM, null); - parallelUploadThresholdInBytes = - getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, null); - requestTimeoutInSeconds = - getInt(databaseConfig.getProperties(), REQUEST_TIMEOUT_IN_SECONDS, null); + multipartUploadPartSizeBytes = + getLong(databaseConfig.getProperties(), MULTIPART_UPLOAD_PART_SIZE_BYTES, null); + multipartUploadMaxConcurrency = + getInt(databaseConfig.getProperties(), MULTIPART_UPLOAD_MAX_CONCURRENCY, null); + multipartUploadThresholdSizeBytes = + getLong(databaseConfig.getProperties(), MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES, null); + requestTimeoutSecs = getInt(databaseConfig.getProperties(), REQUEST_TIMEOUT_SECS, null); } @Override @@ -111,19 +110,19 @@ public String getUsername() { return username; } - public Optional getParallelUploadBlockSizeInBytes() { - return Optional.ofNullable(parallelUploadBlockSizeInBytes); + public Optional getMultipartUploadPartSizeBytes() { + return Optional.ofNullable(multipartUploadPartSizeBytes); } - public Optional getParallelUploadMaxParallelism() { - return Optional.ofNullable(parallelUploadMaxParallelism); + public Optional getMultipartUploadMaxConcurrency() { + return Optional.ofNullable(multipartUploadMaxConcurrency); } - public Optional getParallelUploadThresholdInBytes() { - return Optional.ofNullable(parallelUploadThresholdInBytes); + public Optional getMultipartUploadThresholdSizeBytes() { + return Optional.ofNullable(multipartUploadThresholdSizeBytes); } - public Optional getRequestTimeoutInSeconds() { - return Optional.ofNullable(requestTimeoutInSeconds); + public Optional getRequestTimeoutSecs() { + return Optional.ofNullable(requestTimeoutSecs); } } diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Wrapper.java b/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Wrapper.java index 2f7342d2b4..2c3e480ecd 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Wrapper.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Wrapper.java @@ -43,23 +43,26 @@ public class S3Wrapper implements ObjectStorageWrapper { public S3Wrapper(S3Config config) { AwsCrtAsyncHttpClient.Builder httpClientBuilder = AwsCrtAsyncHttpClient.builder(); - if (config.getParallelUploadMaxParallelism().isPresent()) { - httpClientBuilder.maxConcurrency(config.getParallelUploadMaxParallelism().get()); + if (config.getMultipartUploadMaxConcurrency().isPresent()) { + httpClientBuilder.maxConcurrency(config.getMultipartUploadMaxConcurrency().get()); } MultipartConfiguration.Builder multipartConfigBuilder = MultipartConfiguration.builder(); - if (config.getParallelUploadBlockSizeInBytes().isPresent()) { - multipartConfigBuilder.minimumPartSizeInBytes( - config.getParallelUploadBlockSizeInBytes().get()); + if (config.getMultipartUploadPartSizeBytes().isPresent()) { + multipartConfigBuilder.minimumPartSizeInBytes(config.getMultipartUploadPartSizeBytes().get()); + } + if (config.getMultipartUploadThresholdSizeBytes().isPresent()) { + multipartConfigBuilder.thresholdInBytes(config.getMultipartUploadThresholdSizeBytes().get()); } ClientOverrideConfiguration.Builder overrideConfigBuilder = ClientOverrideConfiguration.builder(); - if (config.getRequestTimeoutInSeconds().isPresent()) { + if (config.getRequestTimeoutSecs().isPresent()) { overrideConfigBuilder.apiCallTimeout( - Duration.ofSeconds(config.getRequestTimeoutInSeconds().get())); + Duration.ofSeconds(config.getRequestTimeoutSecs().get())); } this.client = S3AsyncClient.builder() + .multipartEnabled(true) .region(Region.of(config.getRegion())) .credentialsProvider( StaticCredentialsProvider.create( diff --git a/core/src/test/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfigTest.java b/core/src/test/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfigTest.java index d5148d7824..478eda14d0 100644 --- a/core/src/test/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfigTest.java +++ b/core/src/test/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfigTest.java @@ -15,10 +15,10 @@ public class BlobStorageConfigTest { private static final String ANY_CONTACT_POINT = ANY_ENDPOINT + "/" + ANY_BUCKET; private static final String BLOB_STORAGE = "blob-storage"; private static final String ANY_TABLE_METADATA_NAMESPACE = "any_namespace"; - private static final String ANY_PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES = "5242880"; // 5MB - private static final String ANY_PARALLEL_UPLOAD_MAX_PARALLELISM = "4"; - private static final String ANY_PARALLEL_UPLOAD_THRESHOLD_IN_BYTES = "10485760"; // 10MB - private static final String ANY_REQUEST_TIMEOUT_IN_SECONDS = "30"; + private static final String ANY_PARALLEL_UPLOAD_BLOCK_SIZE_BYTES = "5242880"; // 5MB + private static final String ANY_PARALLEL_UPLOAD_MAX_CONCURRENCY = "4"; + private static final String ANY_PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES = "10485760"; // 10MB + private static final String ANY_REQUEST_TIMEOUT_SECS = "30"; @Test public void constructor_AllPropertiesGiven_ShouldLoadProperly() { @@ -30,14 +30,13 @@ public void constructor_AllPropertiesGiven_ShouldLoadProperly() { props.setProperty(DatabaseConfig.STORAGE, BLOB_STORAGE); props.setProperty(BlobStorageConfig.TABLE_METADATA_NAMESPACE, ANY_TABLE_METADATA_NAMESPACE); props.setProperty( - BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, - ANY_PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES); + BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_BYTES, ANY_PARALLEL_UPLOAD_BLOCK_SIZE_BYTES); props.setProperty( - BlobStorageConfig.PARALLEL_UPLOAD_MAX_PARALLELISM, ANY_PARALLEL_UPLOAD_MAX_PARALLELISM); + BlobStorageConfig.PARALLEL_UPLOAD_MAX_CONCURRENCY, ANY_PARALLEL_UPLOAD_MAX_CONCURRENCY); props.setProperty( - BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, - ANY_PARALLEL_UPLOAD_THRESHOLD_IN_BYTES); - props.setProperty(BlobStorageConfig.REQUEST_TIMEOUT_IN_SECONDS, ANY_REQUEST_TIMEOUT_IN_SECONDS); + BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES, + ANY_PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES); + props.setProperty(BlobStorageConfig.REQUEST_TIMEOUT_SECS, ANY_REQUEST_TIMEOUT_SECS); // Act BlobStorageConfig config = new BlobStorageConfig(new DatabaseConfig(props)); @@ -48,14 +47,18 @@ public void constructor_AllPropertiesGiven_ShouldLoadProperly() { assertThat(config.getPassword()).isEqualTo(ANY_PASSWORD); assertThat(config.getBucket()).isEqualTo(ANY_BUCKET); assertThat(config.getMetadataNamespace()).isEqualTo(ANY_TABLE_METADATA_NAMESPACE); - assertThat(config.getParallelUploadBlockSizeInBytes()).isNotEmpty(); - assertThat(config.getParallelUploadBlockSizeInBytes().get()).isEqualTo(5242880); - assertThat(config.getParallelUploadMaxParallelism()).isNotEmpty(); - assertThat(config.getParallelUploadMaxParallelism().get()).isEqualTo(4); - assertThat(config.getParallelUploadThresholdInBytes()).isNotEmpty(); - assertThat(config.getParallelUploadThresholdInBytes().get()).isEqualTo(10485760); - assertThat(config.getRequestTimeoutInSeconds()).isNotEmpty(); - assertThat(config.getRequestTimeoutInSeconds().get()).isEqualTo(30); + assertThat(config.getParallelUploadBlockSizeBytes()).isNotEmpty(); + assertThat(config.getParallelUploadBlockSizeBytes().get()) + .isEqualTo(Long.parseLong(ANY_PARALLEL_UPLOAD_BLOCK_SIZE_BYTES)); + assertThat(config.getParallelUploadMaxConcurrency()).isNotEmpty(); + assertThat(config.getParallelUploadMaxConcurrency().get()) + .isEqualTo(Integer.parseInt(ANY_PARALLEL_UPLOAD_MAX_CONCURRENCY)); + assertThat(config.getParallelUploadThresholdSizeBytes()).isNotEmpty(); + assertThat(config.getParallelUploadThresholdSizeBytes().get()) + .isEqualTo(Long.parseLong(ANY_PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES)); + assertThat(config.getRequestTimeoutSecs()).isNotEmpty(); + assertThat(config.getRequestTimeoutSecs().get()) + .isEqualTo(Integer.parseInt(ANY_REQUEST_TIMEOUT_SECS)); } @Test @@ -77,10 +80,10 @@ public void constructor_PropertiesWithoutNonMandatoryOptionsGiven_ShouldLoadProp assertThat(config.getBucket()).isEqualTo(ANY_BUCKET); assertThat(config.getMetadataNamespace()) .isEqualTo(DatabaseConfig.DEFAULT_SYSTEM_NAMESPACE_NAME); - assertThat(config.getParallelUploadBlockSizeInBytes()).isEmpty(); - assertThat(config.getParallelUploadMaxParallelism()).isEmpty(); - assertThat(config.getParallelUploadThresholdInBytes()).isEmpty(); - assertThat(config.getRequestTimeoutInSeconds()).isEmpty(); + assertThat(config.getParallelUploadBlockSizeBytes()).isEmpty(); + assertThat(config.getParallelUploadMaxConcurrency()).isEmpty(); + assertThat(config.getParallelUploadThresholdSizeBytes()).isEmpty(); + assertThat(config.getRequestTimeoutSecs()).isEmpty(); } @Test diff --git a/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfigTest.java b/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfigTest.java index fd83e46dda..ea37b5e1fc 100644 --- a/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfigTest.java +++ b/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfigTest.java @@ -14,7 +14,7 @@ public class CloudStorageConfigTest { private static final String ANY_CONTACT_POINT = ANY_BUCKET; private static final String CloudStorage_STORAGE = "cloud-storage"; private static final String ANY_TABLE_METADATA_NAMESPACE = "any_namespace"; - private static final String ANY_PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES = "5242880"; // 5MB + private static final String ANY_UPLOAD_CHUNK_SIZE_BYTES = "5242880"; // 5MB @Test public void constructor_AllPropertiesGiven_ShouldLoadProperly() { @@ -25,9 +25,7 @@ public void constructor_AllPropertiesGiven_ShouldLoadProperly() { props.setProperty(DatabaseConfig.PASSWORD, ANY_PASSWORD); props.setProperty(DatabaseConfig.STORAGE, CloudStorage_STORAGE); props.setProperty(CloudStorageConfig.TABLE_METADATA_NAMESPACE, ANY_TABLE_METADATA_NAMESPACE); - props.setProperty( - CloudStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, - ANY_PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES); + props.setProperty(CloudStorageConfig.UPLOAD_CHUNK_SIZE_BYTES, ANY_UPLOAD_CHUNK_SIZE_BYTES); // Act CloudStorageConfig config = new CloudStorageConfig(new DatabaseConfig(props)); @@ -37,8 +35,9 @@ public void constructor_AllPropertiesGiven_ShouldLoadProperly() { assertThat(config.getBucket()).isEqualTo(ANY_BUCKET); assertThat(config.getPassword()).isEqualTo(ANY_PASSWORD); assertThat(config.getMetadataNamespace()).isEqualTo(ANY_TABLE_METADATA_NAMESPACE); - assertThat(config.getParallelUploadBlockSizeInBytes()).isNotEmpty(); - assertThat(config.getParallelUploadBlockSizeInBytes().get()).isEqualTo(5242880); + assertThat(config.getUploadChunkSizeBytes()).isNotEmpty(); + assertThat(config.getUploadChunkSizeBytes().get()) + .isEqualTo(Integer.parseInt(ANY_UPLOAD_CHUNK_SIZE_BYTES)); } @Test @@ -59,7 +58,7 @@ public void constructor_PropertiesWithoutNonMandatoryOptionsGiven_ShouldLoadProp assertThat(config.getPassword()).isEqualTo(ANY_PASSWORD); assertThat(config.getMetadataNamespace()) .isEqualTo(DatabaseConfig.DEFAULT_SYSTEM_NAMESPACE_NAME); - assertThat(config.getParallelUploadBlockSizeInBytes()).isEmpty(); + assertThat(config.getUploadChunkSizeBytes()).isEmpty(); } @Test diff --git a/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapperTest.java b/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapperTest.java index 62e0f5de98..1a52b4258c 100644 --- a/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapperTest.java +++ b/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapperTest.java @@ -54,7 +54,7 @@ public void setUp() throws Exception { when(config.getMetadataNamespace()).thenReturn(METADATA_NAMESPACE); when(config.getProjectId()).thenReturn(PROJECT_ID); when(config.getBucket()).thenReturn(BUCKET); - when(config.getParallelUploadBlockSizeInBytes()).thenReturn(Optional.empty()); + when(config.getUploadChunkSizeBytes()).thenReturn(Optional.empty()); wrapper = new CloudStorageWrapper(config, storage); } diff --git a/core/src/test/java/com/scalar/db/storage/objectstorage/s3/S3ConfigTest.java b/core/src/test/java/com/scalar/db/storage/objectstorage/s3/S3ConfigTest.java index f1ddff8d8d..8f65a451d6 100644 --- a/core/src/test/java/com/scalar/db/storage/objectstorage/s3/S3ConfigTest.java +++ b/core/src/test/java/com/scalar/db/storage/objectstorage/s3/S3ConfigTest.java @@ -15,10 +15,10 @@ public class S3ConfigTest { private static final String ANY_CONTACT_POINT = ANY_REGION + "/" + ANY_BUCKET; private static final String S3_STORAGE = "s3"; private static final String ANY_TABLE_METADATA_NAMESPACE = "any_namespace"; - private static final String ANY_PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES = "5242880"; // 5MB - private static final String ANY_PARALLEL_UPLOAD_MAX_PARALLELISM = "4"; - private static final String ANY_PARALLEL_UPLOAD_THRESHOLD_IN_BYTES = "10485760"; // 10MB - private static final String ANY_REQUEST_TIMEOUT_IN_SECONDS = "30"; + private static final String ANY_MULTIPART_UPLOAD_PART_SIZE_BYTES = "5242880"; // 5MB + private static final String ANY_MULTIPART_UPLOAD_MAX_CONCURRENCY = "4"; + private static final String ANY_MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES = "10485760"; // 10MB + private static final String ANY_REQUEST_TIMEOUT_SECS = "30"; @Test public void constructor_AllPropertiesGiven_ShouldLoadProperly() { @@ -30,12 +30,12 @@ public void constructor_AllPropertiesGiven_ShouldLoadProperly() { props.setProperty(DatabaseConfig.STORAGE, S3_STORAGE); props.setProperty(S3Config.TABLE_METADATA_NAMESPACE, ANY_TABLE_METADATA_NAMESPACE); props.setProperty( - S3Config.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, ANY_PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES); + S3Config.MULTIPART_UPLOAD_PART_SIZE_BYTES, ANY_MULTIPART_UPLOAD_PART_SIZE_BYTES); props.setProperty( - S3Config.PARALLEL_UPLOAD_MAX_PARALLELISM, ANY_PARALLEL_UPLOAD_MAX_PARALLELISM); + S3Config.MULTIPART_UPLOAD_MAX_CONCURRENCY, ANY_MULTIPART_UPLOAD_MAX_CONCURRENCY); props.setProperty( - S3Config.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, ANY_PARALLEL_UPLOAD_THRESHOLD_IN_BYTES); - props.setProperty(S3Config.REQUEST_TIMEOUT_IN_SECONDS, ANY_REQUEST_TIMEOUT_IN_SECONDS); + S3Config.MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES, ANY_MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES); + props.setProperty(S3Config.REQUEST_TIMEOUT_SECS, ANY_REQUEST_TIMEOUT_SECS); // Act S3Config config = new S3Config(new DatabaseConfig(props)); @@ -46,14 +46,18 @@ public void constructor_AllPropertiesGiven_ShouldLoadProperly() { assertThat(config.getUsername()).isEqualTo(ANY_USERNAME); assertThat(config.getPassword()).isEqualTo(ANY_PASSWORD); assertThat(config.getMetadataNamespace()).isEqualTo(ANY_TABLE_METADATA_NAMESPACE); - assertThat(config.getParallelUploadBlockSizeInBytes()).isNotEmpty(); - assertThat(config.getParallelUploadBlockSizeInBytes().get()).isEqualTo(5242880); - assertThat(config.getParallelUploadMaxParallelism()).isNotEmpty(); - assertThat(config.getParallelUploadMaxParallelism().get()).isEqualTo(4); - assertThat(config.getParallelUploadThresholdInBytes()).isNotEmpty(); - assertThat(config.getParallelUploadThresholdInBytes().get()).isEqualTo(10485760); - assertThat(config.getRequestTimeoutInSeconds()).isNotEmpty(); - assertThat(config.getRequestTimeoutInSeconds().get()).isEqualTo(30); + assertThat(config.getMultipartUploadPartSizeBytes()).isNotEmpty(); + assertThat(config.getMultipartUploadPartSizeBytes().get()) + .isEqualTo(Long.parseLong(ANY_MULTIPART_UPLOAD_PART_SIZE_BYTES)); + assertThat(config.getMultipartUploadMaxConcurrency()).isNotEmpty(); + assertThat(config.getMultipartUploadMaxConcurrency().get()) + .isEqualTo(Integer.parseInt(ANY_MULTIPART_UPLOAD_MAX_CONCURRENCY)); + assertThat(config.getMultipartUploadThresholdSizeBytes()).isNotEmpty(); + assertThat(config.getMultipartUploadThresholdSizeBytes().get()) + .isEqualTo(Long.parseLong(ANY_MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES)); + assertThat(config.getRequestTimeoutSecs()).isNotEmpty(); + assertThat(config.getRequestTimeoutSecs().get()) + .isEqualTo(Integer.parseInt(ANY_REQUEST_TIMEOUT_SECS)); } @Test @@ -75,10 +79,10 @@ public void constructor_PropertiesWithoutNonMandatoryOptionsGiven_ShouldLoadProp assertThat(config.getPassword()).isEqualTo(ANY_PASSWORD); assertThat(config.getMetadataNamespace()) .isEqualTo(DatabaseConfig.DEFAULT_SYSTEM_NAMESPACE_NAME); - assertThat(config.getParallelUploadBlockSizeInBytes()).isEmpty(); - assertThat(config.getParallelUploadMaxParallelism()).isEmpty(); - assertThat(config.getParallelUploadThresholdInBytes()).isEmpty(); - assertThat(config.getRequestTimeoutInSeconds()).isEmpty(); + assertThat(config.getMultipartUploadPartSizeBytes()).isEmpty(); + assertThat(config.getMultipartUploadMaxConcurrency()).isEmpty(); + assertThat(config.getMultipartUploadThresholdSizeBytes()).isEmpty(); + assertThat(config.getRequestTimeoutSecs()).isEmpty(); } @Test