@@ -65,11 +65,36 @@ public class PrometheusRSocketClient {
65
65
private volatile boolean requestedDisconnect = false ;
66
66
private RSocket sendingSocket ;
67
67
68
- private static Duration timeout = Duration .ofSeconds (5 );
68
+ private Duration timeout = Duration .ofSeconds (5 );
69
69
70
+ /**
71
+ * Creates a {@link PrometheusRSocketClient}.
72
+ *
73
+ * @param registryAndScrape the registry and scrape meter
74
+ * @param transport the client transport
75
+ * @param retry the retry configuration
76
+ * @param onKeyReceived the callback if a key has been received
77
+ */
78
+ private PrometheusRSocketClient (MeterRegistryAndScrape <?> registryAndScrape ,
79
+ ClientTransport transport ,
80
+ Retry retry ,
81
+ Runnable onKeyReceived ) {
82
+ this (registryAndScrape , transport , retry , Duration .ofSeconds (5 ), onKeyReceived );
83
+ }
84
+
85
+ /**
86
+ * Creates a {@link PrometheusRSocketClient}.
87
+ *
88
+ * @param registryAndScrape the registry and scrape meter
89
+ * @param transport the client transport
90
+ * @param retry the retry configuration
91
+ * @param timeout the timeout to connect and push the data
92
+ * @param onKeyReceived the callback if a key has been received
93
+ */
70
94
private PrometheusRSocketClient (MeterRegistryAndScrape <?> registryAndScrape ,
71
95
ClientTransport transport ,
72
96
Retry retry ,
97
+ Duration timeout ,
73
98
Runnable onKeyReceived ) {
74
99
this .registryAndScrape = registryAndScrape ;
75
100
@@ -118,8 +143,8 @@ public Mono<Void> fireAndForget(Payload payload) {
118
143
.increment ())
119
144
.doOnNext (connection -> this .connection = connection )
120
145
.flatMap (socket -> socket .onClose ()
121
- .map (v -> 1 ) // https://github.com/rsocket/rsocket-java/issues/819
122
- .onErrorReturn (1 ))
146
+ .map (v -> 1 ) // https://github.com/rsocket/rsocket-java/issues/819
147
+ .onErrorReturn (1 ))
123
148
.repeat (() -> !requestedDisconnect )
124
149
.subscribe ();
125
150
}
@@ -170,6 +195,7 @@ public void pushAndCloseBlockingly() {
170
195
171
196
/**
172
197
* Pushes the data in a blocking way and closes the connection.
198
+ *
173
199
* @param timeout the amount of time to wait for the data to be sent
174
200
*/
175
201
public void pushAndCloseBlockingly (Duration timeout ) {
@@ -188,8 +214,7 @@ public void pushAndCloseBlockingly(Duration timeout) {
188
214
.doOnCancel (() -> LOGGER .warn ("Pushing data to RSocket Proxy before closing the connection was cancelled!" ))
189
215
.doFinally (signalType -> latch .countDown ())
190
216
.subscribe ();
191
- }
192
- catch (Exception exception ) {
217
+ } catch (Exception exception ) {
193
218
latch .countDown ();
194
219
LOGGER .warn ("Sending the payload failed!" , exception );
195
220
}
@@ -198,8 +223,7 @@ public void pushAndCloseBlockingly(Duration timeout) {
198
223
if (!latch .await (timeout .toMillis (), MILLISECONDS )) {
199
224
LOGGER .warn ("Sending the payload timed out!" );
200
225
}
201
- }
202
- catch (InterruptedException exception ) {
226
+ } catch (InterruptedException exception ) {
203
227
LOGGER .warn ("Waiting for sending the payload was interrupted!" , exception );
204
228
}
205
229
}
@@ -220,8 +244,7 @@ public void pushAndClose() {
220
244
.doOnError (throwable -> LOGGER .warn ("Pushing data to RSocket Proxy before closing the connection failed!" , throwable ))
221
245
.doOnCancel (() -> LOGGER .warn ("Pushing data to RSocket Proxy before closing the connection was cancelled!" ))
222
246
.subscribe ();
223
- }
224
- catch (Exception exception ) {
247
+ } catch (Exception exception ) {
225
248
LOGGER .warn ("Sending the payload failed!" , exception );
226
249
}
227
250
}
@@ -278,8 +301,10 @@ public static class Builder {
278
301
private Retry retry = Retry .backoff (Long .MAX_VALUE , Duration .ofSeconds (10 ))
279
302
.maxBackoff (Duration .ofMinutes (10 ));
280
303
304
+ private Duration timeout ;
281
305
282
- private Runnable onKeyReceived = () -> { };
306
+ private Runnable onKeyReceived = () -> {
307
+ };
283
308
284
309
<M extends MeterRegistry > Builder (M registry , Supplier <String > scrape , ClientTransport clientTransport ) {
285
310
this .registryAndScrape = new MeterRegistryAndScrape <>(registry , scrape );
@@ -290,7 +315,6 @@ <M extends MeterRegistry> Builder(M registry, Supplier<String> scrape, ClientTra
290
315
* Configures the retry for {@link PrometheusRSocketClient}.
291
316
*
292
317
* @param retry the retry configuration
293
- *
294
318
* @return the {@link Builder}
295
319
*/
296
320
public Builder retry (Retry retry ) {
@@ -305,7 +329,7 @@ public Builder retry(Retry retry) {
305
329
* @return the {@link Builder}
306
330
*/
307
331
public Builder timeout (Long timeout ) {
308
- PrometheusRSocketClient .timeout = Duration .ofSeconds (timeout );
332
+ this .timeout = Duration .ofSeconds (timeout );
309
333
return this ;
310
334
}
311
335
@@ -331,6 +355,7 @@ public PrometheusRSocketClient connect() {
331
355
registryAndScrape ,
332
356
clientTransport ,
333
357
retry ,
358
+ timeout ,
334
359
() -> {
335
360
LOGGER .info ("Connected to RSocket Proxy!" );
336
361
onKeyReceived .run ();
@@ -370,8 +395,7 @@ public PrometheusRSocketClient connectBlockingly(Duration timeout) {
370
395
if (!latch .await (timeout .toMillis (), MILLISECONDS )) {
371
396
LOGGER .warn ("Creating the connection and receiving the key timed out!" );
372
397
}
373
- }
374
- catch (InterruptedException exception ) {
398
+ } catch (InterruptedException exception ) {
375
399
LOGGER .warn ("Waiting for receiving the key was interrupted!" , exception );
376
400
}
377
401
0 commit comments