Skip to content

Commit eb31825

Browse files
committed
Fix Redis components for JDK deserialization
It turns out that `JdkSerializationRedisSerializer` by default is based on the default Java class loader which may lead into `ClassCastException` downstream after deserialization * Make all the `JdkSerializationRedisSerializer` usage (default) in Redis components based on the BeanFactory `ClassLoader` * Fix tests to call `setBeanClassLoader()` * Fix Mark Fisher's name in the `MultipartFileReader` :-) **Cherry-pick to 5.1.x, 5.0.x & 4.3.x after restoring diamonds** # Conflicts: # spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueInboundGateway.java # spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpoint.java # Conflicts: # spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/RedisQueueOutboundGateway.java # spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests.java
1 parent 93cd669 commit eb31825

File tree

6 files changed

+91
-61
lines changed

6 files changed

+91
-61
lines changed

spring-integration-http/src/main/java/org/springframework/integration/http/multipart/MultipartFileReader.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
/**
2424
* Strategy for reading {@link MultipartFile} content.
2525
*
26-
* @author mark Fisher
26+
* @author Mark Fisher
27+
*
2728
* @since 2.0
2829
*/
2930
public interface MultipartFileReader<T> {

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueInboundGateway.java

+25-17
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.Executor;
2020
import java.util.concurrent.TimeUnit;
2121

22+
import org.springframework.beans.factory.BeanClassLoaderAware;
2223
import org.springframework.context.ApplicationEventPublisher;
2324
import org.springframework.context.ApplicationEventPublisherAware;
2425
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@@ -52,7 +53,8 @@
5253
*/
5354
@ManagedResource
5455
@IntegrationManagedResource
55-
public class RedisQueueInboundGateway extends MessagingGatewaySupport implements ApplicationEventPublisherAware {
56+
public class RedisQueueInboundGateway extends MessagingGatewaySupport
57+
implements ApplicationEventPublisherAware, BeanClassLoaderAware {
5658

5759
private static final String QUEUE_NAME_SUFFIX = ".reply";
5860

@@ -66,24 +68,24 @@ public class RedisQueueInboundGateway extends MessagingGatewaySupport implements
6668

6769
private final BoundListOperations<String, byte[]> boundListOperations;
6870

69-
private volatile ApplicationEventPublisher applicationEventPublisher;
71+
private ApplicationEventPublisher applicationEventPublisher;
7072

71-
private volatile boolean serializerExplicitlySet;
73+
private boolean serializerExplicitlySet;
7274

73-
private volatile Executor taskExecutor;
75+
private Executor taskExecutor;
7476

75-
private volatile RedisSerializer<?> serializer = new JdkSerializationRedisSerializer();
77+
private RedisSerializer<?> serializer;
7678

77-
private volatile long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
79+
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
7880

79-
private volatile long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
81+
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
82+
83+
private boolean extractPayload = true;
8084

8185
private volatile boolean active;
8286

8387
private volatile boolean listening;
8488

85-
private volatile boolean extractPayload = true;
86-
8789
private volatile Runnable stopCallback;
8890

8991
/**
@@ -93,7 +95,7 @@ public class RedisQueueInboundGateway extends MessagingGatewaySupport implements
9395
public RedisQueueInboundGateway(String queueName, RedisConnectionFactory connectionFactory) {
9496
Assert.hasText(queueName, "'queueName' is required");
9597
Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
96-
this.template = new RedisTemplate<String, byte[]>();
98+
this.template = new RedisTemplate<>();
9799
this.template.setConnectionFactory(connectionFactory);
98100
this.template.setEnableDefaultSerializer(false);
99101
this.template.setKeySerializer(new StringRedisSerializer());
@@ -110,12 +112,18 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
110112
this.applicationEventPublisher = applicationEventPublisher;
111113
}
112114

115+
@Override
116+
public void setBeanClassLoader(ClassLoader beanClassLoader) {
117+
if (!this.serializerExplicitlySet) {
118+
this.serializer = new JdkSerializationRedisSerializer(beanClassLoader);
119+
}
120+
}
121+
113122
public void setSerializer(RedisSerializer<?> serializer) {
114123
this.serializer = serializer;
115124
this.serializerExplicitlySet = true;
116125
}
117126

118-
119127
/**
120128
* This timeout (milliseconds) is used when retrieving elements from the queue
121129
* specified by {@link #boundListOperations}.
@@ -147,11 +155,11 @@ protected void onInit() throws Exception {
147155
Assert.notNull(this.serializer, "'serializer' has to be provided where 'extractPayload == false'.");
148156
}
149157
if (this.taskExecutor == null) {
150-
String beanName = this.getComponentName();
158+
String beanName = getComponentName();
151159
this.taskExecutor = new SimpleAsyncTaskExecutor((beanName == null ? "" : beanName + "-")
152-
+ this.getComponentType());
160+
+ getComponentType());
153161
}
154-
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && this.getBeanFactory() != null) {
162+
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && getBeanFactory() != null) {
155163
MessagePublishingErrorHandler errorHandler =
156164
new MessagePublishingErrorHandler(new BeanFactoryChannelResolver(getBeanFactory()));
157165
errorHandler.setDefaultErrorChannel(getErrorChannel());
@@ -169,8 +177,8 @@ private void handlePopException(Exception e) {
169177
if (this.active) {
170178
logger.error("Failed to execute listening task. Will attempt to resubmit in " + this.recoveryInterval
171179
+ " milliseconds.", e);
172-
this.publishException(e);
173-
this.sleepBeforeRecoveryAttempt();
180+
publishException(e);
181+
sleepBeforeRecoveryAttempt();
174182
}
175183
else {
176184
logger.debug("Failed to execute listening task. " + e.getClass() + ": " + e.getMessage());
@@ -179,7 +187,7 @@ private void handlePopException(Exception e) {
179187

180188
@SuppressWarnings("unchecked")
181189
private void receiveAndReply() {
182-
byte[] value = null;
190+
byte[] value;
183191
try {
184192
value = this.boundListOperations.rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
185193
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpoint.java

+23-21
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.Executor;
2020
import java.util.concurrent.TimeUnit;
2121

22+
import org.springframework.beans.factory.BeanClassLoaderAware;
2223
import org.springframework.context.ApplicationEventPublisher;
2324
import org.springframework.context.ApplicationEventPublisherAware;
2425
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@@ -38,7 +39,6 @@
3839
import org.springframework.jmx.export.annotation.ManagedOperation;
3940
import org.springframework.jmx.export.annotation.ManagedResource;
4041
import org.springframework.messaging.Message;
41-
import org.springframework.messaging.MessageChannel;
4242
import org.springframework.messaging.MessagingException;
4343
import org.springframework.scheduling.SchedulingAwareRunnable;
4444
import org.springframework.util.Assert;
@@ -55,44 +55,45 @@
5555
*/
5656
@ManagedResource
5757
@IntegrationManagedResource
58-
public class RedisQueueMessageDrivenEndpoint extends MessageProducerSupport implements ApplicationEventPublisherAware {
58+
public class RedisQueueMessageDrivenEndpoint extends MessageProducerSupport
59+
implements ApplicationEventPublisherAware, BeanClassLoaderAware {
5960

6061
public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
6162

6263
public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
6364

6465
private final BoundListOperations<String, byte[]> boundListOperations;
6566

66-
private volatile ApplicationEventPublisher applicationEventPublisher;
67+
private ApplicationEventPublisher applicationEventPublisher;
6768

68-
private volatile MessageChannel errorChannel;
69+
private Executor taskExecutor;
6970

70-
private volatile Executor taskExecutor;
71+
private RedisSerializer<?> serializer;
7172

72-
private volatile RedisSerializer<?> serializer = new JdkSerializationRedisSerializer();
73+
private boolean serializerExplicitlySet;
7374

74-
private volatile boolean expectMessage = false;
75+
private boolean expectMessage = false;
7576

76-
private volatile long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
77+
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
7778

78-
private volatile long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
79+
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
80+
81+
private boolean rightPop = true;
7982

8083
private volatile boolean active;
8184

8285
private volatile boolean listening;
8386

8487
private volatile Runnable stopCallback;
8588

86-
private volatile boolean rightPop = true;
87-
8889
/**
8990
* @param queueName Must not be an empty String
9091
* @param connectionFactory Must not be null
9192
*/
9293
public RedisQueueMessageDrivenEndpoint(String queueName, RedisConnectionFactory connectionFactory) {
9394
Assert.hasText(queueName, "'queueName' is required");
9495
Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
95-
RedisTemplate<String, byte[]> template = new RedisTemplate<String, byte[]>();
96+
RedisTemplate<String, byte[]> template = new RedisTemplate<>();
9697
template.setConnectionFactory(connectionFactory);
9798
template.setEnableDefaultSerializer(false);
9899
template.setKeySerializer(new StringRedisSerializer());
@@ -105,17 +106,24 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
105106
this.applicationEventPublisher = applicationEventPublisher;
106107
}
107108

109+
@Override
110+
public void setBeanClassLoader(ClassLoader beanClassLoader) {
111+
if (!this.serializerExplicitlySet) {
112+
this.serializer = new JdkSerializationRedisSerializer(beanClassLoader);
113+
}
114+
}
115+
108116
public void setSerializer(RedisSerializer<?> serializer) {
109117
this.serializer = serializer;
118+
this.serializerExplicitlySet = true;
110119
}
111120

112121
/**
113122
* When data is retrieved from the Redis queue, does the returned data represent
114123
* just the payload for a Message, or does the data represent a serialized
115124
* {@link Message}?. {@code expectMessage} defaults to false. This means
116125
* the retrieved data will be used as the payload for a new Spring Integration
117-
* Message. Otherwise, the data is deserialized as Spring Integration
118-
* Message.
126+
* Message. Otherwise, the data is deserialized as Spring Integration Message.
119127
* @param expectMessage Defaults to false
120128
*/
121129
public void setExpectMessage(boolean expectMessage) {
@@ -142,12 +150,6 @@ public void setTaskExecutor(Executor taskExecutor) {
142150
this.taskExecutor = taskExecutor;
143151
}
144152

145-
@Override
146-
public void setErrorChannel(MessageChannel errorChannel) {
147-
super.setErrorChannel(errorChannel);
148-
this.errorChannel = errorChannel;
149-
}
150-
151153
public void setRecoveryInterval(long recoveryInterval) {
152154
this.recoveryInterval = recoveryInterval;
153155
}
@@ -175,7 +177,7 @@ protected void onInit() {
175177
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && this.getBeanFactory() != null) {
176178
MessagePublishingErrorHandler errorHandler =
177179
new MessagePublishingErrorHandler(new BeanFactoryChannelResolver(this.getBeanFactory()));
178-
errorHandler.setDefaultErrorChannel(this.errorChannel);
180+
errorHandler.setDefaultErrorChannel(getErrorChannel());
179181
this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, errorHandler);
180182
}
181183
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/RedisQueueOutboundGateway.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,19 @@ public class RedisQueueOutboundGateway extends AbstractReplyProducingMessageHand
4444

4545
private static final IdGenerator defaultIdGenerator = new AlternativeJdkIdGenerator();
4646

47-
private final static RedisSerializer<String> stringSerializer = new StringRedisSerializer();
47+
private static final RedisSerializer<String> stringSerializer = new StringRedisSerializer();
4848

4949
private final RedisTemplate<String, Object> template;
5050

5151
private final BoundListOperations<String, Object> boundListOps;
5252

53-
private volatile boolean extractPayload = true;
53+
private boolean extractPayload = true;
5454

55-
private volatile RedisSerializer<?> serializer = new JdkSerializationRedisSerializer();
55+
private RedisSerializer<?> serializer;
5656

57-
private volatile boolean serializerExplicitlySet;
57+
private boolean serializerExplicitlySet;
5858

59-
private volatile int receiveTimeout = TIMEOUT;
59+
private int receiveTimeout = TIMEOUT;
6060

6161
public RedisQueueOutboundGateway(String queueName, RedisConnectionFactory connectionFactory) {
6262
Assert.hasText(queueName, "'queueName' is required");
@@ -69,6 +69,14 @@ public RedisQueueOutboundGateway(String queueName, RedisConnectionFactory connec
6969
this.boundListOps = this.template.boundListOps(queueName);
7070
}
7171

72+
@Override
73+
public void setBeanClassLoader(ClassLoader beanClassLoader) {
74+
super.setBeanClassLoader(beanClassLoader);
75+
if (!this.serializerExplicitlySet) {
76+
this.serializer = new JdkSerializationRedisSerializer(beanClassLoader);
77+
}
78+
}
79+
7280
public void setReceiveTimeout(int timeout) {
7381
this.receiveTimeout = timeout;
7482
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisChannelMessageStore.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.List;
2020
import java.util.Set;
2121

22+
import org.springframework.beans.factory.BeanClassLoaderAware;
2223
import org.springframework.beans.factory.BeanNameAware;
2324
import org.springframework.beans.factory.InitializingBean;
2425
import org.springframework.data.redis.connection.RedisConnectionFactory;
@@ -41,30 +42,41 @@
4142
*
4243
* @author Gary Russell
4344
* @author Artem Bilan
45+
*
4446
* @since 4.0
4547
*
4648
*/
47-
public class RedisChannelMessageStore implements ChannelMessageStore, BeanNameAware, InitializingBean {
49+
public class RedisChannelMessageStore
50+
implements ChannelMessageStore, BeanNameAware, InitializingBean, BeanClassLoaderAware {
4851

4952
private final RedisTemplate<Object, Message<?>> redisTemplate;
5053

51-
private volatile MessageGroupFactory messageGroupFactory = new SimpleMessageGroupFactory();
52-
5354
private String beanName;
5455

56+
private MessageGroupFactory messageGroupFactory = new SimpleMessageGroupFactory();
57+
58+
private boolean valueSerializerExplicitlySet;
59+
5560
/**
5661
* Construct a message store that uses Java Serialization for messages.
5762
*
5863
* @param connectionFactory The redis connection factory.
5964
*/
6065
public RedisChannelMessageStore(RedisConnectionFactory connectionFactory) {
61-
this.redisTemplate = new RedisTemplate<Object, Message<?>>();
66+
this.redisTemplate = new RedisTemplate<>();
6267
this.redisTemplate.setConnectionFactory(connectionFactory);
6368
this.redisTemplate.setKeySerializer(new StringRedisSerializer());
6469
this.redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
6570
this.redisTemplate.afterPropertiesSet();
6671
}
6772

73+
@Override
74+
public void setBeanClassLoader(ClassLoader classLoader) {
75+
if (!this.valueSerializerExplicitlySet) {
76+
this.redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer(classLoader));
77+
}
78+
}
79+
6880
/**
6981
* Use a different serializer (default {@link JdkSerializationRedisSerializer} for
7082
* the {@link Message}.
@@ -74,6 +86,7 @@ public RedisChannelMessageStore(RedisConnectionFactory connectionFactory) {
7486
public void setValueSerializer(RedisSerializer<?> valueSerializer) {
7587
Assert.notNull(valueSerializer, "'valueSerializer' must not be null");
7688
this.redisTemplate.setValueSerializer(valueSerializer);
89+
this.valueSerializerExplicitlySet = true;
7790
}
7891

7992
/**

0 commit comments

Comments
 (0)