Skip to content

Commit f89e8aa

Browse files
artembilangaryrussell
authored andcommitted
Fix Redis components for JDK deserialization
It turns out that `JdkSerializationRedisSerializer` by default is based on the default Java class loader which may lead into `ClassCastException` downstream after deserialization * Make all the `JdkSerializationRedisSerializer` usage (default) in Redis components based on the BeanFactory `ClassLoader` * Fix tests to call `setBeanClassLoader()` * Fix Mark Fisher's name in the `MultipartFileReader` :-) **Cherry-pick to 5.1.x, 5.0.x & 4.3.x after restoring diamonds**
1 parent 890cd1f commit f89e8aa

File tree

6 files changed

+91
-62
lines changed

6 files changed

+91
-62
lines changed

spring-integration-http/src/main/java/org/springframework/integration/http/multipart/MultipartFileReader.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
/**
2424
* Strategy for reading {@link MultipartFile} content.
2525
*
26-
* @author mark Fisher
26+
* @author Mark Fisher
27+
*
2728
* @since 2.0
2829
*/
2930
public interface MultipartFileReader<T> {

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueInboundGateway.java

+25-17
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.Executor;
2020
import java.util.concurrent.TimeUnit;
2121

22+
import org.springframework.beans.factory.BeanClassLoaderAware;
2223
import org.springframework.context.ApplicationEventPublisher;
2324
import org.springframework.context.ApplicationEventPublisherAware;
2425
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@@ -51,7 +52,8 @@
5152
*/
5253
@ManagedResource
5354
@IntegrationManagedResource
54-
public class RedisQueueInboundGateway extends MessagingGatewaySupport implements ApplicationEventPublisherAware {
55+
public class RedisQueueInboundGateway extends MessagingGatewaySupport
56+
implements ApplicationEventPublisherAware, BeanClassLoaderAware {
5557

5658
private static final String QUEUE_NAME_SUFFIX = ".reply";
5759

@@ -65,24 +67,24 @@ public class RedisQueueInboundGateway extends MessagingGatewaySupport implements
6567

6668
private final BoundListOperations<String, byte[]> boundListOperations;
6769

68-
private volatile ApplicationEventPublisher applicationEventPublisher;
70+
private ApplicationEventPublisher applicationEventPublisher;
6971

70-
private volatile boolean serializerExplicitlySet;
72+
private boolean serializerExplicitlySet;
7173

72-
private volatile Executor taskExecutor;
74+
private Executor taskExecutor;
7375

74-
private volatile RedisSerializer<?> serializer = new JdkSerializationRedisSerializer();
76+
private RedisSerializer<?> serializer;
7577

76-
private volatile long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
78+
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
7779

78-
private volatile long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
80+
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
81+
82+
private boolean extractPayload = true;
7983

8084
private volatile boolean active;
8185

8286
private volatile boolean listening;
8387

84-
private volatile boolean extractPayload = true;
85-
8688
private volatile Runnable stopCallback;
8789

8890
/**
@@ -92,7 +94,7 @@ public class RedisQueueInboundGateway extends MessagingGatewaySupport implements
9294
public RedisQueueInboundGateway(String queueName, RedisConnectionFactory connectionFactory) {
9395
Assert.hasText(queueName, "'queueName' is required");
9496
Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
95-
this.template = new RedisTemplate<String, byte[]>();
97+
this.template = new RedisTemplate<>();
9698
this.template.setConnectionFactory(connectionFactory);
9799
this.template.setEnableDefaultSerializer(false);
98100
this.template.setKeySerializer(new StringRedisSerializer());
@@ -109,12 +111,18 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
109111
this.applicationEventPublisher = applicationEventPublisher;
110112
}
111113

114+
@Override
115+
public void setBeanClassLoader(ClassLoader beanClassLoader) {
116+
if (!this.serializerExplicitlySet) {
117+
this.serializer = new JdkSerializationRedisSerializer(beanClassLoader);
118+
}
119+
}
120+
112121
public void setSerializer(RedisSerializer<?> serializer) {
113122
this.serializer = serializer;
114123
this.serializerExplicitlySet = true;
115124
}
116125

117-
118126
/**
119127
* This timeout (milliseconds) is used when retrieving elements from the queue
120128
* specified by {@link #boundListOperations}.
@@ -146,11 +154,11 @@ protected void onInit() {
146154
Assert.notNull(this.serializer, "'serializer' has to be provided where 'extractPayload == false'.");
147155
}
148156
if (this.taskExecutor == null) {
149-
String beanName = this.getComponentName();
157+
String beanName = getComponentName();
150158
this.taskExecutor = new SimpleAsyncTaskExecutor((beanName == null ? "" : beanName + "-")
151-
+ this.getComponentType());
159+
+ getComponentType());
152160
}
153-
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && this.getBeanFactory() != null) {
161+
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && getBeanFactory() != null) {
154162
MessagePublishingErrorHandler errorHandler = new MessagePublishingErrorHandler();
155163
errorHandler.setBeanFactory(getBeanFactory());
156164
errorHandler.setDefaultErrorChannel(getErrorChannel());
@@ -168,8 +176,8 @@ private void handlePopException(Exception e) {
168176
if (this.active) {
169177
logger.error("Failed to execute listening task. Will attempt to resubmit in " + this.recoveryInterval
170178
+ " milliseconds.", e);
171-
this.publishException(e);
172-
this.sleepBeforeRecoveryAttempt();
179+
publishException(e);
180+
sleepBeforeRecoveryAttempt();
173181
}
174182
else {
175183
logger.debug("Failed to execute listening task. " + e.getClass() + ": " + e.getMessage());
@@ -178,7 +186,7 @@ private void handlePopException(Exception e) {
178186

179187
@SuppressWarnings("unchecked")
180188
private void receiveAndReply() {
181-
byte[] value = null;
189+
byte[] value;
182190
try {
183191
value = this.boundListOperations.rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
184192
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpoint.java

+23-21
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.Executor;
2121
import java.util.concurrent.TimeUnit;
2222

23+
import org.springframework.beans.factory.BeanClassLoaderAware;
2324
import org.springframework.beans.factory.BeanFactory;
2425
import org.springframework.context.ApplicationEventPublisher;
2526
import org.springframework.context.ApplicationEventPublisherAware;
@@ -40,7 +41,6 @@
4041
import org.springframework.jmx.export.annotation.ManagedOperation;
4142
import org.springframework.jmx.export.annotation.ManagedResource;
4243
import org.springframework.messaging.Message;
43-
import org.springframework.messaging.MessageChannel;
4444
import org.springframework.messaging.MessagingException;
4545
import org.springframework.scheduling.SchedulingAwareRunnable;
4646
import org.springframework.util.Assert;
@@ -57,44 +57,45 @@
5757
*/
5858
@ManagedResource
5959
@IntegrationManagedResource
60-
public class RedisQueueMessageDrivenEndpoint extends MessageProducerSupport implements ApplicationEventPublisherAware {
60+
public class RedisQueueMessageDrivenEndpoint extends MessageProducerSupport
61+
implements ApplicationEventPublisherAware, BeanClassLoaderAware {
6162

6263
public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
6364

6465
public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
6566

6667
private final BoundListOperations<String, byte[]> boundListOperations;
6768

68-
private volatile ApplicationEventPublisher applicationEventPublisher;
69+
private ApplicationEventPublisher applicationEventPublisher;
6970

70-
private volatile MessageChannel errorChannel;
71+
private Executor taskExecutor;
7172

72-
private volatile Executor taskExecutor;
73+
private RedisSerializer<?> serializer;
7374

74-
private volatile RedisSerializer<?> serializer = new JdkSerializationRedisSerializer();
75+
private boolean serializerExplicitlySet;
7576

76-
private volatile boolean expectMessage = false;
77+
private boolean expectMessage = false;
7778

78-
private volatile long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
79+
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
7980

80-
private volatile long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
81+
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
82+
83+
private boolean rightPop = true;
8184

8285
private volatile boolean active;
8386

8487
private volatile boolean listening;
8588

8689
private volatile Runnable stopCallback;
8790

88-
private volatile boolean rightPop = true;
89-
9091
/**
9192
* @param queueName Must not be an empty String
9293
* @param connectionFactory Must not be null
9394
*/
9495
public RedisQueueMessageDrivenEndpoint(String queueName, RedisConnectionFactory connectionFactory) {
9596
Assert.hasText(queueName, "'queueName' is required");
9697
Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
97-
RedisTemplate<String, byte[]> template = new RedisTemplate<String, byte[]>();
98+
RedisTemplate<String, byte[]> template = new RedisTemplate<>();
9899
template.setConnectionFactory(connectionFactory);
99100
template.setEnableDefaultSerializer(false);
100101
template.setKeySerializer(new StringRedisSerializer());
@@ -107,17 +108,24 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
107108
this.applicationEventPublisher = applicationEventPublisher;
108109
}
109110

111+
@Override
112+
public void setBeanClassLoader(ClassLoader beanClassLoader) {
113+
if (!this.serializerExplicitlySet) {
114+
this.serializer = new JdkSerializationRedisSerializer(beanClassLoader);
115+
}
116+
}
117+
110118
public void setSerializer(RedisSerializer<?> serializer) {
111119
this.serializer = serializer;
120+
this.serializerExplicitlySet = true;
112121
}
113122

114123
/**
115124
* When data is retrieved from the Redis queue, does the returned data represent
116125
* just the payload for a Message, or does the data represent a serialized
117126
* {@link Message}?. {@code expectMessage} defaults to false. This means
118127
* the retrieved data will be used as the payload for a new Spring Integration
119-
* Message. Otherwise, the data is deserialized as Spring Integration
120-
* Message.
128+
* Message. Otherwise, the data is deserialized as Spring Integration Message.
121129
* @param expectMessage Defaults to false
122130
*/
123131
public void setExpectMessage(boolean expectMessage) {
@@ -144,12 +152,6 @@ public void setTaskExecutor(Executor taskExecutor) {
144152
this.taskExecutor = taskExecutor;
145153
}
146154

147-
@Override
148-
public void setErrorChannel(MessageChannel errorChannel) {
149-
super.setErrorChannel(errorChannel);
150-
this.errorChannel = errorChannel;
151-
}
152-
153155
public void setRecoveryInterval(long recoveryInterval) {
154156
this.recoveryInterval = recoveryInterval;
155157
}
@@ -178,7 +180,7 @@ protected void onInit() {
178180
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && beanFactory != null) {
179181
MessagePublishingErrorHandler errorHandler =
180182
new MessagePublishingErrorHandler(ChannelResolverUtils.getChannelResolver(beanFactory));
181-
errorHandler.setDefaultErrorChannel(this.errorChannel);
183+
errorHandler.setDefaultErrorChannel(getErrorChannel());
182184
this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, errorHandler);
183185
}
184186
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/RedisQueueOutboundGateway.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,19 @@ public class RedisQueueOutboundGateway extends AbstractReplyProducingMessageHand
4646

4747
private static final IdGenerator defaultIdGenerator = new AlternativeJdkIdGenerator();
4848

49-
private static final RedisSerializer<String> stringSerializer = new StringRedisSerializer();
49+
private static final RedisSerializer<String> stringSerializer = new StringRedisSerializer();
5050

5151
private final RedisTemplate<String, Object> template = new RedisTemplate<>();
5252

5353
private final BoundListOperations<String, Object> boundListOps;
5454

55-
private volatile boolean extractPayload = true;
55+
private boolean extractPayload = true;
5656

57-
private volatile RedisSerializer<?> serializer = new JdkSerializationRedisSerializer();
57+
private RedisSerializer<?> serializer;
5858

59-
private volatile boolean serializerExplicitlySet;
59+
private boolean serializerExplicitlySet;
6060

61-
private volatile int receiveTimeout = TIMEOUT;
61+
private int receiveTimeout = TIMEOUT;
6262

6363
public RedisQueueOutboundGateway(String queueName, RedisConnectionFactory connectionFactory) {
6464
Assert.hasText(queueName, "'queueName' is required");
@@ -70,6 +70,14 @@ public RedisQueueOutboundGateway(String queueName, RedisConnectionFactory connec
7070
this.boundListOps = this.template.boundListOps(queueName);
7171
}
7272

73+
@Override
74+
public void setBeanClassLoader(ClassLoader beanClassLoader) {
75+
super.setBeanClassLoader(beanClassLoader);
76+
if (!this.serializerExplicitlySet) {
77+
this.serializer = new JdkSerializationRedisSerializer(beanClassLoader);
78+
}
79+
}
80+
7381
public void setReceiveTimeout(int timeout) {
7482
this.receiveTimeout = timeout;
7583
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisChannelMessageStore.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.List;
2020
import java.util.Set;
2121

22+
import org.springframework.beans.factory.BeanClassLoaderAware;
2223
import org.springframework.beans.factory.BeanNameAware;
2324
import org.springframework.beans.factory.InitializingBean;
2425
import org.springframework.data.redis.connection.RedisConnectionFactory;
@@ -41,30 +42,41 @@
4142
*
4243
* @author Gary Russell
4344
* @author Artem Bilan
45+
*
4446
* @since 4.0
4547
*
4648
*/
47-
public class RedisChannelMessageStore implements ChannelMessageStore, BeanNameAware, InitializingBean {
49+
public class RedisChannelMessageStore
50+
implements ChannelMessageStore, BeanNameAware, InitializingBean, BeanClassLoaderAware {
4851

4952
private final RedisTemplate<Object, Message<?>> redisTemplate;
5053

51-
private volatile MessageGroupFactory messageGroupFactory = new SimpleMessageGroupFactory();
52-
5354
private String beanName;
5455

56+
private MessageGroupFactory messageGroupFactory = new SimpleMessageGroupFactory();
57+
58+
private boolean valueSerializerExplicitlySet;
59+
5560
/**
5661
* Construct a message store that uses Java Serialization for messages.
5762
*
5863
* @param connectionFactory The redis connection factory.
5964
*/
6065
public RedisChannelMessageStore(RedisConnectionFactory connectionFactory) {
61-
this.redisTemplate = new RedisTemplate<Object, Message<?>>();
66+
this.redisTemplate = new RedisTemplate<>();
6267
this.redisTemplate.setConnectionFactory(connectionFactory);
6368
this.redisTemplate.setKeySerializer(new StringRedisSerializer());
6469
this.redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
6570
this.redisTemplate.afterPropertiesSet();
6671
}
6772

73+
@Override
74+
public void setBeanClassLoader(ClassLoader classLoader) {
75+
if (!this.valueSerializerExplicitlySet) {
76+
this.redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer(classLoader));
77+
}
78+
}
79+
6880
/**
6981
* Use a different serializer (default {@link JdkSerializationRedisSerializer} for
7082
* the {@link Message}.
@@ -74,6 +86,7 @@ public RedisChannelMessageStore(RedisConnectionFactory connectionFactory) {
7486
public void setValueSerializer(RedisSerializer<?> valueSerializer) {
7587
Assert.notNull(valueSerializer, "'valueSerializer' must not be null");
7688
this.redisTemplate.setValueSerializer(valueSerializer);
89+
this.valueSerializerExplicitlySet = true;
7790
}
7891

7992
/**

0 commit comments

Comments
 (0)