Skip to content
This repository was archived by the owner on Jun 8, 2022. It is now read-only.

Commit 975bce4

Browse files
authored
Merge pull request #2 from hansieodendaal/ho_add_buffer_stats_and_logs
Buffer dropped messages stats
2 parents 8732352 + 17f8a4c commit 975bce4

File tree

7 files changed

+183
-30
lines changed

7 files changed

+183
-30
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "tari_broadcast_channel"
3-
version = "0.1.0"
3+
version = "0.2.0"
44
authors = ["Filip Dulic <[email protected]>", "Vladan Popovic <[email protected]>", "Bojan Petrovic <[email protected]>", "The Tari Development Community"]
55
description = "Bounded non-blocking single-producer-multi-consumer broadcast channel"
66
license = "Apache-2.0/MIT"
@@ -14,6 +14,7 @@ edition = "2018"
1414
arc-swap = "0.4.2"
1515
futures = { version = "^0.3.1" }
1616
crossbeam-channel = "0.3.9"
17+
log = "0.4.6"
1718

1819
[dev-dependencies]
1920
criterion = "0.3.0"

examples/async-simple.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@ use futures::{executor::block_on, stream, StreamExt};
22
use tari_broadcast_channel::bounded;
33

44
fn main() {
5-
let (publisher, subscriber1) = bounded(10);
5+
let (publisher, subscriber1) = bounded(10, 1);
66
let subscriber2 = subscriber1.clone();
7+
let subscriber3 = subscriber1.clone();
8+
9+
assert_eq!(subscriber1.get_receiver_id(), 10000);
10+
assert_eq!(subscriber2.get_receiver_id(), 10001);
11+
assert_eq!(subscriber3.get_receiver_id(), 10002);
712

813
block_on(async move {
914
stream::iter(1..15).map(Ok).forward(publisher).await.unwrap();

examples/raw-simple.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,23 @@
11
use tari_broadcast_channel::raw_bounded;
22

33
fn main() {
4-
let (tx, rx) = raw_bounded(10);
4+
let (tx, rx) = raw_bounded(10, 1);
55
(1..15).for_each(|x| tx.broadcast(x).unwrap());
66

7+
let rx2 = rx.clone();
8+
79
let received: Vec<i32> = rx.map(|x| *x).collect();
810
// Test that only the last 10 elements are in the received list.
911
let expected: Vec<i32> = (5..15).collect();
1012

1113
assert_eq!(expected, received);
14+
15+
// Test messages discarded
16+
assert_eq!(rx2.get_dropped_messages_state(), false);
17+
rx2.try_recv().unwrap();
18+
assert_eq!(rx2.get_dropped_messages_count(), 4);
19+
assert_eq!(rx2.get_dropped_messages_state(), true);
20+
rx2.try_recv().unwrap();
21+
assert_eq!(rx2.get_dropped_messages_count(), 4);
22+
assert_eq!(rx2.get_dropped_messages_state(), false);
1223
}

src/async_channel.rs

Lines changed: 80 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,22 @@ use futures::{
55
Sink,
66
Stream,
77
};
8+
use log::*;
89
use std::{pin::Pin, sync::Arc, task::Poll};
910

10-
pub fn bounded<T>(size: usize) -> (Publisher<T>, Subscriber<T>) {
11-
let (sender, receiver) = raw_bounded(size);
11+
const LOG_TARGET: &str = "tari_broadcast_channel::async_channel";
12+
const ID_MULTIPLIER: usize = 10_000;
13+
14+
pub fn bounded<T>(size: usize, receiver_id: usize) -> (Publisher<T>, Subscriber<T>) {
15+
let (sender, receiver) = raw_bounded(size, receiver_id);
1216
let (waker, sleeper) = alarm();
17+
debug!(
18+
target: LOG_TARGET,
19+
"Buffer created with ID '{}' ({}) and size {}, consecutive subscriptions add 1 to the ID.",
20+
receiver_id * ID_MULTIPLIER,
21+
receiver_id,
22+
size
23+
);
1324
(Publisher { sender, waker }, Subscriber { receiver, sleeper })
1425
}
1526

@@ -62,17 +73,40 @@ impl<T> Stream for Subscriber<T> {
6273
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
6374
self.sleeper.register(cx.waker());
6475
match self.receiver.try_recv() {
65-
Ok(item) => Poll::Ready(Some(item)),
66-
Err(error) => match error {
67-
TryRecvError::Empty => Poll::Pending,
68-
TryRecvError::Disconnected => Poll::Ready(None),
76+
Ok(item) => {
77+
if self.receiver.get_dropped_messages_state() {
78+
debug!(
79+
target: LOG_TARGET,
80+
"Subscriber with ID '{}' has {} buffer messages dropped.",
81+
self.receiver.get_id(),
82+
self.receiver.get_dropped_messages_count()
83+
);
84+
}
85+
Poll::Ready(Some(item))
86+
},
87+
Err(error) => {
88+
trace!(
89+
target: LOG_TARGET,
90+
"Subscriber with ID '{}', receive error: '{}'",
91+
self.receiver.get_id(),
92+
error
93+
);
94+
match error {
95+
TryRecvError::Empty => Poll::Pending,
96+
TryRecvError::Disconnected => Poll::Ready(None),
97+
}
6998
},
7099
}
71100
}
72101
}
73102

74103
impl<T> Clone for Subscriber<T> {
75104
fn clone(&self) -> Self {
105+
trace!(
106+
target: LOG_TARGET,
107+
"Subscriber with ID '{}' was cloned, new subscription.",
108+
self.receiver.get_id()
109+
);
76110
Self {
77111
receiver: self.receiver.clone(),
78112
sleeper: self.sleeper.clone(),
@@ -88,6 +122,22 @@ impl<T: Send> PartialEq for Subscriber<T> {
88122

89123
impl<T: Send> Eq for Subscriber<T> {}
90124

125+
impl<T> Subscriber<T> {
126+
/// Returns the receiver id
127+
pub fn get_receiver_id(&self) -> usize {
128+
self.receiver.get_id()
129+
}
130+
131+
/// Returns the amount of dropped messages for the receiver
132+
pub fn get_dropped_messages_count(&self) -> usize {
133+
self.receiver.get_dropped_messages_count()
134+
}
135+
136+
/// Returns the receiver's dropped messgages state for the last message read
137+
pub fn get_dropped_messages_state(&self) -> bool {
138+
self.receiver.get_dropped_messages_state()
139+
}
140+
}
91141
// Helper struct used by sync and async implementations to wake Tasks / Threads
92142
#[derive(Debug)]
93143
pub struct Waker {
@@ -154,15 +204,30 @@ pub fn alarm() -> (Waker, Sleeper) {
154204

155205
#[cfg(test)]
156206
mod test {
207+
use crate::async_channel;
157208
use futures::{executor::block_on, stream, StreamExt};
158209

159210
#[test]
160211
fn channel() {
161-
let (publisher, subscriber1) = super::bounded(10);
212+
let (publisher1, subscriber1) = super::bounded(10, 1);
162213
let subscriber2 = subscriber1.clone();
214+
let subscriber3 = subscriber1.clone();
215+
216+
assert_eq!(subscriber1.get_receiver_id(), 10000);
217+
assert_eq!(subscriber2.get_receiver_id(), 10001);
218+
assert_eq!(subscriber3.get_receiver_id(), 10002);
219+
220+
let (publisher2, subscriber4): (async_channel::Publisher<usize>, async_channel::Subscriber<usize>) =
221+
super::bounded(10, 2);
222+
let subscriber5 = subscriber4.clone();
223+
let subscriber6 = subscriber4.clone();
224+
225+
assert_eq!(subscriber4.get_receiver_id(), 20000);
226+
assert_eq!(subscriber5.get_receiver_id(), 20001);
227+
assert_eq!(subscriber6.get_receiver_id(), 20002);
163228

164229
block_on(async move {
165-
stream::iter(1..15).map(|i| Ok(i)).forward(publisher).await.unwrap();
230+
stream::iter(1..15).map(|i| Ok(i)).forward(publisher1).await.unwrap();
166231
});
167232

168233
let received1: Vec<u32> = block_on(async { subscriber1.map(|x| *x).collect().await });
@@ -171,5 +236,12 @@ mod test {
171236
let expected = (5..15).collect::<Vec<u32>>();
172237
assert_eq!(received1, expected);
173238
assert_eq!(received2, expected);
239+
// Test messages discarded
240+
subscriber3.receiver.try_recv().unwrap();
241+
assert_eq!(subscriber3.receiver.get_dropped_messages_state(), true);
242+
assert_eq!(subscriber3.receiver.get_dropped_messages_count(), 4);
243+
subscriber3.receiver.try_recv().unwrap();
244+
assert_eq!(subscriber3.receiver.get_dropped_messages_state(), false);
245+
assert_eq!(subscriber3.receiver.get_dropped_messages_count(), 4);
174246
}
175247
}

src/atomic_counter.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,9 @@ impl PartialEq for AtomicCounter {
4848
}
4949

5050
impl Eq for AtomicCounter {}
51+
52+
impl fmt::Display for AtomicCounter {
53+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
54+
write!(f, "{}", self.get())
55+
}
56+
}

0 commit comments

Comments
 (0)