Skip to content

Commit 75643d1

Browse files
committed
Issue ReactiveX#12 Initial RateLimiter implementation and JavaDocs
1 parent 0bad9f5 commit 75643d1

11 files changed

+1220
-1
lines changed

build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,16 @@ jmh {
6262
dependencies {
6363
compile "io.javaslang:javaslang:2.0.4"
6464
compile "org.slf4j:slf4j-api:1.7.13"
65+
6566
testCompile "io.dropwizard.metrics:metrics-core:3.1.2"
6667
testCompile "junit:junit:4.11"
6768
testCompile "org.assertj:assertj-core:3.0.0"
6869
testCompile "ch.qos.logback:logback-classic:0.9.26"
6970
testCompile "io.dropwizard.metrics:metrics-healthchecks:3.1.2"
70-
testCompile "org.mockito:mockito-all:1.10.19"
71+
testCompile "org.mockito:mockito-core:1.10.19"
7172
testCompile "io.projectreactor:reactor-core:2.5.0.M2"
73+
testCompile "com.jayway.awaitility:awaitility:1.7.0"
74+
7275
jmh "ch.qos.logback:logback-classic:0.9.26"
7376
}
7477

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package javaslang.ratelimiter;
2+
3+
import javaslang.control.Try;
4+
5+
import java.time.Duration;
6+
import java.util.function.Consumer;
7+
import java.util.function.Function;
8+
import java.util.function.Supplier;
9+
10+
/**
11+
* A RateLimiter distributes permits at a configurable rate. {@link #getPermission} blocks if necessary
12+
* until a permit is available, and then takes it. Once acquired, permits need not be released.
13+
*/
14+
public interface RateLimiter {
15+
16+
/**
17+
* Acquires a permission from this rate limiter, blocking until one is
18+
* available.
19+
* <p>
20+
* <p>If the current thread is {@linkplain Thread#interrupt interrupted}
21+
* while waiting for a permit then it won't throw {@linkplain InterruptedException},
22+
* but its interrupt status will be set.
23+
*
24+
* @return {@code true} if a permit was acquired and {@code false}
25+
* if waiting time elapsed before a permit was acquired
26+
*/
27+
boolean getPermission(Duration timeoutDuration);
28+
29+
/**
30+
* Get the name of this RateLimiter
31+
*
32+
* @return the name of this RateLimiter
33+
*/
34+
String getName();
35+
36+
/**
37+
* Get the RateLimiterConfig of this RateLimiter.
38+
*
39+
* @return the RateLimiterConfig of this RateLimiter
40+
*/
41+
RateLimiterConfig getRateLimiterConfig();
42+
43+
/**
44+
* Get the Metrics of this RateLimiter.
45+
*
46+
* @return the Metrics of this RateLimiter
47+
*/
48+
Metrics getMetrics();
49+
50+
interface Metrics {
51+
/**
52+
* Returns an estimate of the number of threads waiting for permission
53+
* in this JVM process.
54+
*
55+
* @return estimate of the number of threads waiting for permission.
56+
*/
57+
int getNumberOfWaitingThreads();
58+
}
59+
60+
/**
61+
* Creates a supplier which is restricted by a RateLimiter.
62+
*
63+
* @param rateLimiter the RateLimiter
64+
* @param supplier the original supplier
65+
* @return a supplier which is restricted by a RateLimiter.
66+
*/
67+
static <T> Try.CheckedSupplier<T> decorateCheckedSupplier(RateLimiter rateLimiter, Try.CheckedSupplier<T> supplier) {
68+
Try.CheckedSupplier<T> decoratedSupplier = () -> {
69+
waitForPermission(rateLimiter);
70+
T result = supplier.get();
71+
return result;
72+
};
73+
return decoratedSupplier;
74+
}
75+
76+
/**
77+
* Creates a runnable which is restricted by a RateLimiter.
78+
*
79+
* @param rateLimiter the RateLimiter
80+
* @param runnable the original runnable
81+
* @return a runnable which is restricted by a RateLimiter.
82+
*/
83+
static Try.CheckedRunnable decorateCheckedRunnable(RateLimiter rateLimiter, Try.CheckedRunnable runnable) {
84+
85+
Try.CheckedRunnable decoratedRunnable = () -> {
86+
waitForPermission(rateLimiter);
87+
runnable.run();
88+
};
89+
return decoratedRunnable;
90+
}
91+
92+
/**
93+
* Creates a function which is restricted by a RateLimiter.
94+
*
95+
* @param rateLimiter the RateLimiter
96+
* @param function the original function
97+
* @return a function which is restricted by a RateLimiter.
98+
*/
99+
static <T, R> Try.CheckedFunction<T, R> decorateCheckedFunction(RateLimiter rateLimiter, Try.CheckedFunction<T, R> function) {
100+
Try.CheckedFunction<T, R> decoratedFunction = (T t) -> {
101+
waitForPermission(rateLimiter);
102+
R result = function.apply(t);
103+
return result;
104+
};
105+
return decoratedFunction;
106+
}
107+
108+
/**
109+
* Creates a supplier which is restricted by a RateLimiter.
110+
*
111+
* @param rateLimiter the RateLimiter
112+
* @param supplier the original supplier
113+
* @return a supplier which is restricted by a RateLimiter.
114+
*/
115+
static <T> Supplier<T> decorateSupplier(RateLimiter rateLimiter, Supplier<T> supplier) {
116+
Supplier<T> decoratedSupplier = () -> {
117+
waitForPermission(rateLimiter);
118+
T result = supplier.get();
119+
return result;
120+
};
121+
return decoratedSupplier;
122+
}
123+
124+
/**
125+
* Creates a consumer which is restricted by a RateLimiter.
126+
*
127+
* @param rateLimiter the RateLimiter
128+
* @param consumer the original consumer
129+
* @return a consumer which is restricted by a RateLimiter.
130+
*/
131+
static <T> Consumer<T> decorateConsumer(RateLimiter rateLimiter, Consumer<T> consumer) {
132+
Consumer<T> decoratedConsumer = (T t) -> {
133+
waitForPermission(rateLimiter);
134+
consumer.accept(t);
135+
};
136+
return decoratedConsumer;
137+
}
138+
139+
/**
140+
* Creates a runnable which is restricted by a RateLimiter.
141+
*
142+
* @param rateLimiter the RateLimiter
143+
* @param runnable the original runnable
144+
* @return a runnable which is restricted by a RateLimiter.
145+
*/
146+
static Runnable decorateRunnable(RateLimiter rateLimiter, Runnable runnable) {
147+
Runnable decoratedRunnable = () -> {
148+
waitForPermission(rateLimiter);
149+
runnable.run();
150+
};
151+
return decoratedRunnable;
152+
}
153+
154+
/**
155+
* Creates a function which is restricted by a RateLimiter.
156+
*
157+
* @param rateLimiter the RateLimiter
158+
* @param function the original function
159+
* @return a function which is restricted by a RateLimiter.
160+
*/
161+
static <T, R> Function<T, R> decorateFunction(RateLimiter rateLimiter, Function<T, R> function) {
162+
Function<T, R> decoratedFunction = (T t) -> {
163+
waitForPermission(rateLimiter);
164+
R result = function.apply(t);
165+
return result;
166+
};
167+
return decoratedFunction;
168+
}
169+
170+
/**
171+
* Will wait for permission within default timeout duration.
172+
* Throws {@link RequestNotPermitted} if waiting time elapsed before a permit was acquired.
173+
*
174+
* @param rateLimiter the RateLimiter to get permission from
175+
*/
176+
static void waitForPermission(final RateLimiter rateLimiter) {
177+
RateLimiterConfig rateLimiterConfig = rateLimiter.getRateLimiterConfig();
178+
Duration timeoutDuration = rateLimiterConfig.getTimeoutDuration();
179+
boolean permission = rateLimiter.getPermission(timeoutDuration);
180+
if (!permission) {
181+
throw new RequestNotPermitted("Request not permitted for limiter: " + rateLimiter.getName());
182+
}
183+
}
184+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package javaslang.ratelimiter;
2+
3+
import static java.util.Objects.requireNonNull;
4+
5+
import java.time.Duration;
6+
7+
public class RateLimiterConfig {
8+
private static final String TIMEOUT_DURATION_MUST_NOT_BE_NULL = "TimeoutDuration must not be null";
9+
private static final String LIMIT_REFRESH_PERIOD_MUST_NOT_BE_NULL = "LimitRefreshPeriod must not be null";
10+
11+
private static final Duration ACCEPTABLE_REFRESH_PERIOD = Duration.ofNanos(500L); // TODO: use jmh to find real one
12+
13+
private final Duration timeoutDuration;
14+
private final Duration limitRefreshPeriod;
15+
private final int limitForPeriod;
16+
17+
private RateLimiterConfig(final Duration timeoutDuration, final Duration limitRefreshPeriod, final int limitForPeriod) {
18+
this.timeoutDuration = checkTimeoutDuration(timeoutDuration);
19+
this.limitRefreshPeriod = checkLimitRefreshPeriod(limitRefreshPeriod);
20+
this.limitForPeriod = checkLimitForPeriod(limitForPeriod);
21+
}
22+
23+
public static Builder builder() {
24+
return new Builder();
25+
}
26+
27+
public Duration getTimeoutDuration() {
28+
return timeoutDuration;
29+
}
30+
31+
public Duration getLimitRefreshPeriod() {
32+
return limitRefreshPeriod;
33+
}
34+
35+
public int getLimitForPeriod() {
36+
return limitForPeriod;
37+
}
38+
39+
public static class Builder {
40+
41+
private Duration timeoutDuration;
42+
private Duration limitRefreshPeriod;
43+
private int limitForPeriod;
44+
45+
/**
46+
* Builds a RateLimiterConfig
47+
*
48+
* @return the RateLimiterConfig
49+
*/
50+
public RateLimiterConfig build() {
51+
return new RateLimiterConfig(
52+
timeoutDuration,
53+
limitRefreshPeriod,
54+
limitForPeriod
55+
);
56+
}
57+
58+
/**
59+
* Configures the default wait for permission duration.
60+
*
61+
* @param timeoutDuration the default wait for permission duration
62+
* @return the RateLimiterConfig.Builder
63+
*/
64+
public Builder timeoutDuration(final Duration timeoutDuration) {
65+
this.timeoutDuration = checkTimeoutDuration(timeoutDuration);
66+
return this;
67+
}
68+
69+
/**
70+
* Configures the period of limit refresh.
71+
* After each period rate limiter sets its permissions
72+
* count to {@link RateLimiterConfig#limitForPeriod} value.
73+
*
74+
* @param limitRefreshPeriod the period of limit refresh
75+
* @return the RateLimiterConfig.Builder
76+
*/
77+
public Builder limitRefreshPeriod(final Duration limitRefreshPeriod) {
78+
this.limitRefreshPeriod = checkLimitRefreshPeriod(limitRefreshPeriod);
79+
return this;
80+
}
81+
82+
/**
83+
* Configures the permissions limit for refresh period.
84+
* Count of permissions available during one rate limiter period
85+
* specified by {@link RateLimiterConfig#limitRefreshPeriod} value.
86+
*
87+
* @param limitForPeriod the permissions limit for refresh period
88+
* @return the RateLimiterConfig.Builder
89+
*/
90+
public Builder limitForPeriod(final int limitForPeriod) {
91+
this.limitForPeriod = checkLimitForPeriod(limitForPeriod);
92+
return this;
93+
}
94+
95+
}
96+
97+
private static Duration checkTimeoutDuration(final Duration timeoutDuration) {
98+
return requireNonNull(timeoutDuration, TIMEOUT_DURATION_MUST_NOT_BE_NULL);
99+
}
100+
101+
private static Duration checkLimitRefreshPeriod(Duration limitRefreshPeriod) {
102+
requireNonNull(limitRefreshPeriod, LIMIT_REFRESH_PERIOD_MUST_NOT_BE_NULL);
103+
boolean refreshPeriodIsTooShort = limitRefreshPeriod.compareTo(ACCEPTABLE_REFRESH_PERIOD) < 0;
104+
if (refreshPeriodIsTooShort) {
105+
throw new IllegalArgumentException("LimitRefreshPeriod is too short");
106+
}
107+
return limitRefreshPeriod;
108+
}
109+
110+
private static int checkLimitForPeriod(final int limitForPeriod) {
111+
if (limitForPeriod < 1) {
112+
throw new IllegalArgumentException("LimitForPeriod should be greater than 0");
113+
}
114+
return limitForPeriod;
115+
}
116+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package javaslang.ratelimiter;
2+
3+
import javaslang.ratelimiter.internal.InMemoryRateLimiterRegistry;
4+
5+
import java.util.function.Supplier;
6+
7+
/**
8+
* Manages all RateLimiter instances.
9+
*/
10+
public interface RateLimiterRegistry {
11+
12+
/**
13+
* Returns a managed {@link RateLimiter} or creates a new one with the default RateLimiter configuration.
14+
*
15+
* @param name the name of the RateLimiter
16+
* @return The {@link RateLimiter}
17+
*/
18+
RateLimiter rateLimiter(String name);
19+
20+
/**
21+
* Returns a managed {@link RateLimiter} or creates a new one with a custom RateLimiter configuration.
22+
*
23+
* @param name the name of the RateLimiter
24+
* @param rateLimiterConfig a custom RateLimiter configuration
25+
* @return The {@link RateLimiter}
26+
*/
27+
RateLimiter rateLimiter(String name, RateLimiterConfig rateLimiterConfig);
28+
29+
/**
30+
* Returns a managed {@link RateLimiterConfig} or creates a new one with a custom RateLimiterConfig configuration.
31+
*
32+
* @param name the name of the RateLimiterConfig
33+
* @param rateLimiterConfigSupplier a supplier of a custom RateLimiterConfig configuration
34+
* @return The {@link RateLimiterConfig}
35+
*/
36+
RateLimiter rateLimiter(String name, Supplier<RateLimiterConfig> rateLimiterConfigSupplier);
37+
38+
static RateLimiterRegistry of(RateLimiterConfig defaultRateLimiterConfig) {
39+
return new InMemoryRateLimiterRegistry(defaultRateLimiterConfig);
40+
}
41+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package javaslang.ratelimiter;
2+
3+
/**
4+
* Exception that indicates that current thread was not able to acquire permission
5+
* from {@link RateLimiter}.
6+
*/
7+
public class RequestNotPermitted extends RuntimeException {
8+
9+
/**
10+
* The constructor with a message.
11+
*
12+
* @param message The message.
13+
*/
14+
public RequestNotPermitted(final String message) {
15+
super(message);
16+
}
17+
}

0 commit comments

Comments
 (0)