diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java index 83581b1cb..697a9aebd 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java @@ -32,7 +32,7 @@ public class NativeMetadataJavaClient implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(NativeMetadataJavaClient.class); - private final long timeout; + private long timeout; private long bufferSize; private long largeBufferSize; @@ -229,44 +229,64 @@ private void initialize() { public JniWrapper executeQuery(Integer queryType, List params) { - getReadLock(); - final CompletableFuture future = new CompletableFuture<>(); try { - getLibLakeSoulMetaData().execute_query( - new ReferencedIntegerCallback((result, msg) -> { - if (msg.isEmpty()) { - future.complete(result); - } else { - future.completeExceptionally(new SQLException(msg)); - } - }, getIntegerCallbackObjectReferenceManager()), - tokioRuntime, - tokioPostgresClient, - preparedStatement, - queryType, - String.join(PARAM_DELIM, params), - queryType < DAO_TYPE_QUERY_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address() - ); - Integer len = future.get(timeout, TimeUnit.MILLISECONDS); - if (len < 0) return null; - byte[] bytes = new byte[len]; - if (queryType < DAO_TYPE_QUERY_LIST_OFFSET) - sharedBuffer.get(0, bytes, 0, len); - else - largeSharedBuffer.get(0, bytes, 0, len); - try { - return JniWrapper.parseFrom(bytes); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); + getReadLock(); + int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS; + while (retryCounter >= 0) { + try { + final CompletableFuture future = new CompletableFuture<>(); + getLibLakeSoulMetaData().execute_query( + new ReferencedIntegerCallback((result, msg) -> { + if (msg.isEmpty()) { + future.complete(result); + } else { + future.completeExceptionally(new SQLException(msg)); + } + }, getIntegerCallbackObjectReferenceManager()), + tokioRuntime, + tokioPostgresClient, + preparedStatement, + queryType, + String.join(PARAM_DELIM, params), + queryType < DAO_TYPE_QUERY_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address() + ); + Integer len = future.get(timeout, TimeUnit.MILLISECONDS); + if (len < 0) return null; + byte[] bytes = new byte[len]; + if (queryType < DAO_TYPE_QUERY_LIST_OFFSET) + sharedBuffer.get(0, bytes, 0, len); + else + largeSharedBuffer.get(0, bytes, 0, len); + return JniWrapper.parseFrom(bytes); + } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) { + if (retryCounter == 0) { + throw new RuntimeException(e); + } else { + enlargeBufferAndTimeout(); + retryCounter--; + } + } catch (TimeoutException e) { + if (retryCounter == 0) { + LOG.error("Execute Query {} with {} timeout", queryType, params); + throw new RuntimeException(e); + } else { + enlargeBufferAndTimeout(); + retryCounter--; + } + } } - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } catch (TimeoutException e) { - LOG.error("Execute Query {} with {} timeout", queryType, params); - throw new RuntimeException(e); } finally { unlockReadLock(); } + return null; + } + + private void enlargeBufferAndTimeout() { + bufferSize *= 2; + largeBufferSize *= 2; + sharedBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(bufferSize); + largeSharedBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(largeBufferSize); + timeout += 5000L; } private void getReadLock() { @@ -287,102 +307,150 @@ private void unlockWriteLock() { public Integer executeInsert(Integer insertType, JniWrapper jniWrapper) { - getWriteLock(); try { - final CompletableFuture future = new CompletableFuture<>(); - - byte[] bytes = jniWrapper.toByteArray(); - if (insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET) - sharedBuffer.put(0, bytes, 0, bytes.length); - else - largeSharedBuffer.put(0, bytes, 0, bytes.length); - - getLibLakeSoulMetaData().execute_insert( - new ReferencedIntegerCallback((result, msg) -> { - if (msg.isEmpty()) { - future.complete(result); - } else { - future.completeExceptionally(new SQLException(msg)); - } - }, getIntegerCallbackObjectReferenceManager()), - tokioRuntime, - tokioPostgresClient, - preparedStatement, - insertType, - insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address(), - bytes.length - ); - return future.get(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } catch (TimeoutException e) { - LOG.error("Execute Insert {} with {} timeout", insertType, jniWrapper); - throw new RuntimeException(e); + getWriteLock(); + int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS; + while (retryCounter >= 0) { + try { + final CompletableFuture future = new CompletableFuture<>(); + + byte[] bytes = jniWrapper.toByteArray(); + if (insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET) + sharedBuffer.put(0, bytes, 0, bytes.length); + else + largeSharedBuffer.put(0, bytes, 0, bytes.length); + + getLibLakeSoulMetaData().execute_insert( + new ReferencedIntegerCallback((result, msg) -> { + if (msg.isEmpty()) { + future.complete(result); + } else { + future.completeExceptionally(new SQLException(msg)); + } + }, getIntegerCallbackObjectReferenceManager()), + tokioRuntime, + tokioPostgresClient, + preparedStatement, + insertType, + insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address(), + bytes.length + ); + return future.get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException e) { + if (retryCounter == 0) { + throw new RuntimeException(e); + } else { + enlargeBufferAndTimeout(); + retryCounter--; + } + } catch (TimeoutException e) { + if (retryCounter == 0) { + LOG.error("Execute Insert {} with {} timeout", insertType, jniWrapper); + throw new RuntimeException(e); + } else { + enlargeBufferAndTimeout(); + retryCounter--; + } + } + } } finally { unlockWriteLock(); } + return -1; } public Integer executeUpdate(Integer updateType, List params) { - getWriteLock(); try { - final CompletableFuture future = new CompletableFuture<>(); - - getLibLakeSoulMetaData().execute_update( - new ReferencedIntegerCallback((result, msg) -> { - if (msg.isEmpty()) { - future.complete(result); - } else { - future.completeExceptionally(new SQLException(msg)); - } - }, getIntegerCallbackObjectReferenceManager()), - tokioRuntime, - tokioPostgresClient, - preparedStatement, - updateType, - String.join(PARAM_DELIM, params) - ); - return future.get(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } catch (TimeoutException e) { - LOG.error("Execute Update {} with {} timeout", updateType, params); - throw new RuntimeException(e); + getWriteLock(); + int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS; + while (retryCounter >= 0) { + try { + final CompletableFuture future = new CompletableFuture<>(); + + getLibLakeSoulMetaData().execute_update( + new ReferencedIntegerCallback((result, msg) -> { + if (msg.isEmpty()) { + future.complete(result); + } else { + future.completeExceptionally(new SQLException(msg)); + } + }, getIntegerCallbackObjectReferenceManager()), + tokioRuntime, + tokioPostgresClient, + preparedStatement, + updateType, + String.join(PARAM_DELIM, params) + ); + return future.get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException e) { + if (retryCounter == 0) { + throw new RuntimeException(e); + } else { + enlargeBufferAndTimeout(); + retryCounter--; + } + } catch (TimeoutException e) { + if (retryCounter == 0) { + LOG.error("Execute Update {} with {} timeout", updateType, params); + throw new RuntimeException(e); + } else { + enlargeBufferAndTimeout(); + retryCounter--; + } + } + } } finally { unlockWriteLock(); } + return -1; } - public List executeQueryScalar(Integer updateType, List params) { - getReadLock(); + public List executeQueryScalar(Integer queryScalarType, List params) { try { - final CompletableFuture future = new CompletableFuture<>(); - - getLibLakeSoulMetaData().execute_query_scalar( - new ReferencedStringCallback((result, msg) -> { - if (msg.isEmpty()) { - future.complete(result); - } else { - future.completeExceptionally(new SQLException(msg)); - } - }, getStringCallbackObjectReferenceManager()), - tokioRuntime, - tokioPostgresClient, - preparedStatement, - updateType, - String.join(PARAM_DELIM, params) - ); - String result = future.get(timeout, TimeUnit.MILLISECONDS); - if (result.isEmpty()) return Collections.emptyList(); - return Arrays.stream(result.split(PARAM_DELIM)).collect(Collectors.toList()); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } catch (TimeoutException e) { - LOG.error("Execute Update {} with {} timeout", updateType, params); - throw new RuntimeException(e); + getReadLock(); + int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS; + while (retryCounter >= 0) { + try { + final CompletableFuture future = new CompletableFuture<>(); + + getLibLakeSoulMetaData().execute_query_scalar( + new ReferencedStringCallback((result, msg) -> { + if (msg.isEmpty()) { + future.complete(result); + } else { + future.completeExceptionally(new SQLException(msg)); + } + }, getStringCallbackObjectReferenceManager()), + tokioRuntime, + tokioPostgresClient, + preparedStatement, + queryScalarType, + String.join(PARAM_DELIM, params) + ); + String result = future.get(timeout, TimeUnit.MILLISECONDS); + if (result.isEmpty()) return Collections.emptyList(); + return Arrays.stream(result.split(PARAM_DELIM)).collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + if (retryCounter == 0) { + throw new RuntimeException(e); + } else { + enlargeBufferAndTimeout(); + retryCounter--; + } + } catch (TimeoutException e) { + if (retryCounter == 0) { + LOG.error("Execute QueryScalar {} with {} timeout", queryScalarType, params); + throw new RuntimeException(e); + } else { + enlargeBufferAndTimeout(); + retryCounter--; + } + } + } } finally { unlockReadLock(); } + return Collections.emptyList(); } public static Integer insert(NativeUtils.CodedDaoType insertType, JniWrapper jniWrapper) { diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeUtils.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeUtils.java index 1b63eae89..db681a51a 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeUtils.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeUtils.java @@ -10,6 +10,8 @@ public class NativeUtils { public static boolean NATIVE_METADATA_UPDATE_ENABLED = true; + public static int NATIVE_METADATA_MAX_RETRY_ATTEMPTS = 3; + public static int DAO_TYPE_QUERY_ONE_OFFSET = 0; public static int DAO_TYPE_QUERY_LIST_OFFSET = 100; public static int DAO_TYPE_INSERT_ONE_OFFSET = 200;