Skip to content

Commit 7135d06

Browse files
garyrussellartembilan
authored andcommitted
INT-4553: Store-backed QueueChannel improvements
JIRA: https://jira.spring.io/browse/INT-4553 Fixes #2628 Fixes #2629 - Avoid `size()` calls on the MGS, use `poll()` instead. - Optimize the indexes for the `INT_CHANNEL_MESSAGE` table. Avoid size call when no timeout too. Polishing - PR Comments Missed a doc fix Another missed %PREFIX% Fix underscores Polishing; PR comments; make MGQ extendable. Fix version in doc. * Polishing `@since` * Use diamonds whenever it is possible **Cherry-pick to 5.0.x**
1 parent cfa439d commit 7135d06

File tree

16 files changed

+306
-133
lines changed

16 files changed

+306
-133
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/QueueChannel.java

+21-13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,7 +66,7 @@ public QueueChannel(Queue<Message<?>> queue) {
6666
public QueueChannel(int capacity) {
6767
Assert.isTrue(capacity > 0, "The capacity must be a positive integer. " +
6868
"For a zero-capacity alternative, consider using a 'RendezvousChannel'.");
69-
this.queue = new LinkedBlockingQueue<Message<?>>(capacity);
69+
this.queue = new LinkedBlockingQueue<>(capacity);
7070
}
7171

7272
/**
@@ -75,7 +75,7 @@ public QueueChannel(int capacity) {
7575
* unbounded queue may lead to OutOfMemoryErrors.
7676
*/
7777
public QueueChannel() {
78-
this(new LinkedBlockingQueue<Message<?>>());
78+
this(new LinkedBlockingQueue<>());
7979
}
8080

8181
@Override
@@ -116,13 +116,19 @@ protected Message<?> doReceive(long timeout) {
116116
return ((BlockingQueue<Message<?>>) this.queue).poll(timeout, TimeUnit.MILLISECONDS);
117117
}
118118
else {
119-
long nanos = TimeUnit.MILLISECONDS.toNanos(timeout);
120-
long deadline = System.nanoTime() + nanos;
121-
while (this.queue.size() == 0 && nanos > 0) {
122-
this.queueSemaphore.tryAcquire(nanos, TimeUnit.NANOSECONDS); // NOSONAR - ok to ignore result
123-
nanos = deadline - System.nanoTime();
119+
Message<?> message = this.queue.poll();
120+
if (message == null) {
121+
long nanos = TimeUnit.MILLISECONDS.toNanos(timeout);
122+
long deadline = System.nanoTime() + nanos;
123+
while (message == null && nanos > 0) {
124+
this.queueSemaphore.tryAcquire(nanos, TimeUnit.NANOSECONDS); // NOSONAR ok to ignore result
125+
message = this.queue.poll();
126+
if (message == null) {
127+
nanos = deadline - System.nanoTime();
128+
}
129+
}
124130
}
125-
return this.queue.poll();
131+
return message;
126132
}
127133
}
128134
if (timeout == 0) {
@@ -133,10 +139,12 @@ protected Message<?> doReceive(long timeout) {
133139
return ((BlockingQueue<Message<?>>) this.queue).take();
134140
}
135141
else {
136-
while (this.queue.size() == 0) {
142+
Message<?> message = this.queue.poll();
143+
while (message == null) {
137144
this.queueSemaphore.tryAcquire(50, TimeUnit.MILLISECONDS);
145+
message = this.queue.poll();
138146
}
139-
return this.queue.poll();
147+
return message;
140148
}
141149
}
142150
catch (InterruptedException e) {
@@ -147,7 +155,7 @@ protected Message<?> doReceive(long timeout) {
147155

148156
@Override
149157
public List<Message<?>> clear() {
150-
List<Message<?>> clearedMessages = new ArrayList<Message<?>>();
158+
List<Message<?>> clearedMessages = new ArrayList<>();
151159
if (this.queue instanceof BlockingQueue) {
152160
((BlockingQueue<Message<?>>) this.queue).drainTo(clearedMessages);
153161
}
@@ -165,7 +173,7 @@ public List<Message<?>> purge(MessageSelector selector) {
165173
if (selector == null) {
166174
return this.clear();
167175
}
168-
List<Message<?>> purgedMessages = new ArrayList<Message<?>>();
176+
List<Message<?>> purgedMessages = new ArrayList<>();
169177
Object[] array = this.queue.toArray();
170178
for (Object o : array) {
171179
Message<?> message = (Message<?>) o;

spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupQueue.java

+47-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -121,6 +121,42 @@ public Iterator<Message<?>> iterator() {
121121
return getMessages().iterator();
122122
}
123123

124+
/**
125+
* Get the store.
126+
* @return the store.
127+
* @since 5.0.11
128+
*/
129+
protected BasicMessageGroupStore getMessageGroupStore() {
130+
return this.messageGroupStore;
131+
}
132+
133+
/**
134+
* Get the store lock.
135+
* @return the lock.
136+
* @since 5.0.11
137+
*/
138+
protected Lock getStoreLock() {
139+
return this.storeLock;
140+
}
141+
142+
/**
143+
* Get the not full condition.
144+
* @return the condition.
145+
* @since 5.0.11
146+
*/
147+
protected Condition getMessageStoreNotFull() {
148+
return this.messageStoreNotFull;
149+
}
150+
151+
/**
152+
* Get the not empty condition.
153+
* @return the condition.
154+
* @since 5.0.11
155+
*/
156+
protected Condition getMessageStoreNotEmpty() {
157+
return this.messageStoreNotEmpty;
158+
}
159+
124160
@Override
125161
public int size() {
126162
return this.messageGroupStore.messageGroupSize(this.groupId);
@@ -156,11 +192,11 @@ public Message<?> poll(long timeout, TimeUnit unit) throws InterruptedException
156192
storeLock.lockInterruptibly();
157193

158194
try {
159-
while (this.size() == 0 && timeoutInNanos > 0) {
195+
message = doPoll();
196+
while (message == null && timeoutInNanos > 0) {
160197
timeoutInNanos = this.messageStoreNotEmpty.awaitNanos(timeoutInNanos);
198+
message = doPoll();
161199
}
162-
message = this.doPoll();
163-
164200
}
165201
finally {
166202
storeLock.unlock();
@@ -196,7 +232,7 @@ public int drainTo(Collection<? super Message<?>> c) {
196232
public int drainTo(Collection<? super Message<?>> collection, int maxElements) {
197233
Assert.notNull(collection, "'collection' must not be null");
198234
int originalSize = collection.size();
199-
ArrayList<Message<?>> list = new ArrayList<Message<?>>();
235+
ArrayList<Message<?>> list = new ArrayList<>();
200236
final Lock storeLock = this.storeLock;
201237
try {
202238
storeLock.lockInterruptibly();
@@ -297,7 +333,7 @@ public Message<?> take() throws InterruptedException {
297333
while (this.size() == 0) {
298334
this.messageStoreNotEmpty.await();
299335
}
300-
message = this.doPoll();
336+
message = doPoll();
301337

302338
}
303339
finally {
@@ -306,15 +342,15 @@ public Message<?> take() throws InterruptedException {
306342
return message;
307343
}
308344

309-
private Collection<Message<?>> getMessages() {
345+
protected Collection<Message<?>> getMessages() {
310346
return this.messageGroupStore.getMessageGroup(this.groupId).getMessages();
311347
}
312348

313349
/**
314350
* It is assumed that the 'storeLock' is being held by the caller, otherwise
315351
* IllegalMonitorStateException may be thrown
316352
*/
317-
private Message<?> doPoll() {
353+
protected Message<?> doPoll() {
318354
Message<?> message = this.messageGroupStore.pollMessageFromGroup(this.groupId);
319355
this.messageStoreNotFull.signal();
320356
return message;
@@ -323,8 +359,9 @@ private Message<?> doPoll() {
323359
/**
324360
* It is assumed that the 'storeLock' is being held by the caller, otherwise
325361
* IllegalMonitorStateException may be thrown
362+
* @param message the message to offer.
326363
*/
327-
private boolean doOffer(Message<?> message) {
364+
protected boolean doOffer(Message<?> message) {
328365
boolean offered = false;
329366
if (this.capacity == Integer.MAX_VALUE || this.size() < this.capacity) {
330367
this.messageGroupStore.addMessageToGroup(this.groupId, message);
@@ -333,4 +370,5 @@ private boolean doOffer(Message<?> message) {
333370
}
334371
return offered;
335372
}
373+
336374
}

0 commit comments

Comments
 (0)