@@ -10,18 +10,20 @@ mod tests {
10
10
use chrono:: Utc ;
11
11
use futures:: StreamExt ;
12
12
use graphql_client:: * ;
13
- use std:: { sync:: Once , time:: Duration } ;
13
+ use std:: {
14
+ sync:: Once ,
15
+ time:: { Duration , Instant } ,
16
+ } ;
14
17
use tokio:: { select, sync:: oneshot} ;
15
18
use vector:: {
16
19
self ,
17
20
api:: { self , client:: subscription:: SubscriptionClient } ,
18
21
config:: Config ,
19
- heartbeat,
20
- internal_events:: { emit, GeneratorEventProcessed } ,
22
+ internal_events:: { emit, GeneratorEventProcessed , Heartbeat } ,
21
23
test_util:: { next_addr, retry_until} ,
22
24
} ;
23
25
24
- static METRIC_INIT : Once = Once :: new ( ) ;
26
+ static METRICS_INIT : Once = Once :: new ( ) ;
25
27
26
28
#[ derive( GraphQLQuery ) ]
27
29
#[ graphql(
@@ -58,10 +60,27 @@ mod tests {
58
60
struct EventsProcessedMetricsSubscription ;
59
61
60
62
// Initialize the metrics system. Idempotent.
61
- fn init_metrics ( ) {
62
- METRIC_INIT . call_once ( || {
63
+ fn init_metrics ( ) -> oneshot :: Sender < ( ) > {
64
+ METRICS_INIT . call_once ( || {
63
65
let _ = vector:: metrics:: init ( ) ;
64
- } )
66
+ } ) ;
67
+
68
+ let ( shutdown_tx, mut shutdown_rx) = oneshot:: channel :: < ( ) > ( ) ;
69
+ tokio:: spawn ( async move {
70
+ let since = Instant :: now ( ) ;
71
+ let mut timer = tokio:: time:: interval ( Duration :: from_secs ( 1 ) ) ;
72
+
73
+ loop {
74
+ select ! {
75
+ _ = & mut shutdown_rx => break ,
76
+ _ = timer. tick( ) => {
77
+ emit( Heartbeat { since } ) ;
78
+ }
79
+ }
80
+ }
81
+ } ) ;
82
+
83
+ shutdown_tx
65
84
}
66
85
67
86
// Provides a config that enables the API server, assigned to a random port. Implicitly
@@ -186,27 +205,20 @@ mod tests {
186
205
assert_matches ! ( heartbeats. next( ) . await , None ) ;
187
206
}
188
207
189
- async fn new_uptime_subscription (
190
- client : & SubscriptionClient ,
191
- num_results : usize ,
192
- interval : i64 ,
193
- ) {
208
+ async fn new_uptime_subscription ( client : & SubscriptionClient ) {
194
209
let request_body =
195
- UptimeMetricsSubscription :: build_query ( uptime_metrics_subscription:: Variables {
196
- interval,
197
- } ) ;
210
+ UptimeMetricsSubscription :: build_query ( uptime_metrics_subscription:: Variables ) ;
198
211
199
212
let subscription = client
200
213
. start :: < UptimeMetricsSubscription > ( & request_body)
201
214
. await
202
215
. unwrap ( ) ;
203
216
204
217
tokio:: pin! {
205
- let uptime = subscription. stream( ) . skip( num_results ) ;
218
+ let uptime = subscription. stream( ) . skip( 1 ) ;
206
219
}
207
220
208
- // Uptime should be at least the number of seconds as the results - 1, to account
209
- // for the initial uptime
221
+ // Uptime should be above zero
210
222
assert ! (
211
223
uptime
212
224
. take( 1 )
@@ -218,7 +230,7 @@ mod tests {
218
230
. unwrap( )
219
231
. uptime_metrics
220
232
. seconds
221
- > num_results as f64 - 1.0
233
+ > 0.00
222
234
)
223
235
}
224
236
@@ -331,10 +343,9 @@ mod tests {
331
343
let bind = config. api . bind . unwrap ( ) ;
332
344
let client = new_subscription_client ( bind) . await ;
333
345
334
- init_metrics ( ) ;
335
- tokio:: spawn ( heartbeat:: heartbeat ( ) ) ;
346
+ let _metrics = init_metrics ( ) ;
336
347
337
- new_uptime_subscription ( & client, 3 , 1200 ) . await ;
348
+ new_uptime_subscription ( & client) . await ;
338
349
}
339
350
340
351
#[ tokio:: test]
@@ -345,7 +356,7 @@ mod tests {
345
356
let bind = config. api . bind . unwrap ( ) ;
346
357
let client = new_subscription_client ( bind) . await ;
347
358
348
- init_metrics ( ) ;
359
+ let _metrics = init_metrics ( ) ;
349
360
350
361
new_events_processed_subscription ( & client, 3 , 100 ) . await ;
351
362
}
@@ -358,11 +369,10 @@ mod tests {
358
369
let bind = config. api . bind . unwrap ( ) ;
359
370
let client = new_subscription_client ( bind) . await ;
360
371
361
- init_metrics ( ) ;
362
- tokio:: spawn ( heartbeat:: heartbeat ( ) ) ;
372
+ let _metrics = init_metrics ( ) ;
363
373
364
374
futures:: join! {
365
- new_uptime_subscription( & client, 3 , 1200 ) ,
375
+ new_uptime_subscription( & client) ,
366
376
new_heartbeat_subscription( & client, 3 , 500 ) ,
367
377
} ;
368
378
}
0 commit comments