diff --git a/client/src/main/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClient.java b/client/src/main/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClient.java index 980fdfd..48d1ca5 100644 --- a/client/src/main/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClient.java +++ b/client/src/main/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClient.java @@ -65,11 +65,39 @@ public class PrometheusRSocketClient { private volatile boolean requestedDisconnect = false; private RSocket sendingSocket; + private Duration timeout; + + /** + * Creates a {@link PrometheusRSocketClient}. + * + * @param registryAndScrape the registry and scrape meter + * @param transport the client transport + * @param retry the retry configuration + * @param onKeyReceived the callback if a key has been received + */ + private PrometheusRSocketClient(MeterRegistryAndScrape registryAndScrape, + ClientTransport transport, + Retry retry, + Runnable onKeyReceived) { + this(registryAndScrape, transport, retry, Duration.ofSeconds(5), onKeyReceived); + } + + /** + * Creates a {@link PrometheusRSocketClient}. + * + * @param registryAndScrape the registry and scrape meter + * @param transport the client transport + * @param retry the retry configuration + * @param timeout the timeout to connect and push the data + * @param onKeyReceived the callback if a key has been received + */ private PrometheusRSocketClient(MeterRegistryAndScrape registryAndScrape, ClientTransport transport, Retry retry, + Duration timeout, Runnable onKeyReceived) { this.registryAndScrape = registryAndScrape; + this.timeout = timeout; RSocketConnector.create() .reconnect(new Retry() { @@ -116,8 +144,8 @@ public Mono fireAndForget(Payload payload) { .increment()) .doOnNext(connection -> this.connection = connection) .flatMap(socket -> socket.onClose() - .map(v -> 1) // https://github.com/rsocket/rsocket-java/issues/819 - .onErrorReturn(1)) + .map(v -> 1) // https://github.com/rsocket/rsocket-java/issues/819 + .onErrorReturn(1)) .repeat(() -> !requestedDisconnect) .subscribe(); } @@ -149,6 +177,9 @@ public static Builder build(M meterRegistry, Supplier< return new Builder(meterRegistry, scrape, clientTransport); } + /** + * Closes the {@link PrometheusRSocketClient} + */ public void close() { this.requestedDisconnect = true; if (this.connection != null) { @@ -160,11 +191,12 @@ public void close() { * Pushes the data in a blocking way and closes the connection. */ public void pushAndCloseBlockingly() { - pushAndCloseBlockingly(Duration.ofSeconds(5)); + pushAndCloseBlockingly(timeout); } /** * Pushes the data in a blocking way and closes the connection. + * * @param timeout the amount of time to wait for the data to be sent */ public void pushAndCloseBlockingly(Duration timeout) { @@ -183,8 +215,7 @@ public void pushAndCloseBlockingly(Duration timeout) { .doOnCancel(() -> LOGGER.warn("Pushing data to RSocket Proxy before closing the connection was cancelled!")) .doFinally(signalType -> latch.countDown()) .subscribe(); - } - catch (Exception exception) { + } catch (Exception exception) { latch.countDown(); LOGGER.warn("Sending the payload failed!", exception); } @@ -193,8 +224,7 @@ public void pushAndCloseBlockingly(Duration timeout) { if (!latch.await(timeout.toMillis(), MILLISECONDS)) { LOGGER.warn("Sending the payload timed out!"); } - } - catch (InterruptedException exception) { + } catch (InterruptedException exception) { LOGGER.warn("Waiting for sending the payload was interrupted!", exception); } } @@ -215,8 +245,7 @@ public void pushAndClose() { .doOnError(throwable -> LOGGER.warn("Pushing data to RSocket Proxy before closing the connection failed!", throwable)) .doOnCancel(() -> LOGGER.warn("Pushing data to RSocket Proxy before closing the connection was cancelled!")) .subscribe(); - } - catch (Exception exception) { + } catch (Exception exception) { LOGGER.warn("Sending the payload failed!", exception); } } @@ -263,6 +292,9 @@ private PublicKey decodePublicKey(ByteBuffer encodedKeyBuffer) { } } + /** + * Builder class to create a {@link PrometheusRSocketClient}. + */ public static class Builder { private MeterRegistryAndScrape registryAndScrape; private final ClientTransport clientTransport; @@ -270,29 +302,72 @@ public static class Builder { private Retry retry = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10)) .maxBackoff(Duration.ofMinutes(10)); - private Runnable onKeyReceived = () -> { }; + private Duration timeout = Duration.ofSeconds(5); + + private Runnable onKeyReceived = () -> { + }; Builder(M registry, Supplier scrape, ClientTransport clientTransport) { this.registryAndScrape = new MeterRegistryAndScrape<>(registry, scrape); this.clientTransport = clientTransport; } + /** + * Configures the retry for {@link PrometheusRSocketClient}. + * + * @param retry the retry configuration + * @return the {@link Builder} + */ public Builder retry(Retry retry) { this.retry = retry; return this; } + /** + * Timeout for the {@link PrometheusRSocketClient}. + * + * @param timeout the timeout in seconds + * @return the {@link Builder} + */ + public Builder timeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + /** + * Callback of {@link PrometheusRSocketClient} if a key is received. + * + * @param onKeyReceived callback which is executed if a key is received + * @return the {@link Builder} + */ public Builder doOnKeyReceived(Runnable onKeyReceived) { this.onKeyReceived = onKeyReceived; return this; } + /** + * Connects the {@link PrometheusRSocketClient}. + * + * @return the {@link PrometheusRSocketClient} + */ public PrometheusRSocketClient connect() { + return connect(timeout); + } + + /** + * Connects the {@link PrometheusRSocketClient}. + * + * @param timeout the timeout for the client to connect + * + * @return the {@link PrometheusRSocketClient} + */ + public PrometheusRSocketClient connect(Duration timeout) { LOGGER.debug("Connecting to RSocket Proxy..."); return new PrometheusRSocketClient( registryAndScrape, clientTransport, retry, + timeout, () -> { LOGGER.info("Connected to RSocket Proxy!"); onKeyReceived.run(); @@ -300,10 +375,20 @@ public PrometheusRSocketClient connect() { ); } + /** + * Connects the {@link PrometheusRSocketClient} blockingly. + * + * @return the {@link PrometheusRSocketClient} + */ public PrometheusRSocketClient connectBlockingly() { - return connectBlockingly(Duration.ofSeconds(5)); + return connectBlockingly(timeout); } + /** + * Connects the {@link PrometheusRSocketClient} blockingly with the given timeout. + * + * @return the {@link PrometheusRSocketClient} + */ public PrometheusRSocketClient connectBlockingly(Duration timeout) { LOGGER.debug("Connecting to RSocket Proxy..."); CountDownLatch latch = new CountDownLatch(1); @@ -311,6 +396,7 @@ public PrometheusRSocketClient connectBlockingly(Duration timeout) { registryAndScrape, clientTransport, retry, + timeout, () -> { LOGGER.info("Connected to RSocket Proxy!"); onKeyReceived.run(); @@ -322,8 +408,7 @@ public PrometheusRSocketClient connectBlockingly(Duration timeout) { if (!latch.await(timeout.toMillis(), MILLISECONDS)) { LOGGER.warn("Creating the connection and receiving the key timed out!"); } - } - catch (InterruptedException exception) { + } catch (InterruptedException exception) { LOGGER.warn("Waiting for receiving the key was interrupted!", exception); } diff --git a/client/src/test/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClientTests.java b/client/src/test/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClientTests.java index 7613562..be71797 100644 --- a/client/src/test/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClientTests.java +++ b/client/src/test/java/io/micrometer/prometheus/rsocket/PrometheusRSocketClientTests.java @@ -30,6 +30,7 @@ import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; +import java.lang.reflect.Field; import java.security.KeyPairGenerator; import java.security.NoSuchAlgorithmException; import java.time.Duration; @@ -165,7 +166,7 @@ public Mono fireAndForget(Payload payload) { } @Test - void blockingConnectAndPush() throws NoSuchAlgorithmException, InterruptedException, ExecutionException { + void blockingConnectAndPush() throws NoSuchAlgorithmException, InterruptedException, ExecutionException, NoSuchFieldException, IllegalAccessException { CountDownLatch pushLatch = new CountDownLatch(1); AtomicBoolean pushed = new AtomicBoolean(false); Payload payload = DefaultPayload.create(KeyPairGenerator.getInstance("RSA").generateKeyPair().getPublic().getEncoded()); @@ -200,6 +201,7 @@ public Mono requestResponse(Payload payload) { serverTransport.clientTransport() ) .retry(Retry.max(0)) + .timeout(Duration.ofSeconds(10)) .doOnKeyReceived(() -> { await(keyReceivedLatch); keyReceived.set(true); @@ -211,6 +213,11 @@ public Mono requestResponse(Payload payload) { PrometheusRSocketClient client = clientFuture.get(); assertThat(keyReceived).as("Public key should be received(connected)").isTrue(); + Field timeoutField = PrometheusRSocketClient.class.getDeclaredField("timeout"); + timeoutField.setAccessible(true); + Duration timeout = (Duration)timeoutField.get(client); + assertThat(timeout.toSeconds()).isEqualTo(10L); + CompletableFuture closeFuture = runAsync(client::pushAndCloseBlockingly, newSingleThreadExecutor()); runAsync(pushLatch::countDown, newDelayedExecutor()); assertThat(pushed).as("Data should not be pushed").isFalse(); diff --git a/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientAutoConfiguration.java b/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientAutoConfiguration.java index 6bdbe3b..faaca93 100644 --- a/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientAutoConfiguration.java +++ b/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientAutoConfiguration.java @@ -41,6 +41,7 @@ PrometheusRSocketClient prometheusRSocketClient(PrometheusMeterRegistry meterReg return PrometheusRSocketClient.build(meterRegistry, properties.createClientTransport()) .retry(Retry.backoff(properties.getMaxRetries(), properties.getFirstBackoff()) .maxBackoff(properties.getMaxBackoff())) + .timeout(properties.getTimeout()) .connectBlockingly(); } } diff --git a/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientProperties.java b/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientProperties.java index 3fde026..4729af4 100644 --- a/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientProperties.java +++ b/starter-spring/src/main/java/io/micrometer/prometheus/rsocket/autoconfigure/PrometheusRSocketClientProperties.java @@ -63,6 +63,11 @@ public class PrometheusRSocketClientProperties { */ private boolean secure = false; + /** + * The timeout in seconds to be used for establishing the connection and pushing the data + */ + private Duration timeout = Duration.ofSeconds(5); + public long getMaxRetries() { return maxRetries; } @@ -119,6 +124,14 @@ public boolean isSecure() { return secure; } + public Duration getTimeout() { + return timeout; + } + + public void setTimeout(Duration timeout) { + this.timeout = timeout; + } + ClientTransport createClientTransport() { final TcpClient tcpClient = TcpClient.create().host(this.host).port(this.port); return this.transport.create(this.secure ? tcpClient.secure() : tcpClient);