Skip to content

Commit c601324

Browse files
committed
Start version 2.3
1 parent 213d793 commit c601324

File tree

65 files changed

+943
-1333
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+943
-1333
lines changed

build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ ext {
3737
jacksonVersion = '2.9.9'
3838
servletApiVersion = '4.0.1'
3939
log4jVersion = '2.11.2'
40-
springCloudAwsVersion = '2.1.1.RELEASE'
41-
springIntegrationVersion = '5.1.6.RELEASE'
40+
springCloudAwsVersion = '2.2.0.BUILD-SNAPSHOT'
41+
springIntegrationVersion = '5.2.0.BUILD-SNAPSHOT'
4242
kinesisClientVersion = '1.10.0'
4343
kinesisProducerVersion = '0.12.11'
4444

gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=2.2.1.BUILD-SNAPSHOT
1+
version=2.3.0.BUILD-SNAPSHOT

src/main/java/org/springframework/integration/aws/config/xml/AwsNamespaceHandler.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,16 @@
2323
*
2424
* @author Amol Nayak
2525
* @author Artem Bilan
26-
*
2726
* @since 0.5
2827
*/
2928
public class AwsNamespaceHandler extends AbstractIntegrationNamespaceHandler {
3029

31-
3230
public void init() {
3331
registerBeanDefinitionParser("s3-outbound-channel-adapter", new S3OutboundChannelAdapterParser());
3432
registerBeanDefinitionParser("s3-outbound-gateway", new S3OutboundGatewayParser());
3533
registerBeanDefinitionParser("s3-inbound-channel-adapter", new S3InboundChannelAdapterParser());
36-
registerBeanDefinitionParser("s3-inbound-streaming-channel-adapter", new S3StreamingInboundChannelAdapterParser());
34+
registerBeanDefinitionParser("s3-inbound-streaming-channel-adapter",
35+
new S3StreamingInboundChannelAdapterParser());
3736
registerBeanDefinitionParser("sqs-outbound-channel-adapter", new SqsOutboundChannelAdapterParser());
3837
registerBeanDefinitionParser("sqs-message-driven-channel-adapter", new SqsMessageDrivenChannelAdapterParser());
3938
registerBeanDefinitionParser("sns-inbound-channel-adapter", new SnsInboundChannelAdapterParser());

src/main/java/org/springframework/integration/aws/config/xml/AwsParserUtils.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,11 @@ private AwsParserUtils() {
5858
super();
5959
}
6060

61-
static void populateExpressionAttribute(String attributeName, BeanDefinitionBuilder builder,
62-
Element element, ParserContext parserContext) {
61+
static void populateExpressionAttribute(String attributeName, BeanDefinitionBuilder builder, Element element,
62+
ParserContext parserContext) {
6363

64-
BeanDefinition beanDefinition =
65-
IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression(attributeName,
66-
attributeName + "-expression", parserContext, element, false);
64+
BeanDefinition beanDefinition = IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression(
65+
attributeName, attributeName + "-expression", parserContext, element, false);
6766
if (beanDefinition != null) {
6867
builder.addPropertyValue(Conventions.attributeNameToPropertyName(attributeName) + "Expression",
6968
beanDefinition);

src/main/java/org/springframework/integration/aws/config/xml/S3OutboundChannelAdapterParser.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ public class S3OutboundChannelAdapterParser extends AbstractOutboundChannelAdapt
3131

3232
@Override
3333
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
34-
AbstractBeanDefinition beanDefinition = new S3OutboundGatewayParser()
35-
.parseHandler(element, parserContext)
34+
AbstractBeanDefinition beanDefinition = new S3OutboundGatewayParser().parseHandler(element, parserContext)
3635
.getBeanDefinition();
3736
beanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(2, false);
3837
return beanDefinition;

src/main/java/org/springframework/integration/aws/config/xml/S3OutboundGatewayParser.java

+13-17
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,15 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars
5050
.error("One and only of 's3' and 'transfer-manager' attributes must be provided", element);
5151
}
5252

53-
BeanDefinition bucketExpression =
54-
IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("bucket", "bucket-expression",
55-
parserContext, element, true);
53+
BeanDefinition bucketExpression = IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression(
54+
"bucket", "bucket-expression", parserContext, element, true);
5655

5756
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(S3MessageHandler.class)
58-
.addConstructorArgReference(hasS3 ? s3 : transferManager)
59-
.addConstructorArgValue(bucketExpression)
57+
.addConstructorArgReference(hasS3 ? s3 : transferManager).addConstructorArgValue(bucketExpression)
6058
.addConstructorArgValue(true);
6159

62-
BeanDefinition commandExpression =
63-
IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("command",
64-
"command-expression", parserContext, element, false);
60+
BeanDefinition commandExpression = IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression(
61+
"command", "command-expression", parserContext, element, false);
6562

6663
if (commandExpression != null) {
6764
builder.addPropertyValue("commandExpression", commandExpression);
@@ -70,27 +67,26 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars
7067
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "progress-listener");
7168
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "upload-metadata-provider");
7269

73-
BeanDefinition keyExpression =
74-
IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("key-expression", element);
70+
BeanDefinition keyExpression = IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("key-expression",
71+
element);
7572
if (keyExpression != null) {
7673
builder.addPropertyValue("keyExpression", keyExpression);
7774
}
7875

79-
BeanDefinition objectAclExpression =
80-
IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("object-acl-expression", element);
76+
BeanDefinition objectAclExpression = IntegrationNamespaceUtils
77+
.createExpressionDefIfAttributeDefined("object-acl-expression", element);
8178
if (objectAclExpression != null) {
8279
builder.addPropertyValue("objectAclExpression", objectAclExpression);
8380
}
8481

85-
BeanDefinition destinationBucketExpression =
86-
IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("destination-bucket-expression",
87-
element);
82+
BeanDefinition destinationBucketExpression = IntegrationNamespaceUtils
83+
.createExpressionDefIfAttributeDefined("destination-bucket-expression", element);
8884
if (destinationBucketExpression != null) {
8985
builder.addPropertyValue("destinationBucketExpression", destinationBucketExpression);
9086
}
9187

92-
BeanDefinition destinationKeyExpression =
93-
IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("destination-key-expression", element);
88+
BeanDefinition destinationKeyExpression = IntegrationNamespaceUtils
89+
.createExpressionDefIfAttributeDefined("destination-key-expression", element);
9490
if (destinationKeyExpression != null) {
9591
builder.addPropertyValue("destinationKeyExpression", destinationKeyExpression);
9692
}

src/main/java/org/springframework/integration/aws/config/xml/S3StreamingInboundChannelAdapterParser.java

-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
*
3333
* @author Christian Tzolov
3434
* @author Artem Bilan
35-
*
3635
* @since 1.1
3736
*/
3837
public class S3StreamingInboundChannelAdapterParser extends AbstractRemoteFileStreamingInboundChannelAdapterParser {

src/main/java/org/springframework/integration/aws/config/xml/SnsInboundChannelAdapterParser.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ protected String resolveId(Element element, AbstractBeanDefinition definition, P
4747
String id = super.resolveId(element, definition, parserContext);
4848

4949
if (!element.hasAttribute("channel")) {
50-
// the created channel will get the 'id', so the adapter's bean name includes a suffix
50+
// the created channel will get the 'id', so the adapter's bean name includes
51+
// a suffix
5152
id = id + ".adapter";
5253
}
5354
if (!StringUtils.hasText(id)) {
@@ -71,8 +72,8 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
7172
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "send-timeout", "requestTimeout");
7273
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, IntegrationNamespaceUtils.AUTO_STARTUP);
7374
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, IntegrationNamespaceUtils.PHASE);
74-
BeanDefinition payloadExpressionDef =
75-
IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("payload-expression", element);
75+
BeanDefinition payloadExpressionDef = IntegrationNamespaceUtils
76+
.createExpressionDefIfAttributeDefined("payload-expression", element);
7677
if (payloadExpressionDef != null) {
7778
builder.addPropertyValue("payloadExpression", payloadExpressionDef);
7879
}

src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,14 @@ public class SnsOutboundChannelAdapterParser extends AbstractOutboundChannelAdap
3737
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
3838
String sns = element.getAttribute(AwsParserUtils.SNS_REF);
3939

40-
BeanDefinitionBuilder builder =
41-
BeanDefinitionBuilder.genericBeanDefinition(SnsMessageHandler.class)
42-
.addConstructorArgReference(sns);
40+
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SnsMessageHandler.class)
41+
.addConstructorArgReference(sns);
4342

4443
AwsParserUtils.populateExpressionAttribute("topic-arn", builder, element, parserContext);
4544
AwsParserUtils.populateExpressionAttribute("subject", builder, element, parserContext);
4645

47-
BeanDefinition message =
48-
IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("body-expression", element);
46+
BeanDefinition message = IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("body-expression",
47+
element);
4948
if (message != null) {
5049
builder.addPropertyValue("bodyExpression", message);
5150
}

src/main/java/org/springframework/integration/aws/config/xml/SqsMessageDrivenChannelAdapterParser.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ protected String resolveId(Element element, AbstractBeanDefinition definition, P
4747
String id = super.resolveId(element, definition, parserContext);
4848

4949
if (!element.hasAttribute("channel")) {
50-
// the created channel will get the 'id', so the adapter's bean name includes a suffix
50+
// the created channel will get the 'id', so the adapter's bean name includes
51+
// a suffix
5152
id = id + ".adapter";
5253
}
5354
if (!StringUtils.hasText(id)) {

src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ public S3InboundFileSynchronizer(AmazonS3 amazonS3) {
4747
}
4848

4949
/**
50-
* Create a synchronizer with the {@link SessionFactory} used to acquire {@link Session} instances.
50+
* Create a synchronizer with the {@link SessionFactory} used to acquire
51+
* {@link Session} instances.
5152
* @param sessionFactory The session factory.
5253
*/
5354
public S3InboundFileSynchronizer(SessionFactory<S3ObjectSummary> sessionFactory) {

src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizingMessageSource.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
import com.amazonaws.services.s3.model.S3ObjectSummary;
2626

2727
/**
28-
* A {@link org.springframework.integration.core.MessageSource} implementation for the Amazon S3.
28+
* A {@link org.springframework.integration.core.MessageSource} implementation for the
29+
* Amazon S3.
2930
*
3031
* @author Artem Bilan
3132
*/
@@ -41,7 +42,6 @@ public S3InboundFileSynchronizingMessageSource(AbstractInboundFileSynchronizer<S
4142
super(synchronizer, comparator);
4243
}
4344

44-
4545
public String getComponentType() {
4646
return "aws:s3-inbound-channel-adapter";
4747
}

src/main/java/org/springframework/integration/aws/inbound/S3StreamingMessageSource.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
*
3737
* @author Christian Tzolov
3838
* @author Artem Bilan
39-
*
4039
* @since 1.1
4140
*/
4241
public class S3StreamingMessageSource extends AbstractRemoteFileStreamingMessageSource<S3ObjectSummary> {
@@ -55,9 +54,7 @@ public S3StreamingMessageSource(RemoteFileTemplate<S3ObjectSummary> template,
5554

5655
@Override
5756
protected List<AbstractFileInfo<S3ObjectSummary>> asFileInfoList(Collection<S3ObjectSummary> collection) {
58-
return collection.stream()
59-
.map(S3FileInfo::new)
60-
.collect(Collectors.toList());
57+
return collection.stream().map(S3FileInfo::new).collect(Collectors.toList());
6158
}
6259

6360
@Override

src/main/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapter.java

+23-22
Original file line numberDiff line numberDiff line change
@@ -46,27 +46,29 @@
4646
import com.fasterxml.jackson.databind.JsonNode;
4747

4848
/**
49-
* The {@link HttpRequestHandlingMessagingGateway} extension for the Amazon WS SNS HTTP(S) endpoints.
50-
* Accepts all {@code x-amz-sns-message-type}s, converts the received Topic JSON message to the
51-
* {@link Map} using {@link MappingJackson2HttpMessageConverter} and send it to the provided
52-
* {@link #getRequestChannel()} as {@link Message} {@code payload}.
49+
* The {@link HttpRequestHandlingMessagingGateway} extension for the Amazon WS SNS HTTP(S)
50+
* endpoints. Accepts all {@code x-amz-sns-message-type}s, converts the received Topic
51+
* JSON message to the {@link Map} using {@link MappingJackson2HttpMessageConverter} and
52+
* send it to the provided {@link #getRequestChannel()} as {@link Message}
53+
* {@code payload}.
5354
* <p>
54-
* The mapped url must be configured inside the Amazon Web Service platform as a subscription.
55-
* Before receiving any notification itself this HTTP endpoint must confirm the subscription.
55+
* The mapped url must be configured inside the Amazon Web Service platform as a
56+
* subscription. Before receiving any notification itself this HTTP endpoint must confirm
57+
* the subscription.
5658
* <p>
5759
* The {@link #handleNotificationStatus} flag (defaults to {@code false}) indicates that
5860
* this endpoint should send the {@code SubscriptionConfirmation/UnsubscribeConfirmation}
59-
* messages to the the provided {@link #getRequestChannel()}.
60-
* If that, the {@link AwsHeaders#NOTIFICATION_STATUS} header is populated
61-
* with the {@link NotificationStatus} value. In that case it is a responsibility of
62-
* the application to {@link NotificationStatus#confirmSubscription()} or not.
61+
* messages to the the provided {@link #getRequestChannel()}. If that, the
62+
* {@link AwsHeaders#NOTIFICATION_STATUS} header is populated with the
63+
* {@link NotificationStatus} value. In that case it is a responsibility of the
64+
* application to {@link NotificationStatus#confirmSubscription()} or not.
6365
* <p>
64-
* By default this endpoint just does {@link NotificationStatus#confirmSubscription()}
65-
* for the {@code SubscriptionConfirmation} message type.
66-
* And does nothing for the {@code UnsubscribeConfirmation}.
66+
* By default this endpoint just does {@link NotificationStatus#confirmSubscription()} for
67+
* the {@code SubscriptionConfirmation} message type. And does nothing for the
68+
* {@code UnsubscribeConfirmation}.
6769
* <p>
68-
* For the convenience on the underlying message flow routing a {@link AwsHeaders#SNS_MESSAGE_TYPE}
69-
* header is present.
70+
* For the convenience on the underlying message flow routing a
71+
* {@link AwsHeaders#SNS_MESSAGE_TYPE} header is present.
7072
*
7173
* @author Artem Bilan
7274
* @author Kamil Przerwa
@@ -75,8 +77,7 @@ public class SnsInboundChannelAdapter extends HttpRequestHandlingMessagingGatewa
7577

7678
private final NotificationStatusResolver notificationStatusResolver;
7779

78-
private final MappingJackson2HttpMessageConverter jackson2HttpMessageConverter =
79-
new MappingJackson2HttpMessageConverter();
80+
private final MappingJackson2HttpMessageConverter jackson2HttpMessageConverter = new MappingJackson2HttpMessageConverter();
8081

8182
private volatile boolean handleNotificationStatus;
8283

@@ -94,8 +95,8 @@ public SnsInboundChannelAdapter(AmazonSNS amazonSns, String... path) {
9495
requestMapping.setMethods(HttpMethod.POST);
9596
requestMapping.setHeaders("x-amz-sns-message-type");
9697
requestMapping.setPathPatterns(path);
97-
this.jackson2HttpMessageConverter.setSupportedMediaTypes(
98-
Arrays.asList(MediaType.APPLICATION_JSON_UTF8, MediaType.TEXT_PLAIN));
98+
this.jackson2HttpMessageConverter
99+
.setSupportedMediaTypes(Arrays.asList(MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN));
99100
super.setRequestMapping(requestMapping);
100101
super.setStatusCodeExpression(new ValueExpression<>(HttpStatus.NO_CONTENT));
101102
super.setMessageConverters(Collections.singletonList(this.jackson2HttpMessageConverter));
@@ -148,8 +149,9 @@ protected void send(Object object) {
148149
return;
149150
}
150151
}
151-
messageToSendBuilder.setHeader(AwsHeaders.SNS_MESSAGE_TYPE, type)
152-
.setHeader(AwsHeaders.MESSAGE_ID, payload.get("MessageId"));
152+
messageToSendBuilder.setHeader(AwsHeaders.SNS_MESSAGE_TYPE, type).setHeader(AwsHeaders.MESSAGE_ID,
153+
payload.get("MessageId"));
154+
153155
super.send(messageToSendBuilder.build());
154156
}
155157

@@ -203,7 +205,6 @@ public void setStatusCodeExpression(Expression statusCodeExpression) {
203205
throw new UnsupportedOperationException();
204206
}
205207

206-
207208
private static class NotificationStatusResolver extends NotificationStatusHandlerMethodArgumentResolver {
208209

209210
NotificationStatusResolver(AmazonSNS amazonSns) {

src/main/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapter.java

+8-14
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@
4545
import com.amazonaws.services.sqs.AmazonSQSAsync;
4646

4747
/**
48-
* The {@link MessageProducerSupport} implementation for the Amazon SQS {@code receiveMessage}.
49-
* Works in 'listener' manner and delegates hard to the {@link SimpleMessageListenerContainer}.
48+
* The {@link MessageProducerSupport} implementation for the Amazon SQS
49+
* {@code receiveMessage}. Works in 'listener' manner and delegates hard to the
50+
* {@link SimpleMessageListenerContainer}.
5051
*
5152
* @author Artem Bilan
5253
* @author Patrick Fitzsimons
53-
*
5454
* @see SimpleMessageListenerContainerFactory
5555
* @see SimpleMessageListenerContainer
5656
* @see QueueMessageHandler
@@ -59,8 +59,7 @@
5959
@IntegrationManagedResource
6060
public class SqsMessageDrivenChannelAdapter extends MessageProducerSupport implements DisposableBean {
6161

62-
private final SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory =
63-
new SimpleMessageListenerContainerFactory();
62+
private final SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory = new SimpleMessageListenerContainerFactory();
6463

6564
private final String[] queues;
6665

@@ -163,7 +162,7 @@ public boolean isRunning(String logicalQueueName) {
163162

164163
@ManagedAttribute
165164
public String[] getQueues() {
166-
return Arrays.copyOf(this.queues, this.queues.length);
165+
return Arrays.copyOf(this.queues, this.queues.length);
167166
}
168167

169168
@Override
@@ -185,17 +184,12 @@ public Map<MappingInformation, HandlerMethod> getHandlerMethods() {
185184
protected void handleMessageInternal(Message<?> message, String lookupDestination) {
186185
MessageHeaders headers = message.getHeaders();
187186

188-
Message<?> messageToSend = getMessageBuilderFactory()
189-
.fromMessage(message)
190-
.removeHeaders("LogicalResourceId",
191-
"MessageId",
192-
"ReceiptHandle",
193-
"Acknowledgment")
187+
Message<?> messageToSend = getMessageBuilderFactory().fromMessage(message)
188+
.removeHeaders("LogicalResourceId", "MessageId", "ReceiptHandle", "Acknowledgment")
194189
.setHeader(AwsHeaders.MESSAGE_ID, headers.get("MessageId"))
195190
.setHeader(AwsHeaders.RECEIPT_HANDLE, headers.get("ReceiptHandle"))
196191
.setHeader(AwsHeaders.RECEIVED_QUEUE, headers.get("LogicalResourceId"))
197-
.setHeader(AwsHeaders.ACKNOWLEDGMENT, headers.get("Acknowledgment"))
198-
.build();
192+
.setHeader(AwsHeaders.ACKNOWLEDGMENT, headers.get("Acknowledgment")).build();
199193

200194
sendMessage(messageToSend);
201195
}

src/main/java/org/springframework/integration/aws/inbound/kinesis/CheckpointMode.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
public enum CheckpointMode {
2828

2929
/**
30-
* Checkpoint after each processed record.
31-
* Makes sense only if {@link ListenerMode#record} is used.
30+
* Checkpoint after each processed record. Makes sense only if
31+
* {@link ListenerMode#record} is used.
3232
*/
3333
record,
3434

src/main/java/org/springframework/integration/aws/inbound/kinesis/Checkpointer.java

-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
* A callback for target record process to perform checkpoint on the related shard.
2121
*
2222
* @author Artem Bilan
23-
*
2423
* @since 1.1
2524
*/
2625
public interface Checkpointer {

0 commit comments

Comments
 (0)