Skip to content

Commit ac040e4

Browse files
authored
Merge pull request #5 from n0-computer/Frando/ergonomics
feat: add methods on `Client` to issue requests on either local or remote APIs
2 parents cf72d99 + 33c2e93 commit ac040e4

File tree

2 files changed

+220
-83
lines changed

2 files changed

+220
-83
lines changed

examples/derive.rs

Lines changed: 81 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ use std::{
44
sync::Arc,
55
};
66

7-
use anyhow::bail;
7+
use anyhow::{Context, Result};
88
use irpc::{
99
channel::{oneshot, spsc},
10-
rpc::{listen, Handler},
10+
rpc::Handler,
1111
util::{make_client_endpoint, make_server_endpoint},
12-
Client, LocalSender, Request, Service, WithChannels,
12+
Client, Error, LocalSender, Service, WithChannels,
1313
};
1414
// Import the macro
1515
use irpc_derive::rpc_requests;
@@ -37,6 +37,15 @@ struct Set {
3737
value: String,
3838
}
3939

40+
impl From<(String, String)> for Set {
41+
fn from((key, value): (String, String)) -> Self {
42+
Self { key, value }
43+
}
44+
}
45+
46+
#[derive(Debug, Serialize, Deserialize)]
47+
struct SetMany;
48+
4049
// Use the macro to generate both the StorageProtocol and StorageMessage enums
4150
// plus implement Channels for each type
4251
#[rpc_requests(StorageService, message = StorageMessage)]
@@ -46,6 +55,8 @@ enum StorageProtocol {
4655
Get(Get),
4756
#[rpc(tx=oneshot::Sender<()>)]
4857
Set(Set),
58+
#[rpc(tx=oneshot::Sender<u64>, rx=spsc::Receiver<(String, String)>)]
59+
SetMany(SetMany),
4960
#[rpc(tx=spsc::Sender<String>)]
5061
List(List),
5162
}
@@ -56,7 +67,7 @@ struct StorageActor {
5667
}
5768

5869
impl StorageActor {
59-
pub fn local() -> StorageApi {
70+
pub fn spawn() -> StorageApi {
6071
let (tx, rx) = tokio::sync::mpsc::channel(1);
6172
let actor = Self {
6273
recv: rx,
@@ -88,6 +99,16 @@ impl StorageActor {
8899
self.state.insert(inner.key, inner.value);
89100
tx.send(()).await.ok();
90101
}
102+
StorageMessage::SetMany(set) => {
103+
info!("set-many {:?}", set);
104+
let WithChannels { mut rx, tx, .. } = set;
105+
let mut count = 0;
106+
while let Ok(Some((key, value))) = rx.recv().await {
107+
self.state.insert(key, value);
108+
count += 1;
109+
}
110+
tx.send(count).await.ok();
111+
}
91112
StorageMessage::List(list) => {
92113
info!("list {:?}", list);
93114
let WithChannels { mut tx, .. } = list;
@@ -106,120 +127,98 @@ struct StorageApi {
106127
}
107128

108129
impl StorageApi {
109-
pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> anyhow::Result<StorageApi> {
130+
pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Result<StorageApi> {
110131
Ok(StorageApi {
111132
inner: Client::quinn(endpoint, addr),
112133
})
113134
}
114135

115-
pub fn listen(&self, endpoint: quinn::Endpoint) -> anyhow::Result<AbortOnDropHandle<()>> {
116-
let Some(local) = self.inner.local() else {
117-
bail!("cannot listen on a remote service");
118-
};
119-
let handler: Handler<StorageProtocol> = Arc::new(move |msg, _, tx| {
136+
pub fn listen(&self, endpoint: quinn::Endpoint) -> Result<AbortOnDropHandle<()>> {
137+
let local = self.inner.local().context("cannot listen on remote API")?;
138+
let handler: Handler<StorageProtocol> = Arc::new(move |msg, rx, tx| {
120139
let local = local.clone();
121140
Box::pin(match msg {
122141
StorageProtocol::Get(msg) => local.send((msg, tx)),
123142
StorageProtocol::Set(msg) => local.send((msg, tx)),
143+
StorageProtocol::SetMany(msg) => local.send((msg, tx, rx)),
124144
StorageProtocol::List(msg) => local.send((msg, tx)),
125145
})
126146
});
127-
Ok(AbortOnDropHandle::new(task::spawn(listen(
128-
endpoint, handler,
129-
))))
147+
let join_handle = task::spawn(irpc::rpc::listen(endpoint, handler));
148+
Ok(AbortOnDropHandle::new(join_handle))
130149
}
131150

132-
pub async fn get(&self, key: String) -> anyhow::Result<oneshot::Receiver<Option<String>>> {
133-
let msg = Get { key };
134-
match self.inner.request().await? {
135-
Request::Local(request) => {
136-
let (tx, rx) = oneshot::channel();
137-
request.send((msg, tx)).await?;
138-
Ok(rx)
139-
}
140-
Request::Remote(request) => {
141-
let (_tx, rx) = request.write(msg).await?;
142-
Ok(rx.into())
143-
}
144-
}
151+
pub async fn get(&self, key: String) -> Result<Option<String>, Error> {
152+
self.inner.rpc(Get { key }).await
145153
}
146154

147-
pub async fn list(&self) -> anyhow::Result<spsc::Receiver<String>> {
148-
let msg = List;
149-
match self.inner.request().await? {
150-
Request::Local(request) => {
151-
let (tx, rx) = spsc::channel(10);
152-
request.send((msg, tx)).await?;
153-
Ok(rx)
154-
}
155-
Request::Remote(request) => {
156-
let (_tx, rx) = request.write(msg).await?;
157-
Ok(rx.into())
158-
}
159-
}
155+
pub async fn list(&self) -> Result<spsc::Receiver<String>, Error> {
156+
self.inner.server_streaming(List, 16).await
160157
}
161158

162-
pub async fn set(&self, key: String, value: String) -> anyhow::Result<oneshot::Receiver<()>> {
163-
let msg = Set { key, value };
164-
match self.inner.request().await? {
165-
Request::Local(request) => {
166-
let (tx, rx) = oneshot::channel();
167-
request.send((msg, tx)).await?;
168-
Ok(rx)
169-
}
170-
Request::Remote(request) => {
171-
let (_tx, rx) = request.write(msg).await?;
172-
Ok(rx.into())
173-
}
174-
}
159+
pub async fn set(&self, key: String, value: String) -> Result<(), Error> {
160+
self.inner.rpc(Set { key, value }).await
161+
}
162+
163+
pub async fn set_many(
164+
&self,
165+
) -> Result<(spsc::Sender<(String, String)>, oneshot::Receiver<u64>), Error> {
166+
self.inner.client_streaming(SetMany, 4).await
175167
}
176168
}
177169

178-
async fn local() -> anyhow::Result<()> {
179-
let api = StorageActor::local();
180-
api.set("hello".to_string(), "world".to_string())
181-
.await?
182-
.await?;
183-
let value = api.get("hello".to_string()).await?.await?;
170+
async fn client_demo(api: StorageApi) -> Result<()> {
171+
api.set("hello".to_string(), "world".to_string()).await?;
172+
let value = api.get("hello".to_string()).await?;
173+
println!("get: hello = {:?}", value);
174+
175+
let (mut tx, rx) = api.set_many().await?;
176+
for i in 0..3 {
177+
tx.send((format!("key{i}"), format!("value{i}"))).await?;
178+
}
179+
drop(tx);
180+
let count = rx.await?;
181+
println!("set-many: {count} values set");
182+
184183
let mut list = api.list().await?;
185184
while let Some(value) = list.recv().await? {
186185
println!("list value = {:?}", value);
187186
}
188-
println!("value = {:?}", value);
189187
Ok(())
190188
}
191189

192-
async fn remote() -> anyhow::Result<()> {
190+
async fn local() -> Result<()> {
191+
let api = StorageActor::spawn();
192+
client_demo(api).await?;
193+
Ok(())
194+
}
195+
196+
async fn remote() -> Result<()> {
193197
let port = 10113;
194-
let (server, cert) =
195-
make_server_endpoint(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port).into())?;
196-
let client =
198+
let addr: SocketAddr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into();
199+
200+
let (server_handle, cert) = {
201+
let (endpoint, cert) = make_server_endpoint(addr)?;
202+
let api = StorageActor::spawn();
203+
let handle = api.listen(endpoint)?;
204+
(handle, cert)
205+
};
206+
207+
let endpoint =
197208
make_client_endpoint(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0).into(), &[&cert])?;
198-
let store = StorageActor::local();
199-
let handle = store.listen(server)?;
200-
let api = StorageApi::connect(client, SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into())?;
201-
api.set("hello".to_string(), "world".to_string())
202-
.await?
203-
.await?;
204-
api.set("goodbye".to_string(), "world".to_string())
205-
.await?
206-
.await?;
207-
let value = api.get("hello".to_string()).await?.await?;
208-
println!("value = {:?}", value);
209-
let mut list = api.list().await?;
210-
while let Some(value) = list.recv().await? {
211-
println!("list value = {:?}", value);
212-
}
213-
drop(handle);
209+
let api = StorageApi::connect(endpoint, addr)?;
210+
client_demo(api).await?;
211+
212+
drop(server_handle);
214213
Ok(())
215214
}
216215

217216
#[tokio::main]
218-
async fn main() -> anyhow::Result<()> {
219-
tracing_subscriber::fmt().init();
217+
async fn main() -> Result<()> {
218+
tracing_subscriber::fmt::init();
220219
println!("Local use");
221220
local().await?;
222221
println!("Remote use");
223-
remote().await?;
222+
remote().await.unwrap();
224223
Ok(())
225224
}

0 commit comments

Comments
 (0)