Skip to content

Commit d9c7b94

Browse files
authored
feat: timeout configuration for client (#87)
Allow the client timeout to be configured on the builder and via the Spring Boot configuration properties.
1 parent 6cd90c9 commit d9c7b94

File tree

4 files changed

+120
-14
lines changed

4 files changed

+120
-14
lines changed

client/src/main/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClient.java

Lines changed: 98 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,39 @@ public class PrometheusRSocketClient {
6565
private volatile boolean requestedDisconnect = false;
6666
private RSocket sendingSocket;
6767

68+
private Duration timeout;
69+
70+
/**
71+
* Creates a {@link PrometheusRSocketClient}.
72+
*
73+
* @param registryAndScrape the registry and scrape meter
74+
* @param transport the client transport
75+
* @param retry the retry configuration
76+
* @param onKeyReceived the callback if a key has been received
77+
*/
78+
private PrometheusRSocketClient(MeterRegistryAndScrape<?> registryAndScrape,
79+
ClientTransport transport,
80+
Retry retry,
81+
Runnable onKeyReceived) {
82+
this(registryAndScrape, transport, retry, Duration.ofSeconds(5), onKeyReceived);
83+
}
84+
85+
/**
86+
* Creates a {@link PrometheusRSocketClient}.
87+
*
88+
* @param registryAndScrape the registry and scrape meter
89+
* @param transport the client transport
90+
* @param retry the retry configuration
91+
* @param timeout the timeout to connect and push the data
92+
* @param onKeyReceived the callback if a key has been received
93+
*/
6894
private PrometheusRSocketClient(MeterRegistryAndScrape<?> registryAndScrape,
6995
ClientTransport transport,
7096
Retry retry,
97+
Duration timeout,
7198
Runnable onKeyReceived) {
7299
this.registryAndScrape = registryAndScrape;
100+
this.timeout = timeout;
73101

74102
RSocketConnector.create()
75103
.reconnect(new Retry() {
@@ -116,8 +144,8 @@ public Mono<Void> fireAndForget(Payload payload) {
116144
.increment())
117145
.doOnNext(connection -> this.connection = connection)
118146
.flatMap(socket -> socket.onClose()
119-
.map(v -> 1) // https://github.com/rsocket/rsocket-java/issues/819
120-
.onErrorReturn(1))
147+
.map(v -> 1) // https://github.com/rsocket/rsocket-java/issues/819
148+
.onErrorReturn(1))
121149
.repeat(() -> !requestedDisconnect)
122150
.subscribe();
123151
}
@@ -149,6 +177,9 @@ public static <M extends MeterRegistry> Builder build(M meterRegistry, Supplier<
149177
return new Builder(meterRegistry, scrape, clientTransport);
150178
}
151179

180+
/**
181+
* Closes the {@link PrometheusRSocketClient}
182+
*/
152183
public void close() {
153184
this.requestedDisconnect = true;
154185
if (this.connection != null) {
@@ -160,11 +191,12 @@ public void close() {
160191
* Pushes the data in a blocking way and closes the connection.
161192
*/
162193
public void pushAndCloseBlockingly() {
163-
pushAndCloseBlockingly(Duration.ofSeconds(5));
194+
pushAndCloseBlockingly(timeout);
164195
}
165196

166197
/**
167198
* Pushes the data in a blocking way and closes the connection.
199+
*
168200
* @param timeout the amount of time to wait for the data to be sent
169201
*/
170202
public void pushAndCloseBlockingly(Duration timeout) {
@@ -183,8 +215,7 @@ public void pushAndCloseBlockingly(Duration timeout) {
183215
.doOnCancel(() -> LOGGER.warn("Pushing data to RSocket Proxy before closing the connection was cancelled!"))
184216
.doFinally(signalType -> latch.countDown())
185217
.subscribe();
186-
}
187-
catch (Exception exception) {
218+
} catch (Exception exception) {
188219
latch.countDown();
189220
LOGGER.warn("Sending the payload failed!", exception);
190221
}
@@ -193,8 +224,7 @@ public void pushAndCloseBlockingly(Duration timeout) {
193224
if (!latch.await(timeout.toMillis(), MILLISECONDS)) {
194225
LOGGER.warn("Sending the payload timed out!");
195226
}
196-
}
197-
catch (InterruptedException exception) {
227+
} catch (InterruptedException exception) {
198228
LOGGER.warn("Waiting for sending the payload was interrupted!", exception);
199229
}
200230
}
@@ -215,8 +245,7 @@ public void pushAndClose() {
215245
.doOnError(throwable -> LOGGER.warn("Pushing data to RSocket Proxy before closing the connection failed!", throwable))
216246
.doOnCancel(() -> LOGGER.warn("Pushing data to RSocket Proxy before closing the connection was cancelled!"))
217247
.subscribe();
218-
}
219-
catch (Exception exception) {
248+
} catch (Exception exception) {
220249
LOGGER.warn("Sending the payload failed!", exception);
221250
}
222251
}
@@ -263,54 +292,111 @@ private PublicKey decodePublicKey(ByteBuffer encodedKeyBuffer) {
263292
}
264293
}
265294

295+
/**
296+
* Builder class to create a {@link PrometheusRSocketClient}.
297+
*/
266298
public static class Builder {
267299
private MeterRegistryAndScrape<?> registryAndScrape;
268300
private final ClientTransport clientTransport;
269301

270302
private Retry retry = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10))
271303
.maxBackoff(Duration.ofMinutes(10));
272304

273-
private Runnable onKeyReceived = () -> { };
305+
private Duration timeout = Duration.ofSeconds(5);
306+
307+
private Runnable onKeyReceived = () -> {
308+
};
274309

275310
<M extends MeterRegistry> Builder(M registry, Supplier<String> scrape, ClientTransport clientTransport) {
276311
this.registryAndScrape = new MeterRegistryAndScrape<>(registry, scrape);
277312
this.clientTransport = clientTransport;
278313
}
279314

315+
/**
316+
* Configures the retry for {@link PrometheusRSocketClient}.
317+
*
318+
* @param retry the retry configuration
319+
* @return the {@link Builder}
320+
*/
280321
public Builder retry(Retry retry) {
281322
this.retry = retry;
282323
return this;
283324
}
284325

326+
/**
327+
* Timeout for the {@link PrometheusRSocketClient}.
328+
*
329+
* @param timeout the timeout in seconds
330+
* @return the {@link Builder}
331+
*/
332+
public Builder timeout(Duration timeout) {
333+
this.timeout = timeout;
334+
return this;
335+
}
336+
337+
/**
338+
* Callback of {@link PrometheusRSocketClient} if a key is received.
339+
*
340+
* @param onKeyReceived callback which is executed if a key is received
341+
* @return the {@link Builder}
342+
*/
285343
public Builder doOnKeyReceived(Runnable onKeyReceived) {
286344
this.onKeyReceived = onKeyReceived;
287345
return this;
288346
}
289347

348+
/**
349+
* Connects the {@link PrometheusRSocketClient}.
350+
*
351+
* @return the {@link PrometheusRSocketClient}
352+
*/
290353
public PrometheusRSocketClient connect() {
354+
return connect(timeout);
355+
}
356+
357+
/**
358+
* Connects the {@link PrometheusRSocketClient}.
359+
*
360+
* @param timeout the timeout for the client to connect
361+
*
362+
* @return the {@link PrometheusRSocketClient}
363+
*/
364+
public PrometheusRSocketClient connect(Duration timeout) {
291365
LOGGER.debug("Connecting to RSocket Proxy...");
292366
return new PrometheusRSocketClient(
293367
registryAndScrape,
294368
clientTransport,
295369
retry,
370+
timeout,
296371
() -> {
297372
LOGGER.info("Connected to RSocket Proxy!");
298373
onKeyReceived.run();
299374
}
300375
);
301376
}
302377

378+
/**
379+
* Connects the {@link PrometheusRSocketClient} blockingly.
380+
*
381+
* @return the {@link PrometheusRSocketClient}
382+
*/
303383
public PrometheusRSocketClient connectBlockingly() {
304-
return connectBlockingly(Duration.ofSeconds(5));
384+
return connectBlockingly(timeout);
305385
}
306386

387+
/**
388+
* Connects the {@link PrometheusRSocketClient} blockingly with the given timeout.
389+
*
390+
* @return the {@link PrometheusRSocketClient}
391+
*/
307392
public PrometheusRSocketClient connectBlockingly(Duration timeout) {
308393
LOGGER.debug("Connecting to RSocket Proxy...");
309394
CountDownLatch latch = new CountDownLatch(1);
310395
PrometheusRSocketClient client = new PrometheusRSocketClient(
311396
registryAndScrape,
312397
clientTransport,
313398
retry,
399+
timeout,
314400
() -> {
315401
LOGGER.info("Connected to RSocket Proxy!");
316402
onKeyReceived.run();
@@ -322,8 +408,7 @@ public PrometheusRSocketClient connectBlockingly(Duration timeout) {
322408
if (!latch.await(timeout.toMillis(), MILLISECONDS)) {
323409
LOGGER.warn("Creating the connection and receiving the key timed out!");
324410
}
325-
}
326-
catch (InterruptedException exception) {
411+
} catch (InterruptedException exception) {
327412
LOGGER.warn("Waiting for receiving the key was interrupted!", exception);
328413
}
329414

client/src/test/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClientTests.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import reactor.core.scheduler.Schedulers;
3131
import reactor.util.retry.Retry;
3232

33+
import java.lang.reflect.Field;
3334
import java.security.KeyPairGenerator;
3435
import java.security.NoSuchAlgorithmException;
3536
import java.time.Duration;
@@ -165,7 +166,7 @@ public Mono<Void> fireAndForget(Payload payload) {
165166
}
166167

167168
@Test
168-
void blockingConnectAndPush() throws NoSuchAlgorithmException, InterruptedException, ExecutionException {
169+
void blockingConnectAndPush() throws NoSuchAlgorithmException, InterruptedException, ExecutionException, NoSuchFieldException, IllegalAccessException {
169170
CountDownLatch pushLatch = new CountDownLatch(1);
170171
AtomicBoolean pushed = new AtomicBoolean(false);
171172
Payload payload = DefaultPayload.create(KeyPairGenerator.getInstance("RSA").generateKeyPair().getPublic().getEncoded());
@@ -200,6 +201,7 @@ public Mono<Payload> requestResponse(Payload payload) {
200201
serverTransport.clientTransport()
201202
)
202203
.retry(Retry.max(0))
204+
.timeout(Duration.ofSeconds(10))
203205
.doOnKeyReceived(() -> {
204206
await(keyReceivedLatch);
205207
keyReceived.set(true);
@@ -211,6 +213,11 @@ public Mono<Payload> requestResponse(Payload payload) {
211213
PrometheusRSocketClient client = clientFuture.get();
212214
assertThat(keyReceived).as("Public key should be received(connected)").isTrue();
213215

216+
Field timeoutField = PrometheusRSocketClient.class.getDeclaredField("timeout");
217+
timeoutField.setAccessible(true);
218+
Duration timeout = (Duration)timeoutField.get(client);
219+
assertThat(timeout.toSeconds()).isEqualTo(10L);
220+
214221
CompletableFuture<Void> closeFuture = runAsync(client::pushAndCloseBlockingly, newSingleThreadExecutor());
215222
runAsync(pushLatch::countDown, newDelayedExecutor());
216223
assertThat(pushed).as("Data should not be pushed").isFalse();

starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientAutoConfiguration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ PrometheusRSocketClient prometheusRSocketClient(PrometheusMeterRegistry meterReg
4141
return PrometheusRSocketClient.build(meterRegistry, properties.createClientTransport())
4242
.retry(Retry.backoff(properties.getMaxRetries(), properties.getFirstBackoff())
4343
.maxBackoff(properties.getMaxBackoff()))
44+
.timeout(properties.getTimeout())
4445
.connectBlockingly();
4546
}
4647
}

starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientProperties.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ public class PrometheusRSocketClientProperties {
6363
*/
6464
private boolean secure = false;
6565

66+
/**
67+
* The timeout in seconds to be used for establishing the connection and pushing the data
68+
*/
69+
private Duration timeout = Duration.ofSeconds(5);
70+
6671
public long getMaxRetries() {
6772
return maxRetries;
6873
}
@@ -119,6 +124,14 @@ public boolean isSecure() {
119124
return secure;
120125
}
121126

127+
public Duration getTimeout() {
128+
return timeout;
129+
}
130+
131+
public void setTimeout(Duration timeout) {
132+
this.timeout = timeout;
133+
}
134+
122135
ClientTransport createClientTransport() {
123136
final TcpClient tcpClient = TcpClient.create().host(this.host).port(this.port);
124137
return this.transport.create(this.secure ? tcpClient.secure() : tcpClient);

0 commit comments

Comments
 (0)