Skip to content

Commit 24ea6eb

Browse files
authored
examples: improve irpc-iroh examples (#86)
A bit better naming and docs for the examples. Also adds a new example that uses shared state on the server state instead of an actor loop.
1 parent ac699f3 commit 24ea6eb

File tree

3 files changed

+248
-25
lines changed

3 files changed

+248
-25
lines changed
Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
//! Demonstrates how to talk to an actor loop both from the same process and from remotes.
2+
//!
3+
//! The [`StorageApi`] struct is only defined once and can be used both locally and as a remote client.
4+
15
use anyhow::Result;
26
use iroh::{protocol::Router, Endpoint};
37

@@ -30,7 +34,7 @@ async fn remote() -> Result<()> {
3034
let endpoint = Endpoint::bind().await?;
3135
let api = StorageApi::spawn();
3236
let router = Router::builder(endpoint.clone())
33-
.accept(StorageApi::ALPN, api.expose()?)
37+
.accept(StorageApi::ALPN, api.protocol_handler()?)
3438
.spawn();
3539
let addr = endpoint.addr();
3640
(router, addr)
@@ -55,7 +59,7 @@ mod storage {
5559
//!
5660
//! The only `pub` item is [`StorageApi`], everything else is private.
5761
58-
use std::collections::BTreeMap;
62+
use std::{collections::BTreeMap, sync::Arc};
5963

6064
use anyhow::{Context, Result};
6165
use iroh::{protocol::ProtocolHandler, Endpoint};
@@ -66,6 +70,7 @@ mod storage {
6670
};
6771
// Import the macro
6872
use irpc_iroh::{IrohLazyRemoteConnection, IrohProtocol};
73+
use n0_future::task::AbortOnDropHandle;
6974
use serde::{Deserialize, Serialize};
7075
use tracing::info;
7176

@@ -96,26 +101,14 @@ mod storage {
96101
List(List),
97102
}
98103

104+
#[derive(Default)]
99105
struct StorageActor {
100-
recv: tokio::sync::mpsc::Receiver<StorageMessage>,
101106
state: BTreeMap<String, String>,
102107
}
103108

104109
impl StorageActor {
105-
pub fn spawn() -> StorageApi {
106-
let (tx, rx) = tokio::sync::mpsc::channel(1);
107-
let actor = Self {
108-
recv: rx,
109-
state: BTreeMap::new(),
110-
};
111-
n0_future::task::spawn(actor.run());
112-
StorageApi {
113-
inner: Client::local(tx),
114-
}
115-
}
116-
117-
async fn run(mut self) {
118-
while let Some(msg) = self.recv.recv().await {
110+
async fn run(mut self, mut rx: tokio::sync::mpsc::Receiver<StorageMessage>) {
111+
while let Some(msg) = rx.recv().await {
119112
self.handle(msg).await;
120113
}
121114
}
@@ -147,14 +140,21 @@ mod storage {
147140
}
148141

149142
pub struct StorageApi {
150-
inner: Client<StorageProtocol>,
143+
client: Client<StorageProtocol>,
144+
_actor_task: Option<Arc<AbortOnDropHandle<()>>>,
151145
}
152146

153147
impl StorageApi {
154148
pub const ALPN: &[u8] = b"irpc-iroh/derive-demo/0";
155149

156150
pub fn spawn() -> Self {
157-
StorageActor::spawn()
151+
let (tx, rx) = tokio::sync::mpsc::channel(2);
152+
let actor = StorageActor::default();
153+
let actor_task = n0_future::task::spawn(actor.run(rx));
154+
StorageApi {
155+
client: Client::local(tx),
156+
_actor_task: Some(Arc::new(AbortOnDropHandle::new(actor_task))),
157+
}
158158
}
159159

160160
pub fn connect(
@@ -163,29 +163,30 @@ mod storage {
163163
) -> Result<StorageApi> {
164164
let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec());
165165
Ok(StorageApi {
166-
inner: Client::boxed(conn),
166+
client: Client::boxed(conn),
167+
_actor_task: None,
167168
})
168169
}
169170

170-
pub fn expose(&self) -> Result<impl ProtocolHandler> {
171+
pub fn protocol_handler(&self) -> Result<impl ProtocolHandler> {
171172
let local = self
172-
.inner
173+
.client
173174
.as_local()
174175
.context("can not listen on remote service")?;
175176
Ok(IrohProtocol::new(StorageProtocol::remote_handler(local)))
176177
}
177178

178179
pub async fn get(&self, key: String) -> irpc::Result<Option<String>> {
179-
self.inner.rpc(Get { key }).await
180+
self.client.rpc(Get { key }).await
180181
}
181182

182183
pub async fn list(&self) -> irpc::Result<mpsc::Receiver<String>> {
183-
self.inner.server_streaming(List, 10).await
184+
self.client.server_streaming(List, 10).await
184185
}
185186

186187
pub async fn set(&self, key: String, value: String) -> irpc::Result<()> {
187188
let msg = Set { key, value };
188-
self.inner.rpc(msg).await
189+
self.client.rpc(msg).await
189190
}
190191
}
191192
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
//! Demonstrates the typical pattern where the server runs an actor loop that processes incoming
2+
//! messages sequentially.
3+
14
#[tokio::main]
25
async fn main() -> anyhow::Result<()> {
36
cli::run().await
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
//! This example demonstrates using irpc-iroh with a cloneable state struct
2+
//! on the server side instead of with an actor loop.
3+
4+
use anyhow::Result;
5+
use iroh::{protocol::Router, Endpoint};
6+
7+
use self::storage::{StorageClient, StorageServer};
8+
9+
#[tokio::main]
10+
async fn main() -> Result<()> {
11+
tracing_subscriber::fmt::init();
12+
13+
// Start the server.
14+
let (server_router, server_addr) = {
15+
let endpoint = Endpoint::bind().await?;
16+
let storage = StorageServer::default();
17+
let router = Router::builder(endpoint)
18+
.accept(storage::ALPN, storage)
19+
.spawn();
20+
let addr = router.endpoint().addr();
21+
(router, addr)
22+
};
23+
24+
// Connect by passing an endpoint, which allows automatic reconnection.
25+
let client_endpoint = Endpoint::bind().await?;
26+
let api = StorageClient::connect(client_endpoint, server_addr.clone());
27+
api.set("hello", "world").await?;
28+
api.set("goodbye", "see you soon").await?;
29+
let value = api.get("hello").await?;
30+
println!("hello = {value:?}");
31+
let mut list = api.list().await?;
32+
while let Some(value) = list.recv().await? {
33+
println!("list: {value:?}");
34+
}
35+
36+
// Or create a client from a connection directly.
37+
let client2 = Endpoint::bind().await?;
38+
let conn = client2.connect(server_addr, storage::ALPN).await?;
39+
let api = StorageClient::from_connection(conn);
40+
let value = api.get("goodbye").await?;
41+
println!("goodbye = {value:?}");
42+
43+
drop(server_router);
44+
Ok(())
45+
}
46+
47+
mod storage {
48+
//! Implementation of our storage service.
49+
50+
use std::{
51+
collections::BTreeMap,
52+
sync::{Arc, Mutex, MutexGuard},
53+
};
54+
55+
use anyhow::Result;
56+
use iroh::{
57+
endpoint::Connection,
58+
protocol::{AcceptError, ProtocolHandler},
59+
Endpoint,
60+
};
61+
use irpc::{
62+
channel::{mpsc, oneshot},
63+
rpc_requests, Client, WithChannels,
64+
};
65+
// Import the macro
66+
use irpc_iroh::{read_request, IrohLazyRemoteConnection, IrohRemoteConnection};
67+
use serde::{Deserialize, Serialize};
68+
use tracing::info;
69+
70+
pub const ALPN: &[u8] = b"irpc/example-storage/0";
71+
72+
#[derive(Debug, Serialize, Deserialize)]
73+
struct Get {
74+
key: String,
75+
}
76+
77+
#[derive(Debug, Serialize, Deserialize)]
78+
struct List;
79+
80+
#[derive(Debug, Serialize, Deserialize)]
81+
struct Set {
82+
key: String,
83+
value: String,
84+
}
85+
86+
#[derive(Debug, Serialize, Deserialize)]
87+
struct SetMany;
88+
89+
// Use the macro to generate both the StorageProtocol and StorageMessage enums
90+
// plus implement Channels for each type
91+
#[rpc_requests(message = StorageMessage)]
92+
#[derive(Serialize, Deserialize, Debug)]
93+
enum StorageProtocol {
94+
#[rpc(tx=oneshot::Sender<Option<String>>)]
95+
Get(Get),
96+
#[rpc(tx=oneshot::Sender<()>)]
97+
Set(Set),
98+
#[rpc(tx=oneshot::Sender<u64>, rx=mpsc::Receiver<(String, String)>)]
99+
SetMany(SetMany),
100+
#[rpc(tx=mpsc::Sender<String>)]
101+
List(List),
102+
}
103+
104+
#[derive(Debug, Clone, Default)]
105+
pub struct StorageServer {
106+
state: Arc<Mutex<BTreeMap<String, String>>>,
107+
}
108+
109+
impl ProtocolHandler for StorageServer {
110+
async fn accept(&self, conn: Connection) -> Result<(), AcceptError> {
111+
while let Some(msg) = read_request::<StorageProtocol>(&conn).await? {
112+
self.handle_message(msg).await;
113+
}
114+
conn.closed().await;
115+
Ok(())
116+
}
117+
}
118+
119+
impl StorageServer {
120+
async fn handle_message(&self, msg: StorageMessage) {
121+
info!("handle message {:?}", msg);
122+
match msg {
123+
StorageMessage::Get(msg) => {
124+
let WithChannels { tx, inner, .. } = msg;
125+
let value = self.state().get(&inner.key).cloned();
126+
tx.send(value).await.ok();
127+
}
128+
StorageMessage::Set(msg) => {
129+
let WithChannels { tx, inner, .. } = msg;
130+
self.state().insert(inner.key, inner.value);
131+
tx.send(()).await.ok();
132+
}
133+
StorageMessage::SetMany(msg) => {
134+
let WithChannels { tx, mut rx, .. } = msg;
135+
let mut i = 0;
136+
while let Ok(Some((key, value))) = rx.recv().await {
137+
self.state().insert(key, value);
138+
i += 1;
139+
}
140+
tx.send(i).await.ok();
141+
}
142+
StorageMessage::List(msg) => {
143+
let WithChannels { tx, .. } = msg;
144+
let values = {
145+
let state = self.state();
146+
// We clone the values so that we don't keep the lock open for the lifetime of the request.
147+
// If we wouldn't want to clone here because there can be many entries,
148+
// we have to redesign the storage to support a notion of snapshots, or use an async lock
149+
// but that would mean that no other requests can be processed while the stream here is sent out.
150+
let values: Vec<_> = state
151+
.iter()
152+
.map(|(key, value)| format!("{key}={value}"))
153+
.collect();
154+
values
155+
};
156+
for value in values {
157+
if tx.send(value).await.is_err() {
158+
break;
159+
}
160+
}
161+
}
162+
}
163+
}
164+
165+
fn state(&self) -> MutexGuard<'_, BTreeMap<String, String>> {
166+
self.state.lock().expect("poisoned")
167+
}
168+
}
169+
170+
pub struct StorageClient {
171+
inner: Client<StorageProtocol>,
172+
}
173+
174+
impl StorageClient {
175+
/// Connect via an [`Endpoint`].
176+
///
177+
/// This will create a client that automatically reconnects if the connection closes.
178+
pub fn connect(endpoint: Endpoint, addr: impl Into<iroh::EndpointAddr>) -> StorageClient {
179+
let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), ALPN.to_vec());
180+
StorageClient {
181+
inner: Client::boxed(conn),
182+
}
183+
}
184+
185+
/// Create a client from a [`Connection`].
186+
///
187+
/// This creates a client from a single [`Connection`]. If the connection closes, the client will
188+
/// not reconnect and all calls will return errors.
189+
pub fn from_connection(conn: Connection) -> StorageClient {
190+
StorageClient {
191+
inner: Client::boxed(IrohRemoteConnection::new(conn)),
192+
}
193+
}
194+
195+
pub async fn get(&self, key: impl ToString) -> Result<Option<String>, irpc::Error> {
196+
self.inner
197+
.rpc(Get {
198+
key: key.to_string(),
199+
})
200+
.await
201+
}
202+
203+
pub async fn list(&self) -> Result<mpsc::Receiver<String>, irpc::Error> {
204+
self.inner.server_streaming(List, 10).await
205+
}
206+
207+
pub async fn set(
208+
&self,
209+
key: impl ToString,
210+
value: impl ToString,
211+
) -> Result<(), irpc::Error> {
212+
let msg = Set {
213+
key: key.to_string(),
214+
value: value.to_string(),
215+
};
216+
self.inner.rpc(msg).await
217+
}
218+
}
219+
}

0 commit comments

Comments
 (0)