diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java index 97e26dc8e8f..38c196588d5 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,6 +41,7 @@ import org.springframework.integration.expression.ValueExpression; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor; +import org.springframework.integration.mapping.AbstractHeaderMapper; import org.springframework.integration.support.AbstractIntegrationMessageBuilder; import org.springframework.integration.support.DefaultErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageStrategy; @@ -416,6 +417,10 @@ protected final void doInit() { configureDelayGenerator(beanFactory); endpointInit(); + + if (this.headerMapper instanceof AbstractHeaderMapper) { + ((AbstractHeaderMapper) this.headerMapper).setBeanClassLoader(getBeanClassLoader()); + } } private void configureExchangeNameGenerator(BeanFactory beanFactory) { diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/DefaultAmqpHeaderMapper.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/DefaultAmqpHeaderMapper.java index 0e2f2316597..17d89133b2e 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/DefaultAmqpHeaderMapper.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/DefaultAmqpHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -109,40 +109,41 @@ protected Map extractStandardHeaders(MessageProperties amqpMessa Map headers = new HashMap<>(); try { JavaUtils.INSTANCE - .acceptIfNotNull(AmqpHeaders.APP_ID, amqpMessageProperties.getAppId(), headers::put) - .acceptIfNotNull(AmqpHeaders.CLUSTER_ID, amqpMessageProperties.getClusterId(), headers::put) - .acceptIfNotNull(AmqpHeaders.CONTENT_ENCODING, amqpMessageProperties.getContentEncoding(), - headers::put); + .acceptIfNotNull(AmqpHeaders.APP_ID, amqpMessageProperties.getAppId(), headers::put) + .acceptIfNotNull(AmqpHeaders.CLUSTER_ID, amqpMessageProperties.getClusterId(), headers::put) + .acceptIfNotNull(AmqpHeaders.CONTENT_ENCODING, amqpMessageProperties.getContentEncoding(), + headers::put); long contentLength = amqpMessageProperties.getContentLength(); JavaUtils.INSTANCE - .acceptIfCondition(contentLength > 0, AmqpHeaders.CONTENT_LENGTH, contentLength, headers::put) - .acceptIfHasText(AmqpHeaders.CONTENT_TYPE, amqpMessageProperties.getContentType(), headers::put) - .acceptIfHasText(AmqpHeaders.CORRELATION_ID, amqpMessageProperties.getCorrelationId(), headers::put) - .acceptIfNotNull(AmqpHeaders.RECEIVED_DELIVERY_MODE, amqpMessageProperties.getReceivedDeliveryMode(), - headers::put); + .acceptIfCondition(contentLength > 0, AmqpHeaders.CONTENT_LENGTH, contentLength, headers::put) + .acceptIfHasText(AmqpHeaders.CONTENT_TYPE, amqpMessageProperties.getContentType(), headers::put) + .acceptIfHasText(AmqpHeaders.CORRELATION_ID, amqpMessageProperties.getCorrelationId(), headers::put) + .acceptIfNotNull(AmqpHeaders.RECEIVED_DELIVERY_MODE, + amqpMessageProperties.getReceivedDeliveryMode(), headers::put); long deliveryTag = amqpMessageProperties.getDeliveryTag(); JavaUtils.INSTANCE - .acceptIfCondition(deliveryTag > 0, AmqpHeaders.DELIVERY_TAG, deliveryTag, headers::put) - .acceptIfHasText(AmqpHeaders.EXPIRATION, amqpMessageProperties.getExpiration(), headers::put); + .acceptIfCondition(deliveryTag > 0, AmqpHeaders.DELIVERY_TAG, deliveryTag, headers::put) + .acceptIfHasText(AmqpHeaders.EXPIRATION, amqpMessageProperties.getExpiration(), headers::put); Integer messageCount = amqpMessageProperties.getMessageCount(); JavaUtils.INSTANCE - .acceptIfCondition(messageCount != null && messageCount > 0, AmqpHeaders.MESSAGE_COUNT, messageCount, - headers::put) - .acceptIfHasText(AmqpHeaders.MESSAGE_ID, amqpMessageProperties.getMessageId(), headers::put); + .acceptIfCondition(messageCount != null && messageCount > 0, AmqpHeaders.MESSAGE_COUNT, + messageCount, headers::put) + .acceptIfHasText(AmqpHeaders.MESSAGE_ID, amqpMessageProperties.getMessageId(), headers::put); Integer priority = amqpMessageProperties.getPriority(); JavaUtils.INSTANCE - .acceptIfCondition(priority != null && priority > 0, IntegrationMessageHeaderAccessor.PRIORITY, - priority, headers::put) - .acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelay(), headers::put) - .acceptIfNotNull(AmqpHeaders.RECEIVED_EXCHANGE, amqpMessageProperties.getReceivedExchange(), - headers::put) - .acceptIfHasText(AmqpHeaders.RECEIVED_ROUTING_KEY, amqpMessageProperties.getReceivedRoutingKey(), - headers::put) - .acceptIfNotNull(AmqpHeaders.REDELIVERED, amqpMessageProperties.isRedelivered(), headers::put) - .acceptIfNotNull(AmqpHeaders.REPLY_TO, amqpMessageProperties.getReplyTo(), headers::put) - .acceptIfNotNull(AmqpHeaders.TIMESTAMP, amqpMessageProperties.getTimestamp(), headers::put) - .acceptIfHasText(AmqpHeaders.TYPE, amqpMessageProperties.getType(), headers::put) - .acceptIfHasText(AmqpHeaders.RECEIVED_USER_ID, amqpMessageProperties.getReceivedUserId(), headers::put); + .acceptIfCondition(priority != null && priority > 0, IntegrationMessageHeaderAccessor.PRIORITY, + priority, headers::put) + .acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelay(), headers::put) + .acceptIfNotNull(AmqpHeaders.RECEIVED_EXCHANGE, amqpMessageProperties.getReceivedExchange(), + headers::put) + .acceptIfHasText(AmqpHeaders.RECEIVED_ROUTING_KEY, amqpMessageProperties.getReceivedRoutingKey(), + headers::put) + .acceptIfNotNull(AmqpHeaders.REDELIVERED, amqpMessageProperties.isRedelivered(), headers::put) + .acceptIfNotNull(AmqpHeaders.REPLY_TO, amqpMessageProperties.getReplyTo(), headers::put) + .acceptIfNotNull(AmqpHeaders.TIMESTAMP, amqpMessageProperties.getTimestamp(), headers::put) + .acceptIfHasText(AmqpHeaders.TYPE, amqpMessageProperties.getType(), headers::put) + .acceptIfHasText(AmqpHeaders.RECEIVED_USER_ID, + amqpMessageProperties.getReceivedUserId(), headers::put); for (String jsonHeader : JsonHeaders.HEADERS) { Object value = amqpMessageProperties.getHeaders().get(jsonHeader.replaceFirst(JsonHeaders.PREFIX, "")); @@ -151,6 +152,7 @@ protected Map extractStandardHeaders(MessageProperties amqpMessa } } + createJsonResolvableTypHeaderInAny(headers); } catch (Exception e) { if (logger.isWarnEnabled()) { @@ -160,6 +162,15 @@ protected Map extractStandardHeaders(MessageProperties amqpMessa return headers; } + private void createJsonResolvableTypHeaderInAny(Map headers) { + Object typeIdHeader = headers.get(JsonHeaders.TYPE_ID); + if (typeIdHeader != null) { + headers.put(JsonHeaders.RESOLVABLE_TYPE, + JsonHeaders.buildResolvableType(getClassLoader(), typeIdHeader, + headers.get(JsonHeaders.CONTENT_TYPE_ID), headers.get(JsonHeaders.KEY_TYPE_ID))); + } + } + /** * Extract user-defined headers from an AMQP MessageProperties instance. */ @@ -186,28 +197,28 @@ protected void populateStandardHeaders(@Nullable Map allHeaders, MessageProperties amqpMessageProperties) { JavaUtils.INSTANCE - .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.APP_ID, String.class), - amqpMessageProperties::setAppId) - .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CLUSTER_ID, String.class), - amqpMessageProperties::setClusterId) - .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CONTENT_ENCODING, String.class), - amqpMessageProperties::setContentEncoding) - .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.CONTENT_LENGTH, Long.class), - amqpMessageProperties::setContentLength) - .acceptIfHasText(this.extractContentTypeAsString(headers), - amqpMessageProperties::setContentType) - .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CORRELATION_ID, String.class), - amqpMessageProperties::setCorrelationId) - .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Integer.class), - amqpMessageProperties::setDelay) - .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.class), - amqpMessageProperties::setDeliveryMode) - .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_TAG, Long.class), - amqpMessageProperties::setDeliveryTag) - .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.EXPIRATION, String.class), - amqpMessageProperties::setExpiration) - .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.MESSAGE_COUNT, Integer.class), - amqpMessageProperties::setMessageCount); + .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.APP_ID, String.class), + amqpMessageProperties::setAppId) + .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CLUSTER_ID, String.class), + amqpMessageProperties::setClusterId) + .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CONTENT_ENCODING, String.class), + amqpMessageProperties::setContentEncoding) + .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.CONTENT_LENGTH, Long.class), + amqpMessageProperties::setContentLength) + .acceptIfHasText(this.extractContentTypeAsString(headers), + amqpMessageProperties::setContentType) + .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CORRELATION_ID, String.class), + amqpMessageProperties::setCorrelationId) + .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Integer.class), + amqpMessageProperties::setDelay) + .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.class), + amqpMessageProperties::setDeliveryMode) + .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_TAG, Long.class), + amqpMessageProperties::setDeliveryTag) + .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.EXPIRATION, String.class), + amqpMessageProperties::setExpiration) + .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.MESSAGE_COUNT, Integer.class), + amqpMessageProperties::setMessageCount); String messageId = getHeaderIfAvailable(headers, AmqpHeaders.MESSAGE_ID, String.class); if (StringUtils.hasText(messageId)) { amqpMessageProperties.setMessageId(messageId); @@ -219,16 +230,17 @@ else if (allHeaders != null) { } } JavaUtils.INSTANCE - .acceptIfNotNull(getHeaderIfAvailable(headers, IntegrationMessageHeaderAccessor.PRIORITY, Integer.class), - amqpMessageProperties::setPriority) - .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.RECEIVED_EXCHANGE, String.class), - amqpMessageProperties::setReceivedExchange) - .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.RECEIVED_ROUTING_KEY, String.class), - amqpMessageProperties::setReceivedRoutingKey) - .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.REDELIVERED, Boolean.class), - amqpMessageProperties::setRedelivered) - .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.REPLY_TO, String.class), - amqpMessageProperties::setReplyTo); + .acceptIfNotNull(getHeaderIfAvailable(headers, IntegrationMessageHeaderAccessor.PRIORITY, + Integer.class), + amqpMessageProperties::setPriority) + .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.RECEIVED_EXCHANGE, String.class), + amqpMessageProperties::setReceivedExchange) + .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.RECEIVED_ROUTING_KEY, String.class), + amqpMessageProperties::setReceivedRoutingKey) + .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.REDELIVERED, Boolean.class), + amqpMessageProperties::setRedelivered) + .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.REPLY_TO, String.class), + amqpMessageProperties::setReplyTo); Date timestamp = getHeaderIfAvailable(headers, AmqpHeaders.TIMESTAMP, Date.class); if (timestamp != null) { amqpMessageProperties.setTimestamp(timestamp); @@ -240,18 +252,19 @@ else if (allHeaders != null) { } } JavaUtils.INSTANCE - .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.TYPE, String.class), - amqpMessageProperties::setType) - .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.USER_ID, String.class), - amqpMessageProperties::setUserId); + .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.TYPE, String.class), + amqpMessageProperties::setType) + .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.USER_ID, String.class), + amqpMessageProperties::setUserId); mapJsonHeaders(headers, amqpMessageProperties); JavaUtils.INSTANCE - .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.SPRING_REPLY_CORRELATION, String.class), - replyCorrelation -> amqpMessageProperties.setHeader("spring_reply_correlation", replyCorrelation)) - .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.SPRING_REPLY_TO_STACK, String.class), - replyToStack -> amqpMessageProperties.setHeader("spring_reply_to", replyToStack)); + .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.SPRING_REPLY_CORRELATION, String.class), + replyCorrelation -> amqpMessageProperties + .setHeader("spring_reply_correlation", replyCorrelation)) + .acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.SPRING_REPLY_TO_STACK, String.class), + replyToStack -> amqpMessageProperties.setHeader("spring_reply_to", replyToStack)); } private void mapJsonHeaders(Map headers, MessageProperties amqpMessageProperties) { diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/OutboundEndpointTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/OutboundEndpointTests.java index cd737892ccd..038d50d43a1 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/OutboundEndpointTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/OutboundEndpointTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,9 +27,10 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import java.util.Date; import java.util.concurrent.atomic.AtomicReference; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.springframework.amqp.core.Message; @@ -38,10 +39,13 @@ import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; import org.springframework.beans.factory.BeanFactory; +import org.springframework.core.ResolvableType; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; import org.springframework.integration.channel.NullChannel; import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.mapping.support.JsonHeaders; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; @@ -50,6 +54,7 @@ /** * @author Gary Russell * @author Artem Bilan + * * @since 3.0 */ public class OutboundEndpointTests { @@ -94,9 +99,9 @@ public void testAsyncDelayExpression() { new SimpleMessageListenerContainer(connectionFactory), "replyTo")); amqpTemplate.setTaskScheduler(mock(TaskScheduler.class)); AsyncAmqpOutboundGateway gateway = new AsyncAmqpOutboundGateway(amqpTemplate); - willAnswer( - invocation -> amqpTemplate.new RabbitMessageFuture("foo", invocation.getArgument(2))) - .given(amqpTemplate).sendAndReceive(anyString(), anyString(), any(Message.class)); + willAnswer(invocation -> amqpTemplate.new RabbitMessageFuture("foo", invocation.getArgument(2))) + .given(amqpTemplate) + .sendAndReceive(anyString(), anyString(), any(Message.class)); gateway.setExchangeName("foo"); gateway.setRoutingKey("bar"); gateway.setDelayExpressionString("42"); @@ -158,6 +163,33 @@ public void testHeaderMapperWinsGateway() { assertThat(amqpMessage.get().getMessageProperties().getHeaders().get(MessageHeaders.REPLY_CHANNEL)).isNull(); } + @Test + public void testReplyHeadersWin() { + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + TestRabbitTemplate amqpTemplate = spy(new TestRabbitTemplate(connectionFactory)); + amqpTemplate.setUseTemporaryReplyQueues(true); + AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(amqpTemplate); + endpoint.setExpectReply(true); + willAnswer(invocation -> + org.springframework.amqp.core.MessageBuilder.withBody(new byte[0]) + .setHeader(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, String.class.getName()) + .build() + ).given(amqpTemplate) + .doSendAndReceiveWithTemporary(isNull(), isNull(), any(Message.class), isNull()); + QueueChannel replyChannel = new QueueChannel(); + org.springframework.messaging.Message message = MessageBuilder.withPayload("foo") + .setHeader(JsonHeaders.RESOLVABLE_TYPE, ResolvableType.forClass(Date.class)) + .setReplyChannel(replyChannel) + .build(); + endpoint.handleMessage(message); + org.springframework.messaging.Message receive = replyChannel.receive(10_000); + assertThat(receive).isNotNull(); + assertThat(receive.getHeaders()) + .containsEntry(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, String.class.getName()) + .containsEntry(JsonHeaders.TYPE_ID, String.class.getName()) + .containsEntry(JsonHeaders.RESOLVABLE_TYPE, ResolvableType.forClass(String.class)); + } + /** * Increase method visibility */ diff --git a/spring-integration-core/src/main/java/org/springframework/integration/json/JsonToObjectTransformer.java b/spring-integration-core/src/main/java/org/springframework/integration/json/JsonToObjectTransformer.java index aac982fcdda..463ca063298 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/json/JsonToObjectTransformer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/json/JsonToObjectTransformer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,15 +29,13 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; /** * Transformer implementation that converts a JSON string payload into an instance of the * provided target Class. By default this transformer uses * {@linkplain org.springframework.integration.support.json.JsonObjectMapperProvider} - * factory to get an instance of Jackson 1 or Jackson 2 JSON-processor - * {@linkplain JsonObjectMapper} implementation depending on the jackson-databind or - * jackson-mapper-asl libs on the classpath. Any other {@linkplain JsonObjectMapper} + * factory to get an instance of Jackson JSON-processor + * if jackson-databind lib is present on the classpath. Any other {@linkplain JsonObjectMapper} * implementation can be provided. *

Since version 3.0, you can omit the target class and the target type can be * determined by the {@link JsonHeaders} type entries - including the contents of a @@ -47,10 +45,11 @@ * @author Mark Fisher * @author Artem Bilan * + * @since 2.0 + * * @see JsonObjectMapper * @see org.springframework.integration.support.json.JsonObjectMapperProvider * - * @since 2.0 */ public class JsonToObjectTransformer extends AbstractTransformer implements BeanClassLoaderAware { @@ -149,37 +148,13 @@ private ResolvableType obtainResolvableTypeFromHeadersIfAny(MessageHeaders heade Object valueType = headers.get(JsonHeaders.RESOLVABLE_TYPE); Object typeIdHeader = headers.get(JsonHeaders.TYPE_ID); if (!(valueType instanceof ResolvableType) && typeIdHeader != null) { - Class targetClass = getClassForValue(typeIdHeader); - Class contentClass = null; - Class keyClass = null; - Object contentTypeHeader = headers.get(JsonHeaders.CONTENT_TYPE_ID); - if (contentTypeHeader != null) { - contentClass = getClassForValue(contentTypeHeader); - } - Object keyTypeHeader = headers.get(JsonHeaders.KEY_TYPE_ID); - if (keyTypeHeader != null) { - keyClass = getClassForValue(keyTypeHeader); - } - - valueType = JsonObjectMapper.buildResolvableType(targetClass, contentClass, keyClass); + valueType = + JsonHeaders.buildResolvableType(this.classLoader, typeIdHeader, + headers.get(JsonHeaders.CONTENT_TYPE_ID), headers.get(JsonHeaders.KEY_TYPE_ID)); } return valueType instanceof ResolvableType ? (ResolvableType) valueType : null; } - private Class getClassForValue(Object classValue) { - if (classValue instanceof Class) { - return (Class) classValue; - } - else { - try { - return ClassUtils.forName(classValue.toString(), this.classLoader); - } - catch (ClassNotFoundException | LinkageError e) { - throw new IllegalStateException(e); - } - } - } - } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/mapping/AbstractHeaderMapper.java b/spring-integration-core/src/main/java/org/springframework/integration/mapping/AbstractHeaderMapper.java index dba98849619..b583abc296a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/mapping/AbstractHeaderMapper.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/mapping/AbstractHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,10 +28,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.lang.Nullable; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; import org.springframework.util.PatternMatchUtils; import org.springframework.util.StringUtils; @@ -43,9 +45,11 @@ * @author Oleg Zhurakousky * @author Stephane Nicoll * @author Gary Russell + * @author Artem Bilan + * * @since 2.1 */ -public abstract class AbstractHeaderMapper implements RequestReplyHeaderMapper { +public abstract class AbstractHeaderMapper implements RequestReplyHeaderMapper, BeanClassLoaderAware { /** * A special pattern that only matches standard request headers. @@ -74,9 +78,11 @@ public abstract class AbstractHeaderMapper implements RequestReplyHeaderMappe private final Collection replyHeaderNames; - private volatile HeaderMatcher requestHeaderMatcher; + private HeaderMatcher requestHeaderMatcher; + + private HeaderMatcher replyHeaderMatcher; - private volatile HeaderMatcher replyHeaderMatcher; + private ClassLoader classLoader = ClassUtils.getDefaultClassLoader(); /** * Create a new instance. @@ -96,6 +102,15 @@ protected AbstractHeaderMapper(String standardHeaderPrefix, this.replyHeaderMatcher = createDefaultHeaderMatcher(this.standardHeaderPrefix, this.replyHeaderNames); } + @Override + public void setBeanClassLoader(ClassLoader classLoader) { + this.classLoader = classLoader; + } + + protected ClassLoader getClassLoader() { + return this.classLoader; + } + /** * Provide the header names that should be mapped from a request * to a {@link MessageHeaders}. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/mapping/support/JsonHeaders.java b/spring-integration-core/src/main/java/org/springframework/integration/mapping/support/JsonHeaders.java index 7d33d825702..b011eb3be89 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/mapping/support/JsonHeaders.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/mapping/support/JsonHeaders.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2019 the original author or authors. + * Copyright 2013-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,11 @@ import java.util.Collection; import java.util.Collections; +import org.springframework.core.ResolvableType; +import org.springframework.core.convert.TypeDescriptor; +import org.springframework.lang.Nullable; +import org.springframework.util.ClassUtils; + /** * Pre-defined names and prefixes to be used for setting and/or retrieving JSON * entries from/to Message Headers and other adapter, e.g. AMQP. @@ -43,7 +48,7 @@ private JsonHeaders() { public static final String KEY_TYPE_ID = PREFIX + "__KeyTypeId__"; /** - * The header to represent a {@link org.springframework.core.ResolvableType} + * The header to represent a {@link ResolvableType} * for the target deserialized object. * @since 5.2 */ @@ -52,4 +57,70 @@ private JsonHeaders() { public static final Collection HEADERS = Collections.unmodifiableList(Arrays.asList(TYPE_ID, CONTENT_TYPE_ID, KEY_TYPE_ID, RESOLVABLE_TYPE)); + /** + * Build a {@link ResolvableType} for provided class components. + * @param classLoader a {@link ClassLoader} t load classes for components if needed. + * @param targetClassValue the class representation object. + * @param contentClassValue the collection element (or map value) class representation object. + * @param keyClassValue the map key class representation object. + * @return the {@link ResolvableType} based on provided class components + * @since 5.2.4 + */ + public static ResolvableType buildResolvableType(ClassLoader classLoader, Object targetClassValue, + @Nullable Object contentClassValue, @Nullable Object keyClassValue) { + + Class targetClass = getClassForValue(classLoader, targetClassValue); + Class keyClass = getClassForValue(classLoader, keyClassValue); + Class contentClass = getClassForValue(classLoader, contentClassValue); + + return buildResolvableType(targetClass, contentClass, keyClass); + } + + @Nullable + private static Class getClassForValue(ClassLoader classLoader, @Nullable Object classValue) { + if (classValue instanceof Class) { + return (Class) classValue; + } + else if (classValue != null) { + try { + return ClassUtils.forName(classValue.toString(), classLoader); + } + catch (ClassNotFoundException | LinkageError e) { + throw new IllegalStateException(e); + } + } + else { + return null; + } + } + + /** + * Build a {@link ResolvableType} for provided class components. + * @param targetClass the class to use. + * @param contentClass the collection element (or map value) class. + * @param keyClass the map key class. + * @return the {@link ResolvableType} based on provided class components + * @since 5.2.4 + */ + public static ResolvableType buildResolvableType(Class targetClass, @Nullable Class contentClass, + @Nullable Class keyClass) { + + if (keyClass != null) { + return TypeDescriptor + .map(targetClass, + TypeDescriptor.valueOf(keyClass), + TypeDescriptor.valueOf(contentClass)) + .getResolvableType(); + } + else if (contentClass != null) { + return TypeDescriptor + .collection(targetClass, + TypeDescriptor.valueOf(contentClass)) + .getResolvableType(); + } + else { + return ResolvableType.forClass(targetClass); + } + } + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/json/JsonObjectMapper.java b/spring-integration-core/src/main/java/org/springframework/integration/support/json/JsonObjectMapper.java index 4f2b1605ba7..8c57ebeaf92 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/json/JsonObjectMapper.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/json/JsonObjectMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2019 the original author or authors. + * Copyright 2013-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,9 +23,7 @@ import java.util.Map; import org.springframework.core.ResolvableType; -import org.springframework.core.convert.TypeDescriptor; import org.springframework.integration.mapping.support.JsonHeaders; -import org.springframework.lang.Nullable; /** * Strategy interface to convert an Object to/from the JSON representation. @@ -102,29 +100,7 @@ default void populateJavaTypes(Map map, Object object) { } } - - map.put(JsonHeaders.RESOLVABLE_TYPE, buildResolvableType(targetClass, contentClass, keyClass)); - } - - static ResolvableType buildResolvableType(Class targetClass, @Nullable Class contentClass, - @Nullable Class keyClass) { - - if (keyClass != null) { - return TypeDescriptor - .map(targetClass, - TypeDescriptor.valueOf(keyClass), - TypeDescriptor.valueOf(contentClass)) - .getResolvableType(); - } - else if (contentClass != null) { - return TypeDescriptor - .collection(targetClass, - TypeDescriptor.valueOf(contentClass)) - .getResolvableType(); - } - else { - return ResolvableType.forClass(targetClass); - } + map.put(JsonHeaders.RESOLVABLE_TYPE, JsonHeaders.buildResolvableType(targetClass, contentClass, keyClass)); } }