Skip to content

Commit 09aeaac

Browse files
artembilangaryrussell
authored andcommitted
INT-4396: Add retrying lock in case of exception
JIRA: https://jira.spring.io/browse/INT-4396 When target distributed `Lock` implementation throws an exception, e.g. in case of no connection to the service, the `LockRegistryLeaderInitiator` exists the loop and can come back to the elections only after restart * Catch all the exception on `this.lock.tryLock()` and resubmit `LeaderSelector` for a new locking cycle if `LockRegistryLeaderInitiator.isRunning()` and `InterruptedException` * Remove diagnostics from the `JdbcLockRegistryLeaderInitiatorTests` since this fix confirms that we just didn't have a reconnect logic before when this test failed sporadically **Cherry-pick to 4.3.x**
1 parent e53e894 commit 09aeaac

File tree

3 files changed

+56
-25
lines changed

3 files changed

+56
-25
lines changed

spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -326,7 +326,7 @@ protected class LeaderSelector implements Callable<Void> {
326326
@Override
327327
public Void call() throws Exception {
328328
try {
329-
while (LockRegistryLeaderInitiator.this.running) {
329+
while (isRunning()) {
330330
try {
331331
// We always try to acquire the lock, in case it expired
332332
boolean acquired = this.lock.tryLock(LockRegistryLeaderInitiator.this.heartBeatMillis,
@@ -345,26 +345,40 @@ else if (acquired) {
345345
// If we were able to acquire it but we were already locked we
346346
// should release it
347347
this.lock.unlock();
348-
// Give it a chance to expire.
349-
Thread.sleep(LockRegistryLeaderInitiator.this.heartBeatMillis);
348+
if (isRunning()) {
349+
// Give it a chance to expire.
350+
Thread.sleep(LockRegistryLeaderInitiator.this.heartBeatMillis);
351+
}
350352
}
351353
else {
352354
this.locked = false;
353355
// We were not able to acquire it, therefore not leading any more
354356
handleRevoked();
355-
// Try again quickly in case the lock holder dropped it
356-
Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis);
357+
if (isRunning()) {
358+
// Try again quickly in case the lock holder dropped it
359+
Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis);
360+
}
357361
}
358362
}
359-
catch (InterruptedException e) {
363+
catch (Exception e) {
360364
if (this.locked) {
361365
this.lock.unlock();
362366
this.locked = false;
363367
// The lock was broken and we are no longer leader
364368
handleRevoked();
365-
// Give it a chance to elect some other leader.
366-
Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis);
369+
if (isRunning()) {
370+
// Give it a chance to elect some other leader.
371+
Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis);
372+
}
373+
}
374+
375+
if (e instanceof InterruptedException) {
367376
Thread.currentThread().interrupt();
377+
if (isRunning()) {
378+
logger.warn("Restarting LeaderSelector because of error.", e);
379+
LockRegistryLeaderInitiator.this.future =
380+
LockRegistryLeaderInitiator.this.executorService.submit(this);
381+
}
368382
return null;
369383
}
370384
}
@@ -446,11 +460,6 @@ public boolean isLeader() {
446460
public void yield() {
447461
if (LockRegistryLeaderInitiator.this.future != null) {
448462
LockRegistryLeaderInitiator.this.future.cancel(true);
449-
if (isRunning()) {
450-
LockRegistryLeaderInitiator.this.future =
451-
LockRegistryLeaderInitiator.this.executorService
452-
.submit(LockRegistryLeaderInitiator.this.leaderSelector);
453-
}
454463
}
455464
}
456465

spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2017 the original author or authors.
2+
* Copyright 2012-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -265,6 +265,38 @@ public void testGracefulLeaderSelectorExit() throws Exception {
265265
assertNull(throwable);
266266
}
267267

268+
@Test
269+
public void testExceptionFromLock() throws Exception {
270+
Lock mockLock = mock(Lock.class);
271+
272+
AtomicBoolean exceptionThrown = new AtomicBoolean();
273+
274+
willAnswer(invocation -> {
275+
if (!exceptionThrown.getAndSet(true)) {
276+
throw new RuntimeException("lock is broken");
277+
}
278+
else {
279+
return true;
280+
}
281+
}).given(mockLock).tryLock(anyLong(), any(TimeUnit.class));
282+
283+
LockRegistry registry = lockKey -> mockLock;
284+
285+
CountDownLatch onGranted = new CountDownLatch(1);
286+
287+
LockRegistryLeaderInitiator initiator = new LockRegistryLeaderInitiator(registry);
288+
289+
initiator.setLeaderEventPublisher(new CountingPublisher(onGranted));
290+
291+
initiator.start();
292+
293+
assertTrue(onGranted.await(10, TimeUnit.SECONDS));
294+
assertTrue(initiator.getContext().isLeader());
295+
assertTrue(exceptionThrown.get());
296+
297+
initiator.stop();
298+
}
299+
268300
private static class CountingPublisher implements LeaderEventPublisher {
269301

270302
private final CountDownLatch granted;

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727

2828
import org.junit.AfterClass;
2929
import org.junit.BeforeClass;
30-
import org.junit.Rule;
3130
import org.junit.Test;
3231

3332
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
@@ -36,7 +35,6 @@
3635
import org.springframework.integration.leader.DefaultCandidate;
3736
import org.springframework.integration.leader.event.LeaderEventPublisher;
3837
import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;
39-
import org.springframework.integration.test.rule.Log4j2LevelAdjuster;
4038
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
4139
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
4240
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@@ -52,14 +50,6 @@ public class JdbcLockRegistryLeaderInitiatorTests {
5250

5351
public static EmbeddedDatabase dataSource;
5452

55-
@Rule
56-
public Log4j2LevelAdjuster adjuster =
57-
Log4j2LevelAdjuster.trace()
58-
.categories("org.springframework.integration",
59-
"org.springframework.integration.jdbc",
60-
"org.springframework.jdbc",
61-
"org.apache.derby");
62-
6353
@BeforeClass
6454
public static void init() {
6555
dataSource = new EmbeddedDatabaseBuilder()

0 commit comments

Comments
 (0)