Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class NativeMetadataJavaClient implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(NativeMetadataJavaClient.class);

private final long timeout;
private long timeout;
private long bufferSize;

private long largeBufferSize;
Expand Down Expand Up @@ -229,44 +229,64 @@ private void initialize() {


public JniWrapper executeQuery(Integer queryType, List<String> params) {
getReadLock();
final CompletableFuture<Integer> future = new CompletableFuture<>();
try {
getLibLakeSoulMetaData().execute_query(
new ReferencedIntegerCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
queryType,
String.join(PARAM_DELIM, params),
queryType < DAO_TYPE_QUERY_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address()
);
Integer len = future.get(timeout, TimeUnit.MILLISECONDS);
if (len < 0) return null;
byte[] bytes = new byte[len];
if (queryType < DAO_TYPE_QUERY_LIST_OFFSET)
sharedBuffer.get(0, bytes, 0, len);
else
largeSharedBuffer.get(0, bytes, 0, len);
try {
return JniWrapper.parseFrom(bytes);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
getReadLock();
int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS;
while (retryCounter >= 0) {
try {
final CompletableFuture<Integer> future = new CompletableFuture<>();
getLibLakeSoulMetaData().execute_query(
new ReferencedIntegerCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
queryType,
String.join(PARAM_DELIM, params),
queryType < DAO_TYPE_QUERY_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address()
);
Integer len = future.get(timeout, TimeUnit.MILLISECONDS);
if (len < 0) return null;
byte[] bytes = new byte[len];
if (queryType < DAO_TYPE_QUERY_LIST_OFFSET)
sharedBuffer.get(0, bytes, 0, len);
else
largeSharedBuffer.get(0, bytes, 0, len);
return JniWrapper.parseFrom(bytes);
} catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
if (retryCounter == 0) {
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
} catch (TimeoutException e) {
if (retryCounter == 0) {
LOG.error("Execute Query {} with {} timeout", queryType, params);
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
}
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
LOG.error("Execute Query {} with {} timeout", queryType, params);
throw new RuntimeException(e);
} finally {
unlockReadLock();
}
return null;
}

private void enlargeBufferAndTimeout() {
bufferSize *= 2;
largeBufferSize *= 2;
sharedBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(bufferSize);
largeSharedBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(largeBufferSize);
timeout += 5000L;
}

private void getReadLock() {
Expand All @@ -287,102 +307,150 @@ private void unlockWriteLock() {


public Integer executeInsert(Integer insertType, JniWrapper jniWrapper) {
getWriteLock();
try {
final CompletableFuture<Integer> future = new CompletableFuture<>();

byte[] bytes = jniWrapper.toByteArray();
if (insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET)
sharedBuffer.put(0, bytes, 0, bytes.length);
else
largeSharedBuffer.put(0, bytes, 0, bytes.length);

getLibLakeSoulMetaData().execute_insert(
new ReferencedIntegerCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
insertType,
insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address(),
bytes.length
);
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
LOG.error("Execute Insert {} with {} timeout", insertType, jniWrapper);
throw new RuntimeException(e);
getWriteLock();
int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS;
while (retryCounter >= 0) {
try {
final CompletableFuture<Integer> future = new CompletableFuture<>();

byte[] bytes = jniWrapper.toByteArray();
if (insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET)
sharedBuffer.put(0, bytes, 0, bytes.length);
else
largeSharedBuffer.put(0, bytes, 0, bytes.length);

getLibLakeSoulMetaData().execute_insert(
new ReferencedIntegerCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
insertType,
insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address(),
bytes.length
);
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
if (retryCounter == 0) {
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
} catch (TimeoutException e) {
if (retryCounter == 0) {
LOG.error("Execute Insert {} with {} timeout", insertType, jniWrapper);
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
}
}
} finally {
unlockWriteLock();
}
return -1;
}

public Integer executeUpdate(Integer updateType, List<String> params) {
getWriteLock();
try {
final CompletableFuture<Integer> future = new CompletableFuture<>();

getLibLakeSoulMetaData().execute_update(
new ReferencedIntegerCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
updateType,
String.join(PARAM_DELIM, params)
);
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
LOG.error("Execute Update {} with {} timeout", updateType, params);
throw new RuntimeException(e);
getWriteLock();
int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS;
while (retryCounter >= 0) {
try {
final CompletableFuture<Integer> future = new CompletableFuture<>();

getLibLakeSoulMetaData().execute_update(
new ReferencedIntegerCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
updateType,
String.join(PARAM_DELIM, params)
);
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
if (retryCounter == 0) {
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
} catch (TimeoutException e) {
if (retryCounter == 0) {
LOG.error("Execute Update {} with {} timeout", updateType, params);
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
}
}
} finally {
unlockWriteLock();
}
return -1;
}

public List<String> executeQueryScalar(Integer updateType, List<String> params) {
getReadLock();
public List<String> executeQueryScalar(Integer queryScalarType, List<String> params) {
try {
final CompletableFuture<String> future = new CompletableFuture<>();

getLibLakeSoulMetaData().execute_query_scalar(
new ReferencedStringCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getStringCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
updateType,
String.join(PARAM_DELIM, params)
);
String result = future.get(timeout, TimeUnit.MILLISECONDS);
if (result.isEmpty()) return Collections.emptyList();
return Arrays.stream(result.split(PARAM_DELIM)).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
LOG.error("Execute Update {} with {} timeout", updateType, params);
throw new RuntimeException(e);
getReadLock();
int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS;
while (retryCounter >= 0) {
try {
final CompletableFuture<String> future = new CompletableFuture<>();

getLibLakeSoulMetaData().execute_query_scalar(
new ReferencedStringCallback((result, msg) -> {
if (msg.isEmpty()) {
future.complete(result);
} else {
future.completeExceptionally(new SQLException(msg));
}
}, getStringCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
queryScalarType,
String.join(PARAM_DELIM, params)
);
String result = future.get(timeout, TimeUnit.MILLISECONDS);
if (result.isEmpty()) return Collections.emptyList();
return Arrays.stream(result.split(PARAM_DELIM)).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
if (retryCounter == 0) {
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
} catch (TimeoutException e) {
if (retryCounter == 0) {
LOG.error("Execute QueryScalar {} with {} timeout", queryScalarType, params);
throw new RuntimeException(e);
} else {
enlargeBufferAndTimeout();
retryCounter--;
}
}
}
} finally {
unlockReadLock();
}
return Collections.emptyList();
}

public static Integer insert(NativeUtils.CodedDaoType insertType, JniWrapper jniWrapper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public class NativeUtils {

public static boolean NATIVE_METADATA_UPDATE_ENABLED = true;

public static int NATIVE_METADATA_MAX_RETRY_ATTEMPTS = 3;

public static int DAO_TYPE_QUERY_ONE_OFFSET = 0;
public static int DAO_TYPE_QUERY_LIST_OFFSET = 100;
public static int DAO_TYPE_INSERT_ONE_OFFSET = 200;
Expand Down