@@ -51,7 +51,7 @@ use crate::Msg;
51
51
use crossbeam_channel:: { bounded, SendTimeoutError , Sender } ;
52
52
use std:: io;
53
53
use std:: io:: Write ;
54
- use std:: sync:: atomic:: AtomicU64 ;
54
+ use std:: sync:: atomic:: AtomicUsize ;
55
55
use std:: sync:: atomic:: Ordering ;
56
56
use std:: sync:: Arc ;
57
57
use std:: thread:: JoinHandle ;
@@ -124,11 +124,20 @@ pub struct WorkerGuard {
124
124
/// [fmt]: mod@tracing_subscriber::fmt
125
125
#[ derive( Clone , Debug ) ]
126
126
pub struct NonBlocking {
127
- error_counter : Arc < AtomicU64 > ,
127
+ error_counter : ErrorCounter ,
128
128
channel : Sender < Msg > ,
129
129
is_lossy : bool ,
130
130
}
131
131
132
+ /// Tracks the number of times a log line was dropped by the background thread.
133
+ ///
134
+ /// If the non-blocking writer is not configured in [lossy mode], the error
135
+ /// count should always be 0.
136
+ ///
137
+ /// [lossy mode]: NonBlockingBuilder::lossy
138
+ #[ derive( Clone , Debug ) ]
139
+ pub struct ErrorCounter ( Arc < AtomicUsize > ) ;
140
+
132
141
impl NonBlocking {
133
142
/// Returns a new `NonBlocking` writer wrapping the provided `writer`.
134
143
///
@@ -157,7 +166,7 @@ impl NonBlocking {
157
166
(
158
167
Self {
159
168
channel : sender,
160
- error_counter : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
169
+ error_counter : ErrorCounter ( Arc :: new ( AtomicUsize :: new ( 0 ) ) ) ,
161
170
is_lossy,
162
171
} ,
163
172
worker_guard,
@@ -166,7 +175,7 @@ impl NonBlocking {
166
175
167
176
/// Returns a counter for the number of times logs where dropped. This will always return zero if
168
177
/// `NonBlocking` is not lossy.
169
- pub fn error_counter ( & self ) -> Arc < AtomicU64 > {
178
+ pub fn error_counter ( & self ) -> ErrorCounter {
170
179
self . error_counter . clone ( )
171
180
}
172
181
}
@@ -218,7 +227,7 @@ impl std::io::Write for NonBlocking {
218
227
let buf_size = buf. len ( ) ;
219
228
if self . is_lossy {
220
229
if self . channel . try_send ( Msg :: Line ( buf. to_vec ( ) ) ) . is_err ( ) {
221
- self . error_counter . fetch_add ( 1 , Ordering :: Release ) ;
230
+ self . error_counter . incr_saturating ( ) ;
222
231
}
223
232
} else {
224
233
return match self . channel . send ( Msg :: Line ( buf. to_vec ( ) ) ) {
@@ -279,6 +288,43 @@ impl Drop for WorkerGuard {
279
288
}
280
289
}
281
290
291
+ // === impl ErrorCounter ===
292
+
293
+ impl ErrorCounter {
294
+ /// Returns the number of log lines that have been dropped.
295
+ ///
296
+ /// If the non-blocking writer is not configured in [lossy mode], the error
297
+ /// count should always be 0.
298
+ ///
299
+ /// [lossy mode]: NonBlockingBuilder::lossy
300
+ pub fn dropped_lines ( & self ) -> usize {
301
+ self . 0 . load ( Ordering :: Acquire )
302
+ }
303
+
304
+ fn incr_saturating ( & self ) {
305
+ let mut curr = self . 0 . load ( Ordering :: Acquire ) ;
306
+ // We don't need to enter the CAS loop if the current value is already
307
+ // `usize::MAX`.
308
+ if curr == usize:: MAX {
309
+ return ;
310
+ }
311
+
312
+ // This is implemented as a CAS loop rather than as a simple
313
+ // `fetch_add`, because we don't want to wrap on overflow. Instead, we
314
+ // need to ensure that saturating addition is performed.
315
+ loop {
316
+ let val = curr. saturating_add ( 1 ) ;
317
+ match self
318
+ . 0
319
+ . compare_exchange ( curr, val, Ordering :: AcqRel , Ordering :: Acquire )
320
+ {
321
+ Ok ( _) => return ,
322
+ Err ( actual) => curr = actual,
323
+ }
324
+ }
325
+ }
326
+ }
327
+
282
328
#[ cfg( test) ]
283
329
mod test {
284
330
use super :: * ;
@@ -321,7 +367,7 @@ mod test {
321
367
let error_count = non_blocking. error_counter ( ) ;
322
368
323
369
non_blocking. write_all ( b"Hello" ) . expect ( "Failed to write" ) ;
324
- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
370
+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
325
371
326
372
let handle = thread:: spawn ( move || {
327
373
non_blocking. write_all ( b", World" ) . expect ( "Failed to write" ) ;
@@ -330,7 +376,7 @@ mod test {
330
376
// Sleep a little to ensure previously spawned thread gets blocked on write.
331
377
thread:: sleep ( Duration :: from_millis ( 100 ) ) ;
332
378
// We should not drop logs when blocked.
333
- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
379
+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
334
380
335
381
// Read the first message to unblock sender.
336
382
let mut line = rx. recv ( ) . unwrap ( ) ;
@@ -365,17 +411,17 @@ mod test {
365
411
366
412
// First write will not block
367
413
write_non_blocking ( & mut non_blocking, b"Hello" ) ;
368
- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
414
+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
369
415
370
416
// Second write will not block as Worker will have called `recv` on channel.
371
417
// "Hello" is not yet consumed. MockWriter call to write_all will block until
372
418
// "Hello" is consumed.
373
419
write_non_blocking ( & mut non_blocking, b", World" ) ;
374
- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
420
+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
375
421
376
422
// Will sit in NonBlocking channel's buffer.
377
423
write_non_blocking ( & mut non_blocking, b"Test" ) ;
378
- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
424
+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
379
425
380
426
// Allow a line to be written. "Hello" message will be consumed.
381
427
// ", World" will be able to write to MockWriter.
@@ -385,12 +431,12 @@ mod test {
385
431
386
432
// This will block as NonBlocking channel is full.
387
433
write_non_blocking ( & mut non_blocking, b"Universe" ) ;
388
- assert_eq ! ( 1 , error_count. load ( Ordering :: Acquire ) ) ;
434
+ assert_eq ! ( 1 , error_count. dropped_lines ( ) ) ;
389
435
390
436
// Finally the second message sent will be consumed.
391
437
let line = rx. recv ( ) . unwrap ( ) ;
392
438
assert_eq ! ( line, ", World" ) ;
393
- assert_eq ! ( 1 , error_count. load ( Ordering :: Acquire ) ) ;
439
+ assert_eq ! ( 1 , error_count. dropped_lines ( ) ) ;
394
440
}
395
441
396
442
#[ test]
@@ -426,6 +472,6 @@ mod test {
426
472
}
427
473
428
474
assert_eq ! ( 10 , hello_count) ;
429
- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
475
+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
430
476
}
431
477
}
0 commit comments