Skip to content

Commit 3465110

Browse files
committed
Fix Redis components for JDK deserialization
It turns out that `JdkSerializationRedisSerializer` by default is based on the default Java class loader which may lead into `ClassCastException` downstream after deserialization * Make all the `JdkSerializationRedisSerializer` usage (default) in Redis components based on the BeanFactory `ClassLoader` * Fix tests to call `setBeanClassLoader()` * Fix Mark Fisher's name in the `MultipartFileReader` :-) **Cherry-pick to 5.1.x, 5.0.x & 4.3.x after restoring diamonds** # Conflicts: # spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueInboundGateway.java # spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpoint.java # Conflicts: # spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/RedisQueueOutboundGateway.java # spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests.java # Conflicts: # spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests.java
1 parent 2e7c5d8 commit 3465110

File tree

6 files changed

+87
-57
lines changed

6 files changed

+87
-57
lines changed

spring-integration-http/src/main/java/org/springframework/integration/http/multipart/MultipartFileReader.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
/**
2424
* Strategy for reading {@link MultipartFile} content.
2525
*
26-
* @author mark Fisher
26+
* @author Mark Fisher
27+
*
2728
* @since 2.0
2829
*/
2930
public interface MultipartFileReader<T> {

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueInboundGateway.java

+24-16
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.Executor;
2020
import java.util.concurrent.TimeUnit;
2121

22+
import org.springframework.beans.factory.BeanClassLoaderAware;
2223
import org.springframework.context.ApplicationEventPublisher;
2324
import org.springframework.context.ApplicationEventPublisherAware;
2425
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@@ -52,7 +53,8 @@
5253
*/
5354
@ManagedResource
5455
@IntegrationManagedResource
55-
public class RedisQueueInboundGateway extends MessagingGatewaySupport implements ApplicationEventPublisherAware {
56+
public class RedisQueueInboundGateway extends MessagingGatewaySupport
57+
implements ApplicationEventPublisherAware, BeanClassLoaderAware {
5658

5759
private static final String QUEUE_NAME_SUFFIX = ".reply";
5860

@@ -66,24 +68,24 @@ public class RedisQueueInboundGateway extends MessagingGatewaySupport implements
6668

6769
private final BoundListOperations<String, byte[]> boundListOperations;
6870

69-
private volatile ApplicationEventPublisher applicationEventPublisher;
71+
private ApplicationEventPublisher applicationEventPublisher;
7072

71-
private volatile boolean serializerExplicitlySet;
73+
private boolean serializerExplicitlySet;
7274

73-
private volatile Executor taskExecutor;
75+
private Executor taskExecutor;
7476

75-
private volatile RedisSerializer<?> serializer = new JdkSerializationRedisSerializer();
77+
private RedisSerializer<?> serializer;
7678

77-
private volatile long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
79+
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
7880

79-
private volatile long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
81+
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
82+
83+
private boolean extractPayload = true;
8084

8185
private volatile boolean active;
8286

8387
private volatile boolean listening;
8488

85-
private volatile boolean extractPayload = true;
86-
8789
private volatile Runnable stopCallback;
8890

8991
/**
@@ -110,12 +112,18 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
110112
this.applicationEventPublisher = applicationEventPublisher;
111113
}
112114

115+
@Override
116+
public void setBeanClassLoader(ClassLoader beanClassLoader) {
117+
if (!this.serializerExplicitlySet) {
118+
this.serializer = new JdkSerializationRedisSerializer(beanClassLoader);
119+
}
120+
}
121+
113122
public void setSerializer(RedisSerializer<?> serializer) {
114123
this.serializer = serializer;
115124
this.serializerExplicitlySet = true;
116125
}
117126

118-
119127
/**
120128
* This timeout (milliseconds) is used when retrieving elements from the queue
121129
* specified by {@link #boundListOperations}.
@@ -157,11 +165,11 @@ protected void onInit() throws Exception {
157165
Assert.notNull(this.serializer, "'serializer' has to be provided where 'extractPayload == false'.");
158166
}
159167
if (this.taskExecutor == null) {
160-
String beanName = this.getComponentName();
168+
String beanName = getComponentName();
161169
this.taskExecutor = new SimpleAsyncTaskExecutor((beanName == null ? "" : beanName + "-")
162-
+ this.getComponentType());
170+
+ getComponentType());
163171
}
164-
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && this.getBeanFactory() != null) {
172+
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && getBeanFactory() != null) {
165173
MessagePublishingErrorHandler errorHandler =
166174
new MessagePublishingErrorHandler(new BeanFactoryChannelResolver(getBeanFactory()));
167175
errorHandler.setDefaultErrorChannel(getErrorChannel());
@@ -179,8 +187,8 @@ private void handlePopException(Exception e) {
179187
if (this.active) {
180188
logger.error("Failed to execute listening task. Will attempt to resubmit in " + this.recoveryInterval
181189
+ " milliseconds.", e);
182-
this.publishException(e);
183-
this.sleepBeforeRecoveryAttempt();
190+
publishException(e);
191+
sleepBeforeRecoveryAttempt();
184192
}
185193
else {
186194
logger.debug("Failed to execute listening task. " + e.getClass() + ": " + e.getMessage());
@@ -189,7 +197,7 @@ private void handlePopException(Exception e) {
189197

190198
@SuppressWarnings("unchecked")
191199
private void receiveAndReply() {
192-
byte[] value = null;
200+
byte[] value;
193201
try {
194202
value = this.boundListOperations.rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
195203
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpoint.java

+22-20
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.Executor;
2020
import java.util.concurrent.TimeUnit;
2121

22+
import org.springframework.beans.factory.BeanClassLoaderAware;
2223
import org.springframework.context.ApplicationEventPublisher;
2324
import org.springframework.context.ApplicationEventPublisherAware;
2425
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@@ -38,7 +39,6 @@
3839
import org.springframework.jmx.export.annotation.ManagedOperation;
3940
import org.springframework.jmx.export.annotation.ManagedResource;
4041
import org.springframework.messaging.Message;
41-
import org.springframework.messaging.MessageChannel;
4242
import org.springframework.messaging.MessagingException;
4343
import org.springframework.scheduling.SchedulingAwareRunnable;
4444
import org.springframework.util.Assert;
@@ -55,36 +55,37 @@
5555
*/
5656
@ManagedResource
5757
@IntegrationManagedResource
58-
public class RedisQueueMessageDrivenEndpoint extends MessageProducerSupport implements ApplicationEventPublisherAware {
58+
public class RedisQueueMessageDrivenEndpoint extends MessageProducerSupport
59+
implements ApplicationEventPublisherAware, BeanClassLoaderAware {
5960

6061
public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
6162

6263
public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
6364

6465
private final BoundListOperations<String, byte[]> boundListOperations;
6566

66-
private volatile ApplicationEventPublisher applicationEventPublisher;
67+
private ApplicationEventPublisher applicationEventPublisher;
6768

68-
private volatile MessageChannel errorChannel;
69+
private Executor taskExecutor;
6970

70-
private volatile Executor taskExecutor;
71+
private RedisSerializer<?> serializer;
7172

72-
private volatile RedisSerializer<?> serializer = new JdkSerializationRedisSerializer();
73+
private boolean serializerExplicitlySet;
7374

74-
private volatile boolean expectMessage = false;
75+
private boolean expectMessage = false;
7576

76-
private volatile long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
77+
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
7778

78-
private volatile long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
79+
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
80+
81+
private boolean rightPop = true;
7982

8083
private volatile boolean active;
8184

8285
private volatile boolean listening;
8386

8487
private volatile Runnable stopCallback;
8588

86-
private volatile boolean rightPop = true;
87-
8889
/**
8990
* @param queueName Must not be an empty String
9091
* @param connectionFactory Must not be null
@@ -105,17 +106,24 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
105106
this.applicationEventPublisher = applicationEventPublisher;
106107
}
107108

109+
@Override
110+
public void setBeanClassLoader(ClassLoader beanClassLoader) {
111+
if (!this.serializerExplicitlySet) {
112+
this.serializer = new JdkSerializationRedisSerializer(beanClassLoader);
113+
}
114+
}
115+
108116
public void setSerializer(RedisSerializer<?> serializer) {
109117
this.serializer = serializer;
118+
this.serializerExplicitlySet = true;
110119
}
111120

112121
/**
113122
* When data is retrieved from the Redis queue, does the returned data represent
114123
* just the payload for a Message, or does the data represent a serialized
115124
* {@link Message}?. {@code expectMessage} defaults to false. This means
116125
* the retrieved data will be used as the payload for a new Spring Integration
117-
* Message. Otherwise, the data is deserialized as Spring Integration
118-
* Message.
126+
* Message. Otherwise, the data is deserialized as Spring Integration Message.
119127
* @param expectMessage Defaults to false
120128
*/
121129
public void setExpectMessage(boolean expectMessage) {
@@ -153,12 +161,6 @@ public void setTaskExecutor(Executor taskExecutor) {
153161
this.taskExecutor = taskExecutor;
154162
}
155163

156-
@Override
157-
public void setErrorChannel(MessageChannel errorChannel) {
158-
super.setErrorChannel(errorChannel);
159-
this.errorChannel = errorChannel;
160-
}
161-
162164
public void setRecoveryInterval(long recoveryInterval) {
163165
this.recoveryInterval = recoveryInterval;
164166
}
@@ -186,7 +188,7 @@ protected void onInit() {
186188
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && this.getBeanFactory() != null) {
187189
MessagePublishingErrorHandler errorHandler =
188190
new MessagePublishingErrorHandler(new BeanFactoryChannelResolver(this.getBeanFactory()));
189-
errorHandler.setDefaultErrorChannel(this.errorChannel);
191+
errorHandler.setDefaultErrorChannel(getErrorChannel());
190192
this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, errorHandler);
191193
}
192194
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/RedisQueueOutboundGateway.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,19 @@ public class RedisQueueOutboundGateway extends AbstractReplyProducingMessageHand
4343

4444
private static final IdGenerator defaultIdGenerator = new AlternativeJdkIdGenerator();
4545

46-
private final static RedisSerializer<String> stringSerializer = new StringRedisSerializer();
46+
private static final RedisSerializer<String> stringSerializer = new StringRedisSerializer();
4747

4848
private final RedisTemplate<String, Object> template;
4949

5050
private final BoundListOperations<String, Object> boundListOps;
5151

52-
private volatile boolean extractPayload = true;
52+
private boolean extractPayload = true;
5353

54-
private volatile RedisSerializer<?> serializer = new JdkSerializationRedisSerializer();
54+
private RedisSerializer<?> serializer;
5555

56-
private volatile boolean serializerExplicitlySet;
56+
private boolean serializerExplicitlySet;
5757

58-
private volatile int receiveTimeout = TIMEOUT;
58+
private int receiveTimeout = TIMEOUT;
5959

6060
public RedisQueueOutboundGateway(String queueName, RedisConnectionFactory connectionFactory) {
6161
Assert.hasText(queueName, "'queueName' is required");
@@ -68,6 +68,14 @@ public RedisQueueOutboundGateway(String queueName, RedisConnectionFactory connec
6868
this.boundListOps = this.template.boundListOps(queueName);
6969
}
7070

71+
@Override
72+
public void setBeanClassLoader(ClassLoader beanClassLoader) {
73+
super.setBeanClassLoader(beanClassLoader);
74+
if (!this.serializerExplicitlySet) {
75+
this.serializer = new JdkSerializationRedisSerializer(beanClassLoader);
76+
}
77+
}
78+
7179
public void setReceiveTimeout(int timeout) {
7280
this.receiveTimeout = timeout;
7381
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisChannelMessageStore.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.List;
2020
import java.util.Set;
2121

22+
import org.springframework.beans.factory.BeanClassLoaderAware;
2223
import org.springframework.beans.factory.BeanNameAware;
2324
import org.springframework.beans.factory.InitializingBean;
2425
import org.springframework.data.redis.connection.RedisConnectionFactory;
@@ -41,17 +42,21 @@
4142
*
4243
* @author Gary Russell
4344
* @author Artem Bilan
45+
*
4446
* @since 4.0
4547
*
4648
*/
47-
public class RedisChannelMessageStore implements ChannelMessageStore, BeanNameAware, InitializingBean {
49+
public class RedisChannelMessageStore
50+
implements ChannelMessageStore, BeanNameAware, InitializingBean, BeanClassLoaderAware {
4851

4952
private final RedisTemplate<Object, Message<?>> redisTemplate;
5053

51-
private volatile MessageGroupFactory messageGroupFactory = new SimpleMessageGroupFactory();
52-
5354
private String beanName;
5455

56+
private MessageGroupFactory messageGroupFactory = new SimpleMessageGroupFactory();
57+
58+
private boolean valueSerializerExplicitlySet;
59+
5560
/**
5661
* Construct a message store that uses Java Serialization for messages.
5762
*
@@ -65,6 +70,13 @@ public RedisChannelMessageStore(RedisConnectionFactory connectionFactory) {
6570
this.redisTemplate.afterPropertiesSet();
6671
}
6772

73+
@Override
74+
public void setBeanClassLoader(ClassLoader classLoader) {
75+
if (!this.valueSerializerExplicitlySet) {
76+
this.redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer(classLoader));
77+
}
78+
}
79+
6880
/**
6981
* Use a different serializer (default {@link JdkSerializationRedisSerializer} for
7082
* the {@link Message}.
@@ -74,6 +86,7 @@ public RedisChannelMessageStore(RedisConnectionFactory connectionFactory) {
7486
public void setValueSerializer(RedisSerializer<?> valueSerializer) {
7587
Assert.notNull(valueSerializer, "'valueSerializer' must not be null");
7688
this.redisTemplate.setValueSerializer(valueSerializer);
89+
this.valueSerializerExplicitlySet = true;
7790
}
7891

7992
/**

0 commit comments

Comments
 (0)