Skip to content

Commit 20b5d71

Browse files
Ceng23333zenghua
andauthored
retry when native metadata client fails (#313)
Signed-off-by: zenghua <huazeng@dmetasoul.com> Co-authored-by: zenghua <huazeng@dmetasoul.com>
1 parent ae7fc43 commit 20b5d71

File tree

2 files changed

+183
-113
lines changed

2 files changed

+183
-113
lines changed

lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java

Lines changed: 181 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class NativeMetadataJavaClient implements AutoCloseable {
3232

3333
private static final Logger LOG = LoggerFactory.getLogger(NativeMetadataJavaClient.class);
3434

35-
private final long timeout;
35+
private long timeout;
3636
private long bufferSize;
3737

3838
private long largeBufferSize;
@@ -229,44 +229,64 @@ private void initialize() {
229229

230230

231231
public JniWrapper executeQuery(Integer queryType, List<String> params) {
232-
getReadLock();
233-
final CompletableFuture<Integer> future = new CompletableFuture<>();
234232
try {
235-
getLibLakeSoulMetaData().execute_query(
236-
new ReferencedIntegerCallback((result, msg) -> {
237-
if (msg.isEmpty()) {
238-
future.complete(result);
239-
} else {
240-
future.completeExceptionally(new SQLException(msg));
241-
}
242-
}, getIntegerCallbackObjectReferenceManager()),
243-
tokioRuntime,
244-
tokioPostgresClient,
245-
preparedStatement,
246-
queryType,
247-
String.join(PARAM_DELIM, params),
248-
queryType < DAO_TYPE_QUERY_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address()
249-
);
250-
Integer len = future.get(timeout, TimeUnit.MILLISECONDS);
251-
if (len < 0) return null;
252-
byte[] bytes = new byte[len];
253-
if (queryType < DAO_TYPE_QUERY_LIST_OFFSET)
254-
sharedBuffer.get(0, bytes, 0, len);
255-
else
256-
largeSharedBuffer.get(0, bytes, 0, len);
257-
try {
258-
return JniWrapper.parseFrom(bytes);
259-
} catch (InvalidProtocolBufferException e) {
260-
throw new RuntimeException(e);
233+
getReadLock();
234+
int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS;
235+
while (retryCounter >= 0) {
236+
try {
237+
final CompletableFuture<Integer> future = new CompletableFuture<>();
238+
getLibLakeSoulMetaData().execute_query(
239+
new ReferencedIntegerCallback((result, msg) -> {
240+
if (msg.isEmpty()) {
241+
future.complete(result);
242+
} else {
243+
future.completeExceptionally(new SQLException(msg));
244+
}
245+
}, getIntegerCallbackObjectReferenceManager()),
246+
tokioRuntime,
247+
tokioPostgresClient,
248+
preparedStatement,
249+
queryType,
250+
String.join(PARAM_DELIM, params),
251+
queryType < DAO_TYPE_QUERY_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address()
252+
);
253+
Integer len = future.get(timeout, TimeUnit.MILLISECONDS);
254+
if (len < 0) return null;
255+
byte[] bytes = new byte[len];
256+
if (queryType < DAO_TYPE_QUERY_LIST_OFFSET)
257+
sharedBuffer.get(0, bytes, 0, len);
258+
else
259+
largeSharedBuffer.get(0, bytes, 0, len);
260+
return JniWrapper.parseFrom(bytes);
261+
} catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
262+
if (retryCounter == 0) {
263+
throw new RuntimeException(e);
264+
} else {
265+
enlargeBufferAndTimeout();
266+
retryCounter--;
267+
}
268+
} catch (TimeoutException e) {
269+
if (retryCounter == 0) {
270+
LOG.error("Execute Query {} with {} timeout", queryType, params);
271+
throw new RuntimeException(e);
272+
} else {
273+
enlargeBufferAndTimeout();
274+
retryCounter--;
275+
}
276+
}
261277
}
262-
} catch (InterruptedException | ExecutionException e) {
263-
throw new RuntimeException(e);
264-
} catch (TimeoutException e) {
265-
LOG.error("Execute Query {} with {} timeout", queryType, params);
266-
throw new RuntimeException(e);
267278
} finally {
268279
unlockReadLock();
269280
}
281+
return null;
282+
}
283+
284+
private void enlargeBufferAndTimeout() {
285+
bufferSize *= 2;
286+
largeBufferSize *= 2;
287+
sharedBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(bufferSize);
288+
largeSharedBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(largeBufferSize);
289+
timeout += 5000L;
270290
}
271291

272292
private void getReadLock() {
@@ -287,102 +307,150 @@ private void unlockWriteLock() {
287307

288308

289309
public Integer executeInsert(Integer insertType, JniWrapper jniWrapper) {
290-
getWriteLock();
291310
try {
292-
final CompletableFuture<Integer> future = new CompletableFuture<>();
293-
294-
byte[] bytes = jniWrapper.toByteArray();
295-
if (insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET)
296-
sharedBuffer.put(0, bytes, 0, bytes.length);
297-
else
298-
largeSharedBuffer.put(0, bytes, 0, bytes.length);
299-
300-
getLibLakeSoulMetaData().execute_insert(
301-
new ReferencedIntegerCallback((result, msg) -> {
302-
if (msg.isEmpty()) {
303-
future.complete(result);
304-
} else {
305-
future.completeExceptionally(new SQLException(msg));
306-
}
307-
}, getIntegerCallbackObjectReferenceManager()),
308-
tokioRuntime,
309-
tokioPostgresClient,
310-
preparedStatement,
311-
insertType,
312-
insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address(),
313-
bytes.length
314-
);
315-
return future.get(timeout, TimeUnit.MILLISECONDS);
316-
} catch (InterruptedException | ExecutionException e) {
317-
throw new RuntimeException(e);
318-
} catch (TimeoutException e) {
319-
LOG.error("Execute Insert {} with {} timeout", insertType, jniWrapper);
320-
throw new RuntimeException(e);
311+
getWriteLock();
312+
int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS;
313+
while (retryCounter >= 0) {
314+
try {
315+
final CompletableFuture<Integer> future = new CompletableFuture<>();
316+
317+
byte[] bytes = jniWrapper.toByteArray();
318+
if (insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET)
319+
sharedBuffer.put(0, bytes, 0, bytes.length);
320+
else
321+
largeSharedBuffer.put(0, bytes, 0, bytes.length);
322+
323+
getLibLakeSoulMetaData().execute_insert(
324+
new ReferencedIntegerCallback((result, msg) -> {
325+
if (msg.isEmpty()) {
326+
future.complete(result);
327+
} else {
328+
future.completeExceptionally(new SQLException(msg));
329+
}
330+
}, getIntegerCallbackObjectReferenceManager()),
331+
tokioRuntime,
332+
tokioPostgresClient,
333+
preparedStatement,
334+
insertType,
335+
insertType < DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address(),
336+
bytes.length
337+
);
338+
return future.get(timeout, TimeUnit.MILLISECONDS);
339+
} catch (InterruptedException | ExecutionException e) {
340+
if (retryCounter == 0) {
341+
throw new RuntimeException(e);
342+
} else {
343+
enlargeBufferAndTimeout();
344+
retryCounter--;
345+
}
346+
} catch (TimeoutException e) {
347+
if (retryCounter == 0) {
348+
LOG.error("Execute Insert {} with {} timeout", insertType, jniWrapper);
349+
throw new RuntimeException(e);
350+
} else {
351+
enlargeBufferAndTimeout();
352+
retryCounter--;
353+
}
354+
}
355+
}
321356
} finally {
322357
unlockWriteLock();
323358
}
359+
return -1;
324360
}
325361

326362
public Integer executeUpdate(Integer updateType, List<String> params) {
327-
getWriteLock();
328363
try {
329-
final CompletableFuture<Integer> future = new CompletableFuture<>();
330-
331-
getLibLakeSoulMetaData().execute_update(
332-
new ReferencedIntegerCallback((result, msg) -> {
333-
if (msg.isEmpty()) {
334-
future.complete(result);
335-
} else {
336-
future.completeExceptionally(new SQLException(msg));
337-
}
338-
}, getIntegerCallbackObjectReferenceManager()),
339-
tokioRuntime,
340-
tokioPostgresClient,
341-
preparedStatement,
342-
updateType,
343-
String.join(PARAM_DELIM, params)
344-
);
345-
return future.get(timeout, TimeUnit.MILLISECONDS);
346-
} catch (InterruptedException | ExecutionException e) {
347-
throw new RuntimeException(e);
348-
} catch (TimeoutException e) {
349-
LOG.error("Execute Update {} with {} timeout", updateType, params);
350-
throw new RuntimeException(e);
364+
getWriteLock();
365+
int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS;
366+
while (retryCounter >= 0) {
367+
try {
368+
final CompletableFuture<Integer> future = new CompletableFuture<>();
369+
370+
getLibLakeSoulMetaData().execute_update(
371+
new ReferencedIntegerCallback((result, msg) -> {
372+
if (msg.isEmpty()) {
373+
future.complete(result);
374+
} else {
375+
future.completeExceptionally(new SQLException(msg));
376+
}
377+
}, getIntegerCallbackObjectReferenceManager()),
378+
tokioRuntime,
379+
tokioPostgresClient,
380+
preparedStatement,
381+
updateType,
382+
String.join(PARAM_DELIM, params)
383+
);
384+
return future.get(timeout, TimeUnit.MILLISECONDS);
385+
} catch (InterruptedException | ExecutionException e) {
386+
if (retryCounter == 0) {
387+
throw new RuntimeException(e);
388+
} else {
389+
enlargeBufferAndTimeout();
390+
retryCounter--;
391+
}
392+
} catch (TimeoutException e) {
393+
if (retryCounter == 0) {
394+
LOG.error("Execute Update {} with {} timeout", updateType, params);
395+
throw new RuntimeException(e);
396+
} else {
397+
enlargeBufferAndTimeout();
398+
retryCounter--;
399+
}
400+
}
401+
}
351402
} finally {
352403
unlockWriteLock();
353404
}
405+
return -1;
354406
}
355407

356-
public List<String> executeQueryScalar(Integer updateType, List<String> params) {
357-
getReadLock();
408+
public List<String> executeQueryScalar(Integer queryScalarType, List<String> params) {
358409
try {
359-
final CompletableFuture<String> future = new CompletableFuture<>();
360-
361-
getLibLakeSoulMetaData().execute_query_scalar(
362-
new ReferencedStringCallback((result, msg) -> {
363-
if (msg.isEmpty()) {
364-
future.complete(result);
365-
} else {
366-
future.completeExceptionally(new SQLException(msg));
367-
}
368-
}, getStringCallbackObjectReferenceManager()),
369-
tokioRuntime,
370-
tokioPostgresClient,
371-
preparedStatement,
372-
updateType,
373-
String.join(PARAM_DELIM, params)
374-
);
375-
String result = future.get(timeout, TimeUnit.MILLISECONDS);
376-
if (result.isEmpty()) return Collections.emptyList();
377-
return Arrays.stream(result.split(PARAM_DELIM)).collect(Collectors.toList());
378-
} catch (InterruptedException | ExecutionException e) {
379-
throw new RuntimeException(e);
380-
} catch (TimeoutException e) {
381-
LOG.error("Execute Update {} with {} timeout", updateType, params);
382-
throw new RuntimeException(e);
410+
getReadLock();
411+
int retryCounter = NATIVE_METADATA_MAX_RETRY_ATTEMPTS;
412+
while (retryCounter >= 0) {
413+
try {
414+
final CompletableFuture<String> future = new CompletableFuture<>();
415+
416+
getLibLakeSoulMetaData().execute_query_scalar(
417+
new ReferencedStringCallback((result, msg) -> {
418+
if (msg.isEmpty()) {
419+
future.complete(result);
420+
} else {
421+
future.completeExceptionally(new SQLException(msg));
422+
}
423+
}, getStringCallbackObjectReferenceManager()),
424+
tokioRuntime,
425+
tokioPostgresClient,
426+
preparedStatement,
427+
queryScalarType,
428+
String.join(PARAM_DELIM, params)
429+
);
430+
String result = future.get(timeout, TimeUnit.MILLISECONDS);
431+
if (result.isEmpty()) return Collections.emptyList();
432+
return Arrays.stream(result.split(PARAM_DELIM)).collect(Collectors.toList());
433+
} catch (InterruptedException | ExecutionException e) {
434+
if (retryCounter == 0) {
435+
throw new RuntimeException(e);
436+
} else {
437+
enlargeBufferAndTimeout();
438+
retryCounter--;
439+
}
440+
} catch (TimeoutException e) {
441+
if (retryCounter == 0) {
442+
LOG.error("Execute QueryScalar {} with {} timeout", queryScalarType, params);
443+
throw new RuntimeException(e);
444+
} else {
445+
enlargeBufferAndTimeout();
446+
retryCounter--;
447+
}
448+
}
449+
}
383450
} finally {
384451
unlockReadLock();
385452
}
453+
return Collections.emptyList();
386454
}
387455

388456
public static Integer insert(NativeUtils.CodedDaoType insertType, JniWrapper jniWrapper) {

lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ public class NativeUtils {
1010

1111
public static boolean NATIVE_METADATA_UPDATE_ENABLED = true;
1212

13+
public static int NATIVE_METADATA_MAX_RETRY_ATTEMPTS = 3;
14+
1315
public static int DAO_TYPE_QUERY_ONE_OFFSET = 0;
1416
public static int DAO_TYPE_QUERY_LIST_OFFSET = 100;
1517
public static int DAO_TYPE_INSERT_ONE_OFFSET = 200;

0 commit comments

Comments
 (0)