Skip to content

Commit 756cd41

Browse files
artembilangaryrussell
authored andcommitted
Fix MessagingGatewaySupport implementors
The `AmqpInboundGateway` and `JmsInboundGateway` don't delegate to `super` in their `doStart()/doStop()` implementations causing the problem with an internal `replyMessageCorrelator` missed the proper lifecycle management **Cherry-pick to 5.1.x & 5.0.x**
1 parent 34bcd7d commit 756cd41

File tree

2 files changed

+14
-9
lines changed

2 files changed

+14
-9
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java

+11-9
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
*/
6666
public class AmqpInboundGateway extends MessagingGatewaySupport {
6767

68-
private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<AttributeAccessor>();
68+
private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<>();
6969

7070
private final AbstractMessageListenerContainer messageListenerContainer;
7171

@@ -208,11 +208,13 @@ protected void onInit() throws Exception {
208208

209209
@Override
210210
protected void doStart() {
211+
super.doStart();
211212
this.messageListenerContainer.start();
212213
}
213214

214215
@Override
215216
protected void doStop() {
217+
super.doStop();
216218
this.messageListenerContainer.stop();
217219
}
218220

@@ -271,11 +273,11 @@ public void onMessage(final Message message, final Channel channel) throws Excep
271273
org.springframework.messaging.Message<Object> converted = convert(message, channel);
272274
if (converted != null) {
273275
AmqpInboundGateway.this.retryTemplate.execute(context -> {
274-
StaticMessageHeaderAccessor.getDeliveryAttempt(converted).incrementAndGet();
275-
process(message, converted);
276-
return null;
277-
},
278-
(RecoveryCallback<Object>) AmqpInboundGateway.this.recoveryCallback);
276+
StaticMessageHeaderAccessor.getDeliveryAttempt(converted).incrementAndGet();
277+
process(message, converted);
278+
return null;
279+
},
280+
(RecoveryCallback<Object>) AmqpInboundGateway.this.recoveryCallback);
279281
}
280282
}
281283
}
@@ -305,9 +307,9 @@ private org.springframework.messaging.Message<Object> convert(Message message, C
305307
return null;
306308
}
307309
return getMessageBuilderFactory()
308-
.withPayload(payload)
309-
.copyHeaders(headers)
310-
.build();
310+
.withPayload(payload)
311+
.copyHeaders(headers)
312+
.build();
311313
}
312314

313315
private void process(Message message, org.springframework.messaging.Message<Object> messagingMessage) {

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java

+3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class JmsInboundGateway extends MessagingGatewaySupport implements Dispos
3939

4040
public JmsInboundGateway(AbstractMessageListenerContainer listenerContainer,
4141
ChannelPublishingJmsMessageListener listener) {
42+
4243
this.endpoint = new JmsMessageDrivenEndpoint(listenerContainer, listener);
4344
}
4445

@@ -78,11 +79,13 @@ public ChannelPublishingJmsMessageListener getListener() {
7879

7980
@Override
8081
protected void doStart() {
82+
super.doStart();
8183
this.endpoint.start();
8284
}
8385

8486
@Override
8587
protected void doStop() {
88+
super.doStop();
8689
this.endpoint.stop();
8790
}
8891

0 commit comments

Comments
 (0)