Skip to content

Commit 85fb352

Browse files
committed
feat: backoff + websocket example reconnect
1 parent b78abf8 commit 85fb352

File tree

8 files changed

+163
-20
lines changed

8 files changed

+163
-20
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@
4949
- [BREAKING] Renamed `to_kbevent` to `to_keyboard_event`.
5050
- [BREAKING] `after_next_render` returns `RenderInfo`.
5151
- `web_sys` and `wasm_bindgen` available in `seed::web_sys` and `seed::wasm_bindgen`.
52-
- Added `WebSocket` + related items.
52+
- Added `WebSocket` + related items (#8).
5353
- Exposed `App::mailbox`.
54+
- Added `streams::backoff` + updated `websocket` example.
5455

5556
## v0.6.0
5657
- Implemented `UpdateEl` for `Filter` and `FilterMap`.

Cargo.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,21 @@ enclose = "1.1.8"
2727
gloo-timers = { version = "0.2.1", features = ["futures"] }
2828
gloo-file = { version = "0.1.0", features = ["futures"] }
2929
indexmap = "1.3.2"
30-
js-sys = "0.3.37"
31-
pulldown-cmark = "0.7.0"
30+
js-sys = "0.3.39"
31+
pulldown-cmark = "0.7.1"
32+
rand = { version = "0.7.3", features = ["wasm-bindgen", "small_rng"] }
3233
serde = { version = "1.0.106", features = ['derive'] }
3334
serde_json = "1.0.51"
34-
wasm-bindgen = {version = "0.2.60", features = ["serde-serialize"]}
35-
wasm-bindgen-futures = "0.4.10"
35+
wasm-bindgen = {version = "0.2.62", features = ["serde-serialize"]}
36+
wasm-bindgen-futures = "0.4.12"
3637
# @TODO: remove once we can use entities without `Debug` in `log!` and `error!` on `stable` Rust.
3738
# https://github.com/Centril/rfcs/blob/rfc/quick-debug-macro/text/0000-quick-debug-macro.md#types-which-are-not-debug
3839
dbg = "1.0.4"
3940
futures = "0.3.4"
4041
uuid = { version = "0.8.1", features = ["v4", "wasm-bindgen"] }
4142

4243
[dependencies.web-sys]
43-
version = "0.3.37"
44+
version = "0.3.39"
4445
features = [
4546
"AbortController",
4647
"AbortSignal",

examples/websocket/src/client.rs

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,50 +13,62 @@ struct Model {
1313
messages: Vec<String>,
1414
input_text: String,
1515
web_socket: WebSocket,
16+
web_socket_reconnector: Option<StreamHandle>,
1617
}
1718

1819
// ------ ------
1920
// Init
2021
// ------ ------
2122

2223
fn init(_: Url, orders: &mut impl Orders<Msg>) -> Model {
23-
let web_socket = WebSocket::builder(WS_URL, orders)
24-
.on_open(|| log!("WebSocket connection is open now"))
25-
.on_message(Msg::MessageReceived)
26-
.on_close(Msg::WebSocketClosed)
27-
.on_error(|| log!("Error"))
28-
.build_and_open()
29-
.unwrap();
30-
3124
Model {
3225
sent_messages_count: 0,
3326
messages: Vec::new(),
3427
input_text: String::new(),
35-
web_socket,
28+
web_socket: create_websocket(orders),
29+
web_socket_reconnector: None,
3630
}
3731
}
3832

33+
fn create_websocket(orders: &impl Orders<Msg>) -> WebSocket {
34+
WebSocket::builder(WS_URL, orders)
35+
.on_open(|| Msg::WebSocketOpened)
36+
.on_message(Msg::MessageReceived)
37+
.on_close(Msg::WebSocketClosed)
38+
.on_error(|| Msg::WebSocketFailed)
39+
.build_and_open()
40+
.unwrap()
41+
}
42+
3943
// ------ ------
4044
// Update
4145
// ------ ------
4246

4347
enum Msg {
48+
WebSocketOpened,
4449
MessageReceived(WebSocketMessage),
4550
CloseWebSocket,
4651
WebSocketClosed(CloseEvent),
52+
WebSocketFailed,
53+
ReconnectWebSocket(usize),
4754
InputTextChanged(String),
4855
SendMessage(shared::ClientMessage),
4956
}
5057

51-
fn update(msg: Msg, mut model: &mut Model, _: &mut impl Orders<Msg>) {
58+
fn update(msg: Msg, mut model: &mut Model, orders: &mut impl Orders<Msg>) {
5259
match msg {
60+
Msg::WebSocketOpened => {
61+
model.web_socket_reconnector = None;
62+
log!("WebSocket connection is open now");
63+
}
5364
Msg::MessageReceived(message) => {
5465
log!("Client received a message");
5566
model
5667
.messages
5768
.push(message.json::<shared::ServerMessage>().unwrap().text);
5869
}
5970
Msg::CloseWebSocket => {
71+
model.web_socket_reconnector = None;
6072
model
6173
.web_socket
6274
.close(None, Some("user clicked Close button"))
@@ -69,6 +81,25 @@ fn update(msg: Msg, mut model: &mut Model, _: &mut impl Orders<Msg>) {
6981
log!("Code:", close_event.code());
7082
log!("Reason:", close_event.reason());
7183
log!("==================");
84+
85+
// Chrome doesn't invoke `on_error` when the connection is lost.
86+
if !close_event.was_clean() && model.web_socket_reconnector.is_none() {
87+
model.web_socket_reconnector = Some(
88+
orders.stream_with_handle(streams::backoff(None, Msg::ReconnectWebSocket)),
89+
);
90+
}
91+
}
92+
Msg::WebSocketFailed => {
93+
log!("WebSocket failed");
94+
if model.web_socket_reconnector.is_none() {
95+
model.web_socket_reconnector = Some(
96+
orders.stream_with_handle(streams::backoff(None, Msg::ReconnectWebSocket)),
97+
);
98+
}
99+
}
100+
Msg::ReconnectWebSocket(retries) => {
101+
log!("Reconnect attempt:", retries);
102+
model.web_socket = create_websocket(orders);
72103
}
73104
Msg::InputTextChanged(input_text) => {
74105
model.input_text = input_text;

src/app/streams.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ use web_sys::Event;
77
mod event_stream;
88
use event_stream::EventStream;
99

10+
mod backoff_stream;
11+
use backoff_stream::BackoffStream;
12+
1013
// ------ Interval stream ------
1114

1215
/// Stream no values on predefined time interval in milliseconds.
@@ -31,6 +34,35 @@ pub fn interval<MsU>(
3134
IntervalStream::new(ms).map(move |_| handler.clone()())
3235
}
3336

37+
// ------ Backoff stream ------
38+
39+
/// Stream retries count in increasing intervals.
40+
///
41+
/// Algorithm - [Truncated exponential backoff](https://cloud.google.com/storage/docs/exponential-backoff)
42+
///
43+
/// # Arguments
44+
///
45+
/// * `max_seconds` - Typically `32` or `64` seconds. Default is `32`.
46+
/// * `handler` - Receives the number of retries (starting from 1); Has to return `Msg`, `Option<Msg>` or `()`.
47+
///
48+
/// # Example
49+
///
50+
/// ```rust,no_run
51+
///orders.stream(streams::backoff(None, |_retries| Msg::OnTick));
52+
///orders.stream_with_handle(streams::backoff(Some(15), |_| log!("Tick!")));
53+
/// ```
54+
///
55+
/// # Panics
56+
///
57+
/// Panics when the handler doesn't return `Msg`, `Option<Msg>` or `()`.
58+
/// (It will be changed to a compile-time error).
59+
pub fn backoff<MsU>(
60+
max_seconds: Option<u32>,
61+
handler: impl FnOnce(usize) -> MsU + Clone + 'static,
62+
) -> impl Stream<Item = MsU> {
63+
BackoffStream::new(max_seconds.unwrap_or(32)).map(move |retries| handler.clone()(retries))
64+
}
65+
3466
// ------ Window Event stream ------
3567

3668
/// Stream `Window` `web_sys::Event`s.

src/app/streams/backoff_stream.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use futures::channel::mpsc;
2+
use futures::stream::Stream;
3+
use gloo_timers::callback::Timeout;
4+
use rand::{rngs::SmallRng, Rng, SeedableRng};
5+
use std::convert::TryFrom;
6+
use std::pin::Pin;
7+
use std::rc::Rc;
8+
use std::task::{Context, Poll};
9+
10+
// ------ BackoffStream ------
11+
12+
/// [Truncated exponential backoff](https://cloud.google.com/storage/docs/exponential-backoff)
13+
#[derive(Debug)]
14+
pub struct BackoffStream {
15+
max_seconds: u32,
16+
retries: usize,
17+
timeout: Timeout,
18+
tick_sender: Rc<mpsc::UnboundedSender<()>>,
19+
tick_receiver: mpsc::UnboundedReceiver<()>,
20+
}
21+
22+
impl BackoffStream {
23+
pub fn new(max_seconds: u32) -> Self {
24+
let (tick_sender, tick_receiver) = mpsc::unbounded();
25+
let tick_sender = Rc::new(tick_sender);
26+
27+
let retries = 0;
28+
Self {
29+
max_seconds,
30+
retries,
31+
timeout: start_timeout(wait_time(retries, max_seconds), &tick_sender),
32+
tick_sender,
33+
tick_receiver,
34+
}
35+
}
36+
}
37+
38+
impl Stream for BackoffStream {
39+
type Item = usize;
40+
41+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
42+
match Stream::poll_next(Pin::new(&mut self.tick_receiver), cx) {
43+
Poll::Ready(Some(_)) => {
44+
self.retries += 1;
45+
self.timeout =
46+
start_timeout(wait_time(self.retries, self.max_seconds), &self.tick_sender);
47+
Poll::Ready(Some(self.retries))
48+
}
49+
Poll::Ready(None) => Poll::Ready(None),
50+
Poll::Pending => Poll::Pending,
51+
}
52+
}
53+
}
54+
55+
fn wait_time(retries: usize, max_seconds: u32) -> u32 {
56+
let retries = u32::try_from(retries).unwrap_or(u32::max_value());
57+
let random_ms = SmallRng::from_entropy().gen_range(0, 1001);
58+
59+
let duration = 2_u32
60+
.saturating_pow(retries)
61+
.saturating_mul(1000)
62+
.saturating_add(random_ms);
63+
let max_duration = max_seconds.saturating_mul(1000);
64+
65+
u32::min(duration, max_duration)
66+
}
67+
68+
fn start_timeout(ms: u32, tick_sender: &Rc<mpsc::UnboundedSender<()>>) -> Timeout {
69+
let tick_sender = Rc::clone(tick_sender);
70+
Timeout::new(ms, move || {
71+
tick_sender.unbounded_send(()).expect("send backoff tick")
72+
})
73+
}

src/app/streams/event_stream.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ use web_sys::EventTarget;
99

1010
// ------ EventStream ------
1111

12-
// @TODO Replace `mpsc` with `crossbeam`? (And integrate it into the other Seed parts (e.g. `Listener`, `SubManager`)).
12+
// @TODO Replace `mpsc` with `crossbeam`, `futures-signals` or `flume`?
13+
// (And integrate it into the other Seed parts (e.g. `Listener`, `SubManager`, `BackoffStream`)).
1314

1415
// @TODO Update it to support different `web_sys` events
1516
// during implementation of https://github.com/seed-rs/seed/issues/331

src/browser/web_socket.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ pub enum WebSocketError {
7474
///
7575
/// [MDN reference](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket)
7676
#[derive(Debug)]
77+
#[must_use = "WebSocket is closed on drop"]
7778
pub struct WebSocket {
7879
ws: web_sys::WebSocket,
7980
callbacks: Callbacks,
@@ -93,7 +94,7 @@ impl WebSocket {
9394
/// _Note:_ Always prefer `wss://` - encrypted and more reliable.
9495
pub fn builder<U: AsRef<str>, Ms: 'static, O: Orders<Ms>>(
9596
url: U,
96-
orders: &mut O,
97+
orders: &O,
9798
) -> Builder<U, Ms, O> {
9899
Builder::new(url, orders)
99100
}

src/browser/web_socket/builder.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub(crate) struct Callbacks {
3131
///```
3232
pub struct Builder<'a, U: AsRef<str>, Ms: 'static, O: Orders<Ms>> {
3333
url: U,
34-
orders: &'a mut O,
34+
orders: &'a O,
3535
callbacks: Callbacks,
3636
protocols: &'a [&'a str],
3737
binary_type: Option<BinaryType>,
@@ -40,7 +40,7 @@ pub struct Builder<'a, U: AsRef<str>, Ms: 'static, O: Orders<Ms>> {
4040

4141
impl<'a, U: AsRef<str>, Ms: 'static, O: Orders<Ms>> Builder<'a, U, Ms, O> {
4242
// Note: `WebSocket::builder` is the preferred way how to crate a new `Builder` instance.
43-
pub(crate) fn new(url: U, orders: &'a mut O) -> Self {
43+
pub(crate) fn new(url: U, orders: &'a O) -> Self {
4444
Self {
4545
url,
4646
orders,
@@ -160,6 +160,9 @@ impl<'a, U: AsRef<str>, Ms: 'static, O: Orders<Ms>> Builder<'a, U, Ms, O> {
160160
///
161161
/// Returns `WebSocketError::OpenError` when Web Socket opening fails.
162162
/// E.g. when the chosen port is blocked.
163+
///
164+
/// _Note:_: It doesn't return error when the socket is open on the client side,
165+
/// but fails to connect to the server - use `on_error` handler to resolve such cases.
163166
pub fn build_and_open(self) -> Result<WebSocket> {
164167
WebSocket::new(
165168
self.url.as_ref(),

0 commit comments

Comments
 (0)