Skip to content

WS pubsub example. #126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion http/examples/http_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ extern crate jsonrpc_http_server;

use jsonrpc_core::*;
use jsonrpc_core::futures::Future;
use jsonrpc_http_server::*;
use jsonrpc_http_server::{ServerBuilder, DomainsValidation, AccessControlAllowOrigin};

fn main() {
let mut io = IoHandler::default();
Expand Down
2 changes: 1 addition & 1 deletion http/examples/http_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ extern crate jsonrpc_http_server;

use jsonrpc_core::{IoHandler, Value};
use jsonrpc_core::futures::{self, Future};
use jsonrpc_http_server::*;
use jsonrpc_http_server::{hyper, ServerBuilder, DomainsValidation, AccessControlAllowOrigin, Response};

fn main() {
let mut io = IoHandler::default();
Expand Down
2 changes: 1 addition & 1 deletion http/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ extern crate jsonrpc_core;
extern crate jsonrpc_http_server;

use jsonrpc_core::*;
use jsonrpc_http_server::*;
use jsonrpc_http_server::{ServerBuilder, DomainsValidation, AccessControlAllowOrigin};

fn main() {
let mut io = IoHandler::default();
Expand Down
2 changes: 1 addition & 1 deletion minihttp/examples/http_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ extern crate jsonrpc_minihttp_server;

use jsonrpc_core::*;
use jsonrpc_core::futures::Future;
use jsonrpc_minihttp_server::*;
use jsonrpc_minihttp_server::{cors, ServerBuilder, DomainsValidation};

fn main() {
let mut io = IoHandler::default();
Expand Down
2 changes: 1 addition & 1 deletion minihttp/examples/http_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ extern crate jsonrpc_minihttp_server;

use jsonrpc_core::*;
use jsonrpc_core::futures::Future;
use jsonrpc_minihttp_server::*;
use jsonrpc_minihttp_server::{cors, ServerBuilder, DomainsValidation, Req};

#[derive(Clone, Default)]
struct Meta(usize);
Expand Down
2 changes: 1 addition & 1 deletion minihttp/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ extern crate jsonrpc_core;
extern crate jsonrpc_minihttp_server;

use jsonrpc_core::*;
use jsonrpc_minihttp_server::*;
use jsonrpc_minihttp_server::{cors, ServerBuilder, DomainsValidation};

fn main() {
let mut io = IoHandler::default();
Expand Down
1 change: 1 addition & 0 deletions pubsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ jsonrpc-core = { version = "7.0", path = "../core" }

[dev-dependencies]
jsonrpc-tcp-server = { version = "7.0", path = "../tcp" }
jsonrpc-ws-server = { version = "7.0", path = "../ws" }
11 changes: 9 additions & 2 deletions pubsub/examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::{time, thread};
use std::sync::Arc;

use jsonrpc_core::*;
use jsonrpc_pubsub::*;
use jsonrpc_tcp_server::*;
use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, Subscriber, SubscriptionId};
use jsonrpc_tcp_server::{ServerBuilder, RequestContext};

use jsonrpc_core::futures::Future;

Expand All @@ -31,6 +31,13 @@ impl PubSubMetadata for Meta {
}
}

/// To test the server:
///
/// ```bash
/// $ netcat localhost 3030 -
/// {"id":1,"jsonrpc":"2.0","method":"hello_subscribe","params":[10]}
///
/// ```
fn main() {
let mut io = PubSubHandler::new(MetaIoHandler::default());
io.add_method("say_hello", |_params: Params| {
Expand Down
107 changes: 107 additions & 0 deletions pubsub/examples/pubsub_ws.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
extern crate jsonrpc_core;
extern crate jsonrpc_pubsub;
extern crate jsonrpc_ws_server;

use std::{time, thread};
use std::sync::Arc;

use jsonrpc_core::*;
use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, Subscriber, SubscriptionId};
use jsonrpc_ws_server::{ServerBuilder, RequestContext};

use jsonrpc_core::futures::Future;

#[derive(Clone)]
struct Meta {
session: Option<Arc<Session>>,
}

impl Default for Meta {
fn default() -> Self {
Meta {
session: None,
}
}
}

impl Metadata for Meta {}
impl PubSubMetadata for Meta {
fn session(&self) -> Option<Arc<Session>> {
self.session.clone()
}
}

/// Use following node.js code to test:
///
/// ```js
/// const WebSocket = require('websocket').w3cwebsocket;
///
/// const ws = new WebSocket('ws://localhost:3030');
/// ws.addEventListener('open', () => {
/// console.log('Sending request');
///
/// ws.send(JSON.stringify({
/// jsonrpc: "2.0",
/// id: 1,
/// method: "subscribe_hello",
/// params: [],
/// }));
/// });
///
/// ws.addEventListener('message', (message) => {
/// console.log('Received: ', message.data);
/// });
///
/// console.log('Starting');
/// ```
fn main() {
let mut io = PubSubHandler::new(MetaIoHandler::default());
io.add_method("say_hello", |_params: Params| {
Ok(Value::String("hello".to_string()))
});

io.add_subscription(
"hello",
("subscribe_hello", |params: Params, _, subscriber: Subscriber| {
if params != Params::None {
subscriber.reject(Error {
code: ErrorCode::ParseError,
message: "Invalid parameters. Subscription rejected.".into(),
data: None,
}).unwrap();
return;
}

let sink = subscriber.assign_id(SubscriptionId::Number(5)).unwrap();
// or subscriber.reject(Error {} );
// or drop(subscriber)
thread::spawn(move || {
loop {
thread::sleep(time::Duration::from_millis(1000));
match sink.notify(Params::Array(vec![Value::Number(10.into())])).wait() {
Ok(_) => {},
Err(_) => {
println!("Subscription has ended, finishing.");
break;
}
}
}
});
}),
("remove_hello", |_id: SubscriptionId| -> futures::BoxFuture<Value, Error> {
println!("Closing subscription");
futures::future::ok(Value::Bool(true)).boxed()
}),
);

let server = ServerBuilder::new(io)
.session_meta_extractor(|context: &RequestContext| {
Meta {
session: Some(Arc::new(Session::new(context.sender()))),
}
})
.start(&"127.0.0.1:3030".parse().unwrap())
.expect("Unable to start RPC server");

let _ = server.wait();
}
1 change: 0 additions & 1 deletion pubsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ mod handler;
mod subscription;
mod types;


pub use self::handler::{PubSubHandler, SubscribeRpcMethod, UnsubscribeRpcMethod};
pub use self::subscription::{Session, Sink, Subscriber, new_subscription};
pub use self::types::{PubSubMetadata, SubscriptionId, TransportError, SinkResult};
2 changes: 1 addition & 1 deletion pubsub/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Drop for Session {
#[derive(Debug, Clone)]
pub struct Sink {
notification: String,
transport: TransportSender
transport: TransportSender,
}

impl Sink {
Expand Down
2 changes: 1 addition & 1 deletion tcp/examples/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ extern crate jsonrpc_core;
extern crate jsonrpc_tcp_server;

use jsonrpc_core::*;
use jsonrpc_tcp_server::*;
use jsonrpc_tcp_server::ServerBuilder;

fn main() {
let mut io = IoHandler::default();
Expand Down
2 changes: 1 addition & 1 deletion ws/examples/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ extern crate jsonrpc_core;
extern crate jsonrpc_ws_server;

use jsonrpc_core::*;
use jsonrpc_ws_server::*;
use jsonrpc_ws_server::ServerBuilder;

fn main() {
let mut io = IoHandler::default();
Expand Down
1 change: 0 additions & 1 deletion ws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,3 @@ pub use self::server_builder::{ServerBuilder, Error};
pub use self::server_utils::cors::Origin;
pub use self::server_utils::hosts::{Host, DomainsValidation};
pub use self::server_utils::tokio_core;

107 changes: 105 additions & 2 deletions ws/src/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,64 @@
use std::fmt;
use std::sync::{atomic, Arc};

use core;
use core::{self, futures};
use core::futures::sync::mpsc;
use server_utils::tokio_core::reactor::Remote;
use ws;

use session;
use Origin;

/// Output of WebSocket connection. Use this to send messages to the other endpoint.
#[derive(Clone)]
pub struct Sender {
out: ws::Sender,
active: Arc<atomic::AtomicBool>,
}

impl Sender {
/// Creates a new `Sender`.
pub fn new(out: ws::Sender, active: Arc<atomic::AtomicBool>) -> Self {
Sender {
out: out,
active: active,
}
}

fn check_active(&self) -> ws::Result<()> {
if self.active.load(atomic::Ordering::SeqCst) {
Ok(())
} else {
Err(ws::Error::new(ws::ErrorKind::Internal, "Attempting to send a message to closed connection."))
}
}

/// Sends a message over the connection.
/// Will return error if the connection is not active any more.
pub fn send<M>(&self, msg: M) -> ws::Result<()> where
M: Into<ws::Message>
{
self.check_active()?;
self.out.send(msg)
}

/// Sends a message over the endpoints of all connections.
/// Will return error if the connection is not active any more.
pub fn broadcast<M>(&self, msg: M) -> ws::Result<()> where
M: Into<ws::Message>
{
self.check_active()?;
self.out.broadcast(msg)
}

/// Sends a close code to the other endpoint.
/// Will return error if the connection is not active any more.
pub fn close(&self, code: ws::CloseCode) -> ws::Result<()> {
self.check_active()?;
self.out.close(code)
}
}

/// Request context
pub struct RequestContext {
/// Session id
Expand All @@ -15,7 +68,20 @@ pub struct RequestContext {
/// Requested protocols
pub protocols: Vec<String>,
/// Direct channel to send messages to a client.
pub out: ws::Sender,
pub out: Sender,
/// Remote to underlying event loop.
pub remote: Remote,
}

impl RequestContext {
/// Get this session as a `Sink` spawning a new future
/// in the underlying event loop.
pub fn sender(&self) -> mpsc::Sender<String> {
let out = self.out.clone();
let (sender, receiver) = mpsc::channel(1);
self.remote.spawn(move |_| SenderFuture(out, receiver));
sender
}
}

impl fmt::Debug for RequestContext {
Expand All @@ -36,7 +102,44 @@ pub trait MetaExtractor<M: core::Metadata>: Send + Sync + 'static {
}
}

impl<M, F> MetaExtractor<M> for F where
M: core::Metadata,
F: Fn(&RequestContext) -> M + Send + Sync + 'static,
{
fn extract(&self, context: &RequestContext) -> M {
(*self)(context)
}
}

/// Dummy metadata extractor
#[derive(Clone)]
pub struct NoopExtractor;
impl<M: core::Metadata> MetaExtractor<M> for NoopExtractor {}

struct SenderFuture(Sender, mpsc::Receiver<String>);
impl futures::Future for SenderFuture {
type Item = ();
type Error = ();

fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
use self::futures::Stream;

loop {
let item = self.1.poll()?;
match item {
futures::Async::NotReady => {
return Ok(futures::Async::NotReady);
},
futures::Async::Ready(None) => {
return Ok(futures::Async::Ready(()));
},
futures::Async::Ready(Some(val)) => {
if let Err(e) = self.0.send(val) {
warn!("Error sending a subscription update: {:?}", e);
return Ok(futures::Async::Ready(()));
}
},
}
}
}
}
Loading