Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
package com.scalar.db.storage.objectstorage;

import static org.assertj.core.api.Assertions.assertThat;

import com.scalar.db.api.DistributedStorageIntegrationTestBase;
import com.scalar.db.api.Get;
import com.scalar.db.api.Put;
import com.scalar.db.api.Result;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.io.DataType;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class ObjectStorageIntegrationTest extends DistributedStorageIntegrationTestBase {

Expand Down Expand Up @@ -57,4 +66,25 @@ public void scan_ScanGivenForIndexedColumn_ShouldScan() {}
@Override
@Disabled("Object Storage does not support index-related operations")
public void scan_ScanGivenForNonIndexedColumn_ShouldThrowIllegalArgumentException() {}

@Test
public void put_PutWithLargeTextValue_ShouldStoreProperly() throws ExecutionException {
// Arrange
int payloadSizeBytes = 20000000; // The default string length limit of Jackson
char[] chars = new char[payloadSizeBytes + 1];
Arrays.fill(chars, 'a');
String largeText = new String(chars);
Put put = preparePuts().get(0);
put = Put.newBuilder(put).textValue(getColumnName2(), largeText).build();

// Act
storage.put(put);

// Assert
Get get = prepareGet(0, 0);
Optional<Result> actual = storage.get(get);
assertThat(actual.isPresent()).isTrue();
assertThat(actual.get().contains(getColumnName2())).isTrue();
assertThat(actual.get().getText(getColumnName2())).isEqualTo(largeText);
}
}
6 changes: 6 additions & 0 deletions core/src/main/java/com/scalar/db/common/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,12 @@ public enum CoreError implements ScalarDbError {
+ "you may be able to adjust the settings to enable consistent reads. Please refer to the storage configuration for details. Storage: %s",
"",
""),
OBJECT_STORAGE_EXCEEDS_MAX_VALUE_LENGTH_ALLOWED(
Category.USER_ERROR,
"0279",
"The size of a single column value exceeds the maximum allowed size of %d bytes. Column: %s; Size: %d bytes",
"",
""),

//
// Errors for the concurrency error category
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,72 @@
import com.scalar.db.io.TimeColumn;
import com.scalar.db.io.TimestampColumn;
import com.scalar.db.io.TimestampTZColumn;
import java.nio.ByteBuffer;

public class ObjectStorageOperationChecker extends OperationChecker {
private static final char[] ILLEGAL_CHARACTERS_IN_PRIMARY_KEY = {
ObjectStorageUtils.OBJECT_KEY_DELIMITER, ObjectStorageUtils.CONCATENATED_KEY_DELIMITER,
};

private static final ColumnVisitor COMMON_COLUMN_CHECKER =
new ColumnVisitor() {
@Override
public void visit(BooleanColumn column) {}

@Override
public void visit(IntColumn column) {}

@Override
public void visit(BigIntColumn column) {}

@Override
public void visit(FloatColumn column) {}

@Override
public void visit(DoubleColumn column) {}

@Override
public void visit(TextColumn column) {
String value = column.getTextValue();
if (value == null) {
return;
}

if (Serializer.MAX_STRING_LENGTH_ALLOWED < value.length()) {
throw new IllegalArgumentException(
CoreError.OBJECT_STORAGE_EXCEEDS_MAX_VALUE_LENGTH_ALLOWED.buildMessage(
Serializer.MAX_STRING_LENGTH_ALLOWED, column.getName(), value.length()));
}
}

@Override
public void visit(BlobColumn column) {
ByteBuffer buffer = column.getBlobValue();
if (buffer == null) {
return;
}
// Calculate the length after Base64 encoding.
int base64EncodedLength = ((buffer.capacity() + 2) / 3) * 4;
if (Serializer.MAX_STRING_LENGTH_ALLOWED < base64EncodedLength) {
throw new IllegalArgumentException(
CoreError.OBJECT_STORAGE_EXCEEDS_MAX_VALUE_LENGTH_ALLOWED.buildMessage(
Serializer.MAX_STRING_LENGTH_ALLOWED, column.getName(), base64EncodedLength));
}
}

@Override
public void visit(DateColumn column) {}

@Override
public void visit(TimeColumn column) {}

@Override
public void visit(TimestampColumn column) {}

@Override
public void visit(TimestampTZColumn column) {}
};

private static final ColumnVisitor PRIMARY_KEY_COLUMN_CHECKER =
new ColumnVisitor() {
@Override
Expand Down Expand Up @@ -104,6 +164,7 @@ public void check(Scan scan) throws ExecutionException {
@Override
public void check(Put put) throws ExecutionException {
super.check(put);
put.getColumns().values().forEach(column -> column.accept(COMMON_COLUMN_CHECKER));
checkPrimaryKey(put);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package com.scalar.db.storage.objectstorage;

import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

public class Serializer {
public static final Integer MAX_STRING_LENGTH_ALLOWED = Integer.MAX_VALUE;
private static final ObjectMapper mapper = new ObjectMapper();

static {
mapper
.getFactory()
.setStreamReadConstraints(
StreamReadConstraints.builder().maxStringLength(MAX_STRING_LENGTH_ALLOWED).build());
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(SerializationFeature.WRAP_ROOT_VALUE, false);
mapper.registerModule(new JavaTimeModule());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.io.DataType;
import com.scalar.db.io.Key;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -807,6 +809,148 @@ public void check_ForMutationsWithDeleteWithCondition_ShouldBehaveProperly()
.doesNotThrowAnyException();
}

@Test
public void check_PutGiven_WhenTextColumnLengthIsWithinLimit_ShouldNotThrowException()
throws Exception {
// Arrange
when(metadataManager.getTableMetadata(any())).thenReturn(TABLE_METADATA1);

// Temporarily set MAX_STRING_LENGTH_ALLOWED to a small value for testing
Field field = Serializer.class.getDeclaredField("MAX_STRING_LENGTH_ALLOWED");
field.setAccessible(true);

Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);

Integer originalValue = (Integer) field.get(null);
try {
field.set(null, 10);

Put put =
Put.newBuilder()
.namespace(NAMESPACE_NAME)
.table(TABLE_NAME)
.partitionKey(Key.ofInt(PKEY1, 0))
.clusteringKey(Key.ofInt(CKEY1, 0))
.textValue(COL3, "1234567890") // 10 characters, exactly at the limit
.build();

// Act Assert
assertThatCode(() -> operationChecker.check(put)).doesNotThrowAnyException();
} finally {
field.set(null, originalValue);
}
}

@Test
public void check_PutGiven_WhenBlobColumnLengthIsWithinLimit_ShouldNotThrowException()
throws Exception {
// Arrange
when(metadataManager.getTableMetadata(any())).thenReturn(TABLE_METADATA1);

// Temporarily set MAX_STRING_LENGTH_ALLOWED to a small value for testing
Field field = Serializer.class.getDeclaredField("MAX_STRING_LENGTH_ALLOWED");
field.setAccessible(true);

Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);

Integer originalValue = (Integer) field.get(null);
try {
field.set(null, 10);

// 6 bytes -> Base64 encoded length = ((6 + 2) / 3) * 4 = 8, which is within the limit of 10
byte[] blob = new byte[6];
Put put =
Put.newBuilder()
.namespace(NAMESPACE_NAME)
.table(TABLE_NAME)
.partitionKey(Key.ofInt(PKEY1, 0))
.clusteringKey(Key.ofInt(CKEY1, 0))
.blobValue(COL4, blob)
.build();

// Act Assert
assertThatCode(() -> operationChecker.check(put)).doesNotThrowAnyException();
} finally {
field.set(null, originalValue);
}
}

@Test
public void check_PutGiven_WhenTextColumnExceedsMaxLength_ShouldThrowIllegalArgumentException()
throws Exception {
// Arrange
when(metadataManager.getTableMetadata(any())).thenReturn(TABLE_METADATA1);

// Temporarily set MAX_STRING_LENGTH_ALLOWED to a small value for testing
Field field = Serializer.class.getDeclaredField("MAX_STRING_LENGTH_ALLOWED");
field.setAccessible(true);

Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);

Integer originalValue = (Integer) field.get(null);
try {
field.set(null, 10);

Put put =
Put.newBuilder()
.namespace(NAMESPACE_NAME)
.table(TABLE_NAME)
.partitionKey(Key.ofInt(PKEY1, 0))
.clusteringKey(Key.ofInt(CKEY1, 0))
.textValue(COL3, "12345678901") // 11 characters, exceeds limit of 10
.build();

// Act Assert
assertThatThrownBy(() -> operationChecker.check(put))
.isInstanceOf(IllegalArgumentException.class);
} finally {
field.set(null, originalValue);
}
}

@Test
public void check_PutGiven_WhenBlobColumnExceedsMaxLength_ShouldThrowIllegalArgumentException()
throws Exception {
// Arrange
when(metadataManager.getTableMetadata(any())).thenReturn(TABLE_METADATA1);

// Temporarily set MAX_STRING_LENGTH_ALLOWED to a small value for testing
Field field = Serializer.class.getDeclaredField("MAX_STRING_LENGTH_ALLOWED");
field.setAccessible(true);

Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);

Integer originalValue = (Integer) field.get(null);
try {
field.set(null, 10);

// 9 bytes -> Base64 encoded length = ((9 + 2) / 3) * 4 = 12, which exceeds limit of 10
byte[] blob = new byte[9];
Put put =
Put.newBuilder()
.namespace(NAMESPACE_NAME)
.table(TABLE_NAME)
.partitionKey(Key.ofInt(PKEY1, 0))
.clusteringKey(Key.ofInt(CKEY1, 0))
.blobValue(COL4, blob)
.build();

// Act Assert
assertThatThrownBy(() -> operationChecker.check(put))
.isInstanceOf(IllegalArgumentException.class);
} finally {
field.set(null, originalValue);
}
}

private Put buildPutWithCondition(MutationCondition condition) {
return Put.newBuilder()
.namespace(NAMESPACE_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2396,7 +2396,7 @@ private void populateRecords() {
puts.forEach(p -> assertThatCode(() -> storage.put(p)).doesNotThrowAnyException());
}

private Get prepareGet(int pKey, int cKey) {
protected Get prepareGet(int pKey, int cKey) {
Key partitionKey = Key.ofInt(getColumnName1(), pKey);
Key clusteringKey = Key.ofInt(getColumnName4(), cKey);
return Get.newBuilder()
Expand All @@ -2407,7 +2407,7 @@ private Get prepareGet(int pKey, int cKey) {
.build();
}

private List<Put> preparePuts() {
protected List<Put> preparePuts() {
List<Put> puts = new ArrayList<>();

IntStream.range(0, 5)
Expand Down