Skip to content

feat: timeout configuration for client #87

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,39 @@ public class PrometheusRSocketClient {
private volatile boolean requestedDisconnect = false;
private RSocket sendingSocket;

private Duration timeout;

/**
* Creates a {@link PrometheusRSocketClient}.
*
* @param registryAndScrape the registry and scrape meter
* @param transport the client transport
* @param retry the retry configuration
* @param onKeyReceived the callback if a key has been received
*/
private PrometheusRSocketClient(MeterRegistryAndScrape<?> registryAndScrape,
ClientTransport transport,
Retry retry,
Runnable onKeyReceived) {
this(registryAndScrape, transport, retry, Duration.ofSeconds(5), onKeyReceived);
}

/**
* Creates a {@link PrometheusRSocketClient}.
*
* @param registryAndScrape the registry and scrape meter
* @param transport the client transport
* @param retry the retry configuration
* @param timeout the timeout to connect and push the data
* @param onKeyReceived the callback if a key has been received
*/
private PrometheusRSocketClient(MeterRegistryAndScrape<?> registryAndScrape,
ClientTransport transport,
Retry retry,
Duration timeout,
Runnable onKeyReceived) {
this.registryAndScrape = registryAndScrape;
this.timeout = timeout;

RSocketConnector.create()
.reconnect(new Retry() {
Expand Down Expand Up @@ -116,8 +144,8 @@ public Mono<Void> fireAndForget(Payload payload) {
.increment())
.doOnNext(connection -> this.connection = connection)
.flatMap(socket -> socket.onClose()
.map(v -> 1) // https://github.com/rsocket/rsocket-java/issues/819
.onErrorReturn(1))
.map(v -> 1) // https://github.com/rsocket/rsocket-java/issues/819
.onErrorReturn(1))
.repeat(() -> !requestedDisconnect)
.subscribe();
}
Expand Down Expand Up @@ -149,6 +177,9 @@ public static <M extends MeterRegistry> Builder build(M meterRegistry, Supplier<
return new Builder(meterRegistry, scrape, clientTransport);
}

/**
* Closes the {@link PrometheusRSocketClient}
*/
public void close() {
this.requestedDisconnect = true;
if (this.connection != null) {
Expand All @@ -160,11 +191,12 @@ public void close() {
* Pushes the data in a blocking way and closes the connection.
*/
public void pushAndCloseBlockingly() {
pushAndCloseBlockingly(Duration.ofSeconds(5));
pushAndCloseBlockingly(timeout);
}

/**
* Pushes the data in a blocking way and closes the connection.
*
* @param timeout the amount of time to wait for the data to be sent
*/
public void pushAndCloseBlockingly(Duration timeout) {
Expand All @@ -183,8 +215,7 @@ public void pushAndCloseBlockingly(Duration timeout) {
.doOnCancel(() -> LOGGER.warn("Pushing data to RSocket Proxy before closing the connection was cancelled!"))
.doFinally(signalType -> latch.countDown())
.subscribe();
}
catch (Exception exception) {
} catch (Exception exception) {
latch.countDown();
LOGGER.warn("Sending the payload failed!", exception);
}
Expand All @@ -193,8 +224,7 @@ public void pushAndCloseBlockingly(Duration timeout) {
if (!latch.await(timeout.toMillis(), MILLISECONDS)) {
LOGGER.warn("Sending the payload timed out!");
}
}
catch (InterruptedException exception) {
} catch (InterruptedException exception) {
LOGGER.warn("Waiting for sending the payload was interrupted!", exception);
}
}
Expand All @@ -215,8 +245,7 @@ public void pushAndClose() {
.doOnError(throwable -> LOGGER.warn("Pushing data to RSocket Proxy before closing the connection failed!", throwable))
.doOnCancel(() -> LOGGER.warn("Pushing data to RSocket Proxy before closing the connection was cancelled!"))
.subscribe();
}
catch (Exception exception) {
} catch (Exception exception) {
LOGGER.warn("Sending the payload failed!", exception);
}
}
Expand Down Expand Up @@ -263,54 +292,111 @@ private PublicKey decodePublicKey(ByteBuffer encodedKeyBuffer) {
}
}

/**
* Builder class to create a {@link PrometheusRSocketClient}.
*/
public static class Builder {
private MeterRegistryAndScrape<?> registryAndScrape;
private final ClientTransport clientTransport;

private Retry retry = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10))
.maxBackoff(Duration.ofMinutes(10));

private Runnable onKeyReceived = () -> { };
private Duration timeout = Duration.ofSeconds(5);

private Runnable onKeyReceived = () -> {
};

<M extends MeterRegistry> Builder(M registry, Supplier<String> scrape, ClientTransport clientTransport) {
this.registryAndScrape = new MeterRegistryAndScrape<>(registry, scrape);
this.clientTransport = clientTransport;
}

/**
* Configures the retry for {@link PrometheusRSocketClient}.
*
* @param retry the retry configuration
* @return the {@link Builder}
*/
public Builder retry(Retry retry) {
this.retry = retry;
return this;
}

/**
* Timeout for the {@link PrometheusRSocketClient}.
*
* @param timeout the timeout in seconds
* @return the {@link Builder}
*/
public Builder timeout(Duration timeout) {
this.timeout = timeout;
return this;
}

/**
* Callback of {@link PrometheusRSocketClient} if a key is received.
*
* @param onKeyReceived callback which is executed if a key is received
* @return the {@link Builder}
*/
public Builder doOnKeyReceived(Runnable onKeyReceived) {
this.onKeyReceived = onKeyReceived;
return this;
}

/**
* Connects the {@link PrometheusRSocketClient}.
*
* @return the {@link PrometheusRSocketClient}
*/
public PrometheusRSocketClient connect() {
return connect(timeout);
}

/**
* Connects the {@link PrometheusRSocketClient}.
*
* @param timeout the timeout for the client to connect
*
* @return the {@link PrometheusRSocketClient}
*/
public PrometheusRSocketClient connect(Duration timeout) {
LOGGER.debug("Connecting to RSocket Proxy...");
return new PrometheusRSocketClient(
registryAndScrape,
clientTransport,
retry,
timeout,
() -> {
LOGGER.info("Connected to RSocket Proxy!");
onKeyReceived.run();
}
);
}

/**
* Connects the {@link PrometheusRSocketClient} blockingly.
*
* @return the {@link PrometheusRSocketClient}
*/
public PrometheusRSocketClient connectBlockingly() {
return connectBlockingly(Duration.ofSeconds(5));
return connectBlockingly(timeout);
}

/**
* Connects the {@link PrometheusRSocketClient} blockingly with the given timeout.
*
* @return the {@link PrometheusRSocketClient}
*/
public PrometheusRSocketClient connectBlockingly(Duration timeout) {
LOGGER.debug("Connecting to RSocket Proxy...");
CountDownLatch latch = new CountDownLatch(1);
PrometheusRSocketClient client = new PrometheusRSocketClient(
registryAndScrape,
clientTransport,
retry,
timeout,
() -> {
LOGGER.info("Connected to RSocket Proxy!");
onKeyReceived.run();
Expand All @@ -322,8 +408,7 @@ public PrometheusRSocketClient connectBlockingly(Duration timeout) {
if (!latch.await(timeout.toMillis(), MILLISECONDS)) {
LOGGER.warn("Creating the connection and receiving the key timed out!");
}
}
catch (InterruptedException exception) {
} catch (InterruptedException exception) {
LOGGER.warn("Waiting for receiving the key was interrupted!", exception);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

import java.lang.reflect.Field;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
Expand Down Expand Up @@ -165,7 +166,7 @@ public Mono<Void> fireAndForget(Payload payload) {
}

@Test
void blockingConnectAndPush() throws NoSuchAlgorithmException, InterruptedException, ExecutionException {
void blockingConnectAndPush() throws NoSuchAlgorithmException, InterruptedException, ExecutionException, NoSuchFieldException, IllegalAccessException {
CountDownLatch pushLatch = new CountDownLatch(1);
AtomicBoolean pushed = new AtomicBoolean(false);
Payload payload = DefaultPayload.create(KeyPairGenerator.getInstance("RSA").generateKeyPair().getPublic().getEncoded());
Expand Down Expand Up @@ -200,6 +201,7 @@ public Mono<Payload> requestResponse(Payload payload) {
serverTransport.clientTransport()
)
.retry(Retry.max(0))
.timeout(Duration.ofSeconds(10))
.doOnKeyReceived(() -> {
await(keyReceivedLatch);
keyReceived.set(true);
Expand All @@ -211,6 +213,11 @@ public Mono<Payload> requestResponse(Payload payload) {
PrometheusRSocketClient client = clientFuture.get();
assertThat(keyReceived).as("Public key should be received(connected)").isTrue();

Field timeoutField = PrometheusRSocketClient.class.getDeclaredField("timeout");
timeoutField.setAccessible(true);
Duration timeout = (Duration)timeoutField.get(client);
assertThat(timeout.toSeconds()).isEqualTo(10L);

CompletableFuture<Void> closeFuture = runAsync(client::pushAndCloseBlockingly, newSingleThreadExecutor());
runAsync(pushLatch::countDown, newDelayedExecutor());
assertThat(pushed).as("Data should not be pushed").isFalse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ PrometheusRSocketClient prometheusRSocketClient(PrometheusMeterRegistry meterReg
return PrometheusRSocketClient.build(meterRegistry, properties.createClientTransport())
.retry(Retry.backoff(properties.getMaxRetries(), properties.getFirstBackoff())
.maxBackoff(properties.getMaxBackoff()))
.timeout(properties.getTimeout())
.connectBlockingly();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public class PrometheusRSocketClientProperties {
*/
private boolean secure = false;

/**
* The timeout in seconds to be used for establishing the connection and pushing the data
*/
private Duration timeout = Duration.ofSeconds(5);

public long getMaxRetries() {
return maxRetries;
}
Expand Down Expand Up @@ -119,6 +124,14 @@ public boolean isSecure() {
return secure;
}

public Duration getTimeout() {
return timeout;
}

public void setTimeout(Duration timeout) {
this.timeout = timeout;
}

ClientTransport createClientTransport() {
final TcpClient tcpClient = TcpClient.create().host(this.host).port(this.port);
return this.transport.create(this.secure ? tcpClient.secure() : tcpClient);
Expand Down