Skip to content

Commit e921628

Browse files
artembilangaryrussell
authored andcommitted
INT-4497: Add RateLimiterRequestHandlerAdvice (#2781)
* INT-4497: Add RateLimiterRequestHandlerAdvice JIRA: https://jira.spring.io/browse/INT-4497 * * Remove unused property
1 parent e32f877 commit e921628

File tree

5 files changed

+289
-2
lines changed

5 files changed

+289
-2
lines changed

build.gradle

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
buildscript {
1+
buildscript {
22
ext.kotlinVersion = '1.3.21'
33
repositories {
44
maven { url 'https://repo.spring.io/plugins-release' }
@@ -131,6 +131,7 @@ subprojects { subproject ->
131131
postgresVersion = '42.2.5'
132132
reactorNettyVersion = '0.8.5.RELEASE'
133133
reactorVersion = '3.2.6.RELEASE'
134+
resilience4jVersion = '0.13.2'
134135
romeToolsVersion = '1.12.0'
135136
servletApiVersion = '4.0.1'
136137
smackVersion = '4.3.1'
@@ -381,6 +382,7 @@ project('spring-integration-core') {
381382
compile("io.fastjson:boon:$boonVersion", optional)
382383
compile("com.esotericsoftware:kryo-shaded:$kryoShadedVersion", optional)
383384
compile("io.micrometer:micrometer-core:$micrometerVersion", optional)
385+
compile("io.github.resilience4j:resilience4j-ratelimiter:$resilience4jVersion", optional)
384386

385387
testCompile ("org.aspectj:aspectjweaver:$aspectjVersion")
386388
testCompile "io.projectreactor:reactor-test:$reactorVersion"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.handler.advice;
18+
19+
import java.time.Duration;
20+
21+
import org.springframework.messaging.Message;
22+
import org.springframework.messaging.MessagingException;
23+
import org.springframework.util.Assert;
24+
25+
import io.github.resilience4j.ratelimiter.RateLimiter;
26+
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
27+
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
28+
import io.vavr.CheckedFunction0;
29+
import io.vavr.control.Try;
30+
31+
/**
32+
* An {@link AbstractRequestHandlerAdvice} extension for a rate limiting to service method calls.
33+
* The implementation is based on the
34+
* <a href="https://github.com/resilience4j/resilience4j#ratelimiter">Resilience4j</a>.
35+
*
36+
* @author Artem Bilan
37+
*
38+
* @since 5.2
39+
*/
40+
public class RateLimiterRequestHandlerAdvice extends AbstractRequestHandlerAdvice {
41+
42+
public static final String DEFAULT_NAME = "RateLimiterRequestHandlerAdvice";
43+
44+
private final RateLimiter rateLimiter;
45+
46+
/**
47+
* Construct an instance based on default rate limiter options
48+
* and {@value #DEFAULT_NAME} as a rate limiter name.
49+
* @see RateLimiter#ofDefaults
50+
*/
51+
public RateLimiterRequestHandlerAdvice() {
52+
this(RateLimiter.ofDefaults(DEFAULT_NAME));
53+
}
54+
55+
/**
56+
* Construct an instance based on default rate limiter options and provided name.
57+
* @param name the name for the rate limiter.
58+
*/
59+
public RateLimiterRequestHandlerAdvice(String name) {
60+
this(RateLimiter.ofDefaults(name));
61+
Assert.hasText(name, "'name' must not be empty");
62+
}
63+
64+
/**
65+
* Construct an instance based on the provided {@link RateLimiter}.
66+
* @param rateLimiter the {@link RateLimiter} to use.
67+
*/
68+
public RateLimiterRequestHandlerAdvice(RateLimiter rateLimiter) {
69+
Assert.notNull(rateLimiter, "'rateLimiter' must not be null");
70+
this.rateLimiter = rateLimiter;
71+
}
72+
73+
/**
74+
* Construct an instance based on the provided {@link RateLimiterConfig}
75+
* and {@value #DEFAULT_NAME} as a rate limiter name.
76+
* @param rateLimiterConfig the {@link RateLimiterConfig} to use.
77+
*/
78+
public RateLimiterRequestHandlerAdvice(RateLimiterConfig rateLimiterConfig) {
79+
this(rateLimiterConfig, DEFAULT_NAME);
80+
}
81+
82+
/**
83+
* Construct an instance based on the provided {@link RateLimiterConfig} and name.
84+
* @param rateLimiterConfig the {@link RateLimiterConfig} to use.
85+
* @param name the name for the rate limiter.
86+
*/
87+
public RateLimiterRequestHandlerAdvice(RateLimiterConfig rateLimiterConfig, String name) {
88+
Assert.notNull(rateLimiterConfig, "'rateLimiterConfig' must not be null");
89+
Assert.hasText(name, "'name' must not be empty");
90+
this.rateLimiter = RateLimiter.of(name, rateLimiterConfig);
91+
}
92+
93+
/**
94+
* Change the {@code limitForPeriod} option of the {@link #rateLimiter}.
95+
* @param limitForPeriod the {@code limitForPeriod} to use.
96+
* @see RateLimiter#changeLimitForPeriod(int)
97+
*/
98+
public void setLimitForPeriod(int limitForPeriod) {
99+
this.rateLimiter.changeLimitForPeriod(limitForPeriod);
100+
}
101+
102+
/**
103+
* Change the {@code timeoutDuration} option of the {@link #rateLimiter}.
104+
* @param timeoutDuration the {@code timeoutDuration} to use.
105+
* @see RateLimiter#changeTimeoutDuration(Duration)
106+
*/
107+
public void setTimeoutDuration(Duration timeoutDuration) {
108+
this.rateLimiter.changeTimeoutDuration(timeoutDuration);
109+
}
110+
111+
/**
112+
* Obtain the metrics from the rate limiter.
113+
* @return the {@link RateLimiter.Metrics} from rate limiter.
114+
* @see RateLimiter#getMetrics()
115+
*/
116+
public RateLimiter.Metrics getMetrics() {
117+
return this.rateLimiter.getMetrics();
118+
}
119+
120+
@Override
121+
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
122+
CheckedFunction0<Object> restrictedCall =
123+
RateLimiter.decorateCheckedSupplier(this.rateLimiter, callback::execute);
124+
try {
125+
return Try.of(restrictedCall).get();
126+
}
127+
catch (RequestNotPermitted ex) {
128+
throw new RateLimitExceededException(message, "Rate limit exceeded for: " + target, ex);
129+
}
130+
}
131+
132+
133+
/**
134+
* A {@link MessagingException} wrapper for the {@link RequestNotPermitted}
135+
* with the {@code requestMessage} and {@code target} context.
136+
*/
137+
public static class RateLimitExceededException extends MessagingException {
138+
139+
private static final long serialVersionUID = 1L;
140+
141+
RateLimitExceededException(Message<?> message, String description, RequestNotPermitted cause) {
142+
super(message, description, cause);
143+
}
144+
145+
}
146+
147+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.handler.advice;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
22+
import java.time.Duration;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.integration.annotation.ServiceActivator;
30+
import org.springframework.integration.channel.QueueChannel;
31+
import org.springframework.integration.config.EnableIntegration;
32+
import org.springframework.messaging.Message;
33+
import org.springframework.messaging.MessageChannel;
34+
import org.springframework.messaging.MessagingException;
35+
import org.springframework.messaging.PollableChannel;
36+
import org.springframework.messaging.support.GenericMessage;
37+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
38+
39+
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
40+
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
41+
42+
/**
43+
* @author Artem Bilan
44+
*
45+
* @since 5.2
46+
*/
47+
@SpringJUnitConfig
48+
public class RateLimiterRequestHandlerAdviceTests {
49+
50+
@Autowired
51+
private MessageChannel requestChannel;
52+
53+
@Autowired
54+
private PollableChannel resultChannel;
55+
56+
@Test
57+
void testRateLimiter() throws InterruptedException {
58+
Message<?> testMessage = new GenericMessage<>("test");
59+
this.requestChannel.send(testMessage);
60+
61+
assertThatExceptionOfType(MessagingException.class)
62+
.isThrownBy(() -> this.requestChannel.send(testMessage))
63+
.withCauseInstanceOf(RequestNotPermitted.class)
64+
.withMessageContaining("Rate limit exceeded for: ");
65+
66+
Thread.sleep(200);
67+
68+
this.requestChannel.send(testMessage);
69+
70+
assertThat(this.resultChannel.receive(10_000)).isNotNull();
71+
assertThat(this.resultChannel.receive(10_000)).isNotNull();
72+
}
73+
74+
@Configuration
75+
@EnableIntegration
76+
public static class ContextConfiguration {
77+
78+
@Bean
79+
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
80+
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
81+
.timeoutDuration(Duration.ofMillis(100))
82+
.limitRefreshPeriod(Duration.ofMillis(500))
83+
.limitForPeriod(1)
84+
.build());
85+
}
86+
87+
@Bean
88+
public PollableChannel resultChannel() {
89+
return new QueueChannel();
90+
}
91+
92+
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
93+
adviceChain = "rateLimiterRequestHandlerAdvice")
94+
public String handleRequest(String payload) {
95+
return payload;
96+
}
97+
98+
}
99+
100+
}

src/reference/asciidoc/handler-advice.adoc

+33-1
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,12 @@ For chains that produce a reply, every child element can be advised.
4949
[[advice-classes]]
5050
==== Provided Advice Classes
5151

52-
In addition to providing the general mechanism to apply AOP advice classes, Spring Integration provides three standard advice classes:
52+
In addition to providing the general mechanism to apply AOP advice classes, Spring Integration provides these out-of-the-box advice implementations:
5353

5454
* `RequestHandlerRetryAdvice` (described in <<retry-advice>>)
5555
* `RequestHandlerCircuitBreakerAdvice` (described in <<circuit-breaker-advice>>)
5656
* `ExpressionEvaluatingRequestHandlerAdvice` (described in <<expression-advice>>)
57+
* `RateLimiterRequestHandlerAdvice` (described in <<rate-limiter-advice>>)
5758

5859
[[retry-advice]]
5960
===== Retry Advice
@@ -464,6 +465,37 @@ public class EerhaApplication {
464465
----
465466
====
466467

468+
[[rate-limiter-advice]]
469+
===== Rate Limiter Advice
470+
471+
The Rate Limiter advice (`RateLimiterRequestHandlerAdvice`) allows to ensure that an endpoint does not get overloaded with requests.
472+
When the rate limit is breached the request will go in a blocked state.
473+
474+
A typical use case for this advice might be an external service provider not allowing more than `n` number of request per minute.
475+
476+
The `RateLimiterRequestHandlerAdvice` implementation is fully based on the https://github.com/resilience4j/resilience4j#ratelimiter[Resilience4j] project and requires either `RateLimiter` or `RateLimiterConfig` injections.
477+
Can also be configured with defaults and/or custom name.
478+
479+
The following example configures a rate limiter advice with one request per 1 second:
480+
====
481+
[source, java]
482+
----
483+
@Bean
484+
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
485+
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
486+
.limitRefreshPeriod(Duration.ofSeconds(1))
487+
.limitForPeriod(1)
488+
.build());
489+
}
490+
491+
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
492+
adviceChain = "rateLimiterRequestHandlerAdvice")
493+
public String handleRequest(String payload) {
494+
...
495+
}
496+
----
497+
====
498+
467499
[[custom-advice]]
468500
==== Custom Advice Classes
469501

src/reference/asciidoc/whats-new.adoc

+6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ If you are interested in more details, see the Issue Tracker tickets that were r
77
[[x5.2-new-components]]
88
=== New Components
99

10+
[[x5.2-rateLimitAdvice]]
11+
=== Rate Limit Advice Support
12+
13+
The `RateLimiterRequestHandlerAdvice` is now available for limiting requests rate on handlers.
14+
See <<rate-limiter-advice>> for more information.
15+
1016
[[x5.2-general]]
1117
=== General Changes
1218

0 commit comments

Comments
 (0)