Skip to content

Commit 6474097

Browse files
committed
Merge branch '4.0.x-stable'
2 parents 6afe9b1 + 965a6a5 commit 6474097

File tree

2 files changed

+50
-25
lines changed

2 files changed

+50
-25
lines changed

src/main/java/com/rabbitmq/utility/BlockingCell.java

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,13 @@ public synchronized T get() throws InterruptedException {
6363
public synchronized T get(long timeout) throws InterruptedException, TimeoutException {
6464
if (timeout == INFINITY) return get();
6565

66-
if (timeout < 0)
66+
if (timeout < 0) {
6767
throw new AssertionError("Timeout cannot be less than zero");
68+
}
6869

69-
long maxTime = System.currentTimeMillis() + timeout;
70-
long now;
71-
while (!_filled && (now = System.currentTimeMillis()) < maxTime) {
70+
long now = System.nanoTime() / NANOS_IN_MILLI;
71+
long maxTime = now + timeout;
72+
while (!_filled && (now = (System.nanoTime() / NANOS_IN_MILLI)) < maxTime) {
7273
wait(maxTime - now);
7374
}
7475

@@ -83,11 +84,19 @@ public synchronized T get(long timeout) throws InterruptedException, TimeoutExce
8384
* @return the waited-for value
8485
*/
8586
public synchronized T uninterruptibleGet() {
86-
while (true) {
87-
try {
88-
return get();
89-
} catch (InterruptedException ex) {
90-
// no special handling necessary
87+
boolean wasInterrupted = false;
88+
try {
89+
while (true) {
90+
try {
91+
return get();
92+
} catch (InterruptedException ex) {
93+
// no special handling necessary
94+
wasInterrupted = true;
95+
}
96+
}
97+
} finally {
98+
if (wasInterrupted) {
99+
Thread.currentThread().interrupt();
91100
}
92101
}
93102
}
@@ -104,14 +113,21 @@ public synchronized T uninterruptibleGet() {
104113
public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
105114
long now = System.nanoTime() / NANOS_IN_MILLI;
106115
long runTime = now + timeout;
107-
108-
do {
109-
try {
110-
return get(runTime - now);
111-
} catch (InterruptedException e) {
112-
// Ignore.
116+
boolean wasInterrupted = false;
117+
try {
118+
do {
119+
try {
120+
return get(runTime - now);
121+
} catch (InterruptedException e) {
122+
// Ignore.
123+
wasInterrupted = true;
124+
}
125+
} while ((timeout == INFINITY) || ((now = System.nanoTime() / NANOS_IN_MILLI) < runTime));
126+
} finally {
127+
if (wasInterrupted) {
128+
Thread.currentThread().interrupt();
113129
}
114-
} while ((timeout == INFINITY) || ((now = System.nanoTime() / NANOS_IN_MILLI) < runTime));
130+
}
115131

116132
throw new TimeoutException();
117133
}

src/main/java/com/rabbitmq/utility/SingleShotLinearTimer.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@
3131
* package-private.
3232
*
3333
* We currently just use this to time the quiescing RPC in AMQChannel.
34-
*
34+
*
35+
* Will be removed in next major release.
36+
*
37+
* @deprecated
3538
* @see AMQChannel
3639
*/
3740

@@ -72,21 +75,27 @@ public TimerThread(long timeoutMillisec) {
7275
public void run() {
7376
try {
7477
long now;
75-
while ((now = System.nanoTime() / NANOS_IN_MILLI) < _runTime) {
76-
if (_task == null) break;
78+
boolean wasInterrupted = false;
79+
try {
80+
while ((now = System.nanoTime() / NANOS_IN_MILLI) < _runTime) {
81+
if (_task == null) break;
7782

78-
try {
79-
synchronized(this) {
80-
wait(_runTime - now);
83+
try {
84+
synchronized(this) {
85+
wait(_runTime - now);
86+
}
87+
} catch (InterruptedException e) {
88+
wasInterrupted = true;
8189
}
82-
} catch (InterruptedException e) {
83-
Thread.currentThread().interrupt();
8490
}
91+
} finally {
92+
if (wasInterrupted)
93+
Thread.currentThread().interrupt();
8594
}
8695

8796
Runnable task = _task;
8897
if (task != null) {
89-
task.run();
98+
task.run();
9099
}
91100

92101
} finally {

0 commit comments

Comments
 (0)