diff --git a/http/examples/http_async.rs b/http/examples/http_async.rs index 1711bdc8f..115ef0123 100644 --- a/http/examples/http_async.rs +++ b/http/examples/http_async.rs @@ -4,7 +4,7 @@ extern crate jsonrpc_http_server; use jsonrpc_core::*; use jsonrpc_core::futures::Future; -use jsonrpc_http_server::*; +use jsonrpc_http_server::{ServerBuilder, DomainsValidation, AccessControlAllowOrigin}; fn main() { let mut io = IoHandler::default(); diff --git a/http/examples/http_middleware.rs b/http/examples/http_middleware.rs index 57b7db24c..4a4054327 100644 --- a/http/examples/http_middleware.rs +++ b/http/examples/http_middleware.rs @@ -4,7 +4,7 @@ extern crate jsonrpc_http_server; use jsonrpc_core::{IoHandler, Value}; use jsonrpc_core::futures::{self, Future}; -use jsonrpc_http_server::*; +use jsonrpc_http_server::{hyper, ServerBuilder, DomainsValidation, AccessControlAllowOrigin, Response}; fn main() { let mut io = IoHandler::default(); diff --git a/http/examples/server.rs b/http/examples/server.rs index 6c5c3318a..97e548ed1 100644 --- a/http/examples/server.rs +++ b/http/examples/server.rs @@ -2,7 +2,7 @@ extern crate jsonrpc_core; extern crate jsonrpc_http_server; use jsonrpc_core::*; -use jsonrpc_http_server::*; +use jsonrpc_http_server::{ServerBuilder, DomainsValidation, AccessControlAllowOrigin}; fn main() { let mut io = IoHandler::default(); diff --git a/minihttp/examples/http_async.rs b/minihttp/examples/http_async.rs index 354d3f44f..b64504994 100644 --- a/minihttp/examples/http_async.rs +++ b/minihttp/examples/http_async.rs @@ -4,7 +4,7 @@ extern crate jsonrpc_minihttp_server; use jsonrpc_core::*; use jsonrpc_core::futures::Future; -use jsonrpc_minihttp_server::*; +use jsonrpc_minihttp_server::{cors, ServerBuilder, DomainsValidation}; fn main() { let mut io = IoHandler::default(); diff --git a/minihttp/examples/http_meta.rs b/minihttp/examples/http_meta.rs index be59b36ff..eecb081f5 100644 --- a/minihttp/examples/http_meta.rs +++ b/minihttp/examples/http_meta.rs @@ -3,7 +3,7 @@ extern crate jsonrpc_minihttp_server; use jsonrpc_core::*; use jsonrpc_core::futures::Future; -use jsonrpc_minihttp_server::*; +use jsonrpc_minihttp_server::{cors, ServerBuilder, DomainsValidation, Req}; #[derive(Clone, Default)] struct Meta(usize); diff --git a/minihttp/examples/server.rs b/minihttp/examples/server.rs index 22edeb755..f06b15c87 100644 --- a/minihttp/examples/server.rs +++ b/minihttp/examples/server.rs @@ -2,7 +2,7 @@ extern crate jsonrpc_core; extern crate jsonrpc_minihttp_server; use jsonrpc_core::*; -use jsonrpc_minihttp_server::*; +use jsonrpc_minihttp_server::{cors, ServerBuilder, DomainsValidation}; fn main() { let mut io = IoHandler::default(); diff --git a/pubsub/Cargo.toml b/pubsub/Cargo.toml index e19955ba8..607685706 100644 --- a/pubsub/Cargo.toml +++ b/pubsub/Cargo.toml @@ -16,3 +16,4 @@ jsonrpc-core = { version = "7.0", path = "../core" } [dev-dependencies] jsonrpc-tcp-server = { version = "7.0", path = "../tcp" } +jsonrpc-ws-server = { version = "7.0", path = "../ws" } diff --git a/pubsub/examples/pubsub.rs b/pubsub/examples/pubsub.rs index 1353be8b4..6ae59a587 100644 --- a/pubsub/examples/pubsub.rs +++ b/pubsub/examples/pubsub.rs @@ -6,8 +6,8 @@ use std::{time, thread}; use std::sync::Arc; use jsonrpc_core::*; -use jsonrpc_pubsub::*; -use jsonrpc_tcp_server::*; +use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, Subscriber, SubscriptionId}; +use jsonrpc_tcp_server::{ServerBuilder, RequestContext}; use jsonrpc_core::futures::Future; @@ -31,6 +31,13 @@ impl PubSubMetadata for Meta { } } +/// To test the server: +/// +/// ```bash +/// $ netcat localhost 3030 - +/// {"id":1,"jsonrpc":"2.0","method":"hello_subscribe","params":[10]} +/// +/// ``` fn main() { let mut io = PubSubHandler::new(MetaIoHandler::default()); io.add_method("say_hello", |_params: Params| { diff --git a/pubsub/examples/pubsub_ws.rs b/pubsub/examples/pubsub_ws.rs new file mode 100644 index 000000000..5ad6e6736 --- /dev/null +++ b/pubsub/examples/pubsub_ws.rs @@ -0,0 +1,107 @@ +extern crate jsonrpc_core; +extern crate jsonrpc_pubsub; +extern crate jsonrpc_ws_server; + +use std::{time, thread}; +use std::sync::Arc; + +use jsonrpc_core::*; +use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, Subscriber, SubscriptionId}; +use jsonrpc_ws_server::{ServerBuilder, RequestContext}; + +use jsonrpc_core::futures::Future; + +#[derive(Clone)] +struct Meta { + session: Option>, +} + +impl Default for Meta { + fn default() -> Self { + Meta { + session: None, + } + } +} + +impl Metadata for Meta {} +impl PubSubMetadata for Meta { + fn session(&self) -> Option> { + self.session.clone() + } +} + +/// Use following node.js code to test: +/// +/// ```js +/// const WebSocket = require('websocket').w3cwebsocket; +/// +/// const ws = new WebSocket('ws://localhost:3030'); +/// ws.addEventListener('open', () => { +/// console.log('Sending request'); +/// +/// ws.send(JSON.stringify({ +/// jsonrpc: "2.0", +/// id: 1, +/// method: "subscribe_hello", +/// params: [], +/// })); +/// }); +/// +/// ws.addEventListener('message', (message) => { +/// console.log('Received: ', message.data); +/// }); +/// +/// console.log('Starting'); +/// ``` +fn main() { + let mut io = PubSubHandler::new(MetaIoHandler::default()); + io.add_method("say_hello", |_params: Params| { + Ok(Value::String("hello".to_string())) + }); + + io.add_subscription( + "hello", + ("subscribe_hello", |params: Params, _, subscriber: Subscriber| { + if params != Params::None { + subscriber.reject(Error { + code: ErrorCode::ParseError, + message: "Invalid parameters. Subscription rejected.".into(), + data: None, + }).unwrap(); + return; + } + + let sink = subscriber.assign_id(SubscriptionId::Number(5)).unwrap(); + // or subscriber.reject(Error {} ); + // or drop(subscriber) + thread::spawn(move || { + loop { + thread::sleep(time::Duration::from_millis(1000)); + match sink.notify(Params::Array(vec![Value::Number(10.into())])).wait() { + Ok(_) => {}, + Err(_) => { + println!("Subscription has ended, finishing."); + break; + } + } + } + }); + }), + ("remove_hello", |_id: SubscriptionId| -> futures::BoxFuture { + println!("Closing subscription"); + futures::future::ok(Value::Bool(true)).boxed() + }), + ); + + let server = ServerBuilder::new(io) + .session_meta_extractor(|context: &RequestContext| { + Meta { + session: Some(Arc::new(Session::new(context.sender()))), + } + }) + .start(&"127.0.0.1:3030".parse().unwrap()) + .expect("Unable to start RPC server"); + + let _ = server.wait(); +} diff --git a/pubsub/src/lib.rs b/pubsub/src/lib.rs index ab9f297fb..9e773df76 100644 --- a/pubsub/src/lib.rs +++ b/pubsub/src/lib.rs @@ -12,7 +12,6 @@ mod handler; mod subscription; mod types; - pub use self::handler::{PubSubHandler, SubscribeRpcMethod, UnsubscribeRpcMethod}; pub use self::subscription::{Session, Sink, Subscriber, new_subscription}; pub use self::types::{PubSubMetadata, SubscriptionId, TransportError, SinkResult}; diff --git a/pubsub/src/subscription.rs b/pubsub/src/subscription.rs index 76590da53..fad2a800a 100644 --- a/pubsub/src/subscription.rs +++ b/pubsub/src/subscription.rs @@ -83,7 +83,7 @@ impl Drop for Session { #[derive(Debug, Clone)] pub struct Sink { notification: String, - transport: TransportSender + transport: TransportSender, } impl Sink { diff --git a/tcp/examples/tcp.rs b/tcp/examples/tcp.rs index 2a761066e..7498cbf68 100644 --- a/tcp/examples/tcp.rs +++ b/tcp/examples/tcp.rs @@ -2,7 +2,7 @@ extern crate jsonrpc_core; extern crate jsonrpc_tcp_server; use jsonrpc_core::*; -use jsonrpc_tcp_server::*; +use jsonrpc_tcp_server::ServerBuilder; fn main() { let mut io = IoHandler::default(); diff --git a/ws/examples/ws.rs b/ws/examples/ws.rs index 9f513f9e2..ad23d74c0 100644 --- a/ws/examples/ws.rs +++ b/ws/examples/ws.rs @@ -2,7 +2,7 @@ extern crate jsonrpc_core; extern crate jsonrpc_ws_server; use jsonrpc_core::*; -use jsonrpc_ws_server::*; +use jsonrpc_ws_server::ServerBuilder; fn main() { let mut io = IoHandler::default(); diff --git a/ws/src/lib.rs b/ws/src/lib.rs index 0deceb635..d049c2412 100644 --- a/ws/src/lib.rs +++ b/ws/src/lib.rs @@ -22,4 +22,3 @@ pub use self::server_builder::{ServerBuilder, Error}; pub use self::server_utils::cors::Origin; pub use self::server_utils::hosts::{Host, DomainsValidation}; pub use self::server_utils::tokio_core; - diff --git a/ws/src/metadata.rs b/ws/src/metadata.rs index 983732974..2b71150be 100644 --- a/ws/src/metadata.rs +++ b/ws/src/metadata.rs @@ -1,11 +1,64 @@ use std::fmt; +use std::sync::{atomic, Arc}; -use core; +use core::{self, futures}; +use core::futures::sync::mpsc; +use server_utils::tokio_core::reactor::Remote; use ws; use session; use Origin; +/// Output of WebSocket connection. Use this to send messages to the other endpoint. +#[derive(Clone)] +pub struct Sender { + out: ws::Sender, + active: Arc, +} + +impl Sender { + /// Creates a new `Sender`. + pub fn new(out: ws::Sender, active: Arc) -> Self { + Sender { + out: out, + active: active, + } + } + + fn check_active(&self) -> ws::Result<()> { + if self.active.load(atomic::Ordering::SeqCst) { + Ok(()) + } else { + Err(ws::Error::new(ws::ErrorKind::Internal, "Attempting to send a message to closed connection.")) + } + } + + /// Sends a message over the connection. + /// Will return error if the connection is not active any more. + pub fn send(&self, msg: M) -> ws::Result<()> where + M: Into + { + self.check_active()?; + self.out.send(msg) + } + + /// Sends a message over the endpoints of all connections. + /// Will return error if the connection is not active any more. + pub fn broadcast(&self, msg: M) -> ws::Result<()> where + M: Into + { + self.check_active()?; + self.out.broadcast(msg) + } + + /// Sends a close code to the other endpoint. + /// Will return error if the connection is not active any more. + pub fn close(&self, code: ws::CloseCode) -> ws::Result<()> { + self.check_active()?; + self.out.close(code) + } +} + /// Request context pub struct RequestContext { /// Session id @@ -15,7 +68,20 @@ pub struct RequestContext { /// Requested protocols pub protocols: Vec, /// Direct channel to send messages to a client. - pub out: ws::Sender, + pub out: Sender, + /// Remote to underlying event loop. + pub remote: Remote, +} + +impl RequestContext { + /// Get this session as a `Sink` spawning a new future + /// in the underlying event loop. + pub fn sender(&self) -> mpsc::Sender { + let out = self.out.clone(); + let (sender, receiver) = mpsc::channel(1); + self.remote.spawn(move |_| SenderFuture(out, receiver)); + sender + } } impl fmt::Debug for RequestContext { @@ -36,7 +102,44 @@ pub trait MetaExtractor: Send + Sync + 'static { } } +impl MetaExtractor for F where + M: core::Metadata, + F: Fn(&RequestContext) -> M + Send + Sync + 'static, +{ + fn extract(&self, context: &RequestContext) -> M { + (*self)(context) + } +} + /// Dummy metadata extractor #[derive(Clone)] pub struct NoopExtractor; impl MetaExtractor for NoopExtractor {} + +struct SenderFuture(Sender, mpsc::Receiver); +impl futures::Future for SenderFuture { + type Item = (); + type Error = (); + + fn poll(&mut self) -> futures::Poll { + use self::futures::Stream; + + loop { + let item = self.1.poll()?; + match item { + futures::Async::NotReady => { + return Ok(futures::Async::NotReady); + }, + futures::Async::Ready(None) => { + return Ok(futures::Async::Ready(())); + }, + futures::Async::Ready(Some(val)) => { + if let Err(e) = self.0.send(val) { + warn!("Error sending a subscription update: {:?}", e); + return Ok(futures::Async::Ready(())); + } + }, + } + } + } +} diff --git a/ws/src/session.rs b/ws/src/session.rs index 8bf6cdc3f..3d220b43a 100644 --- a/ws/src/session.rs +++ b/ws/src/session.rs @@ -1,6 +1,6 @@ use std; use std::ascii::AsciiExt; -use std::sync::Arc; +use std::sync::{atomic, Arc}; use core; use core::futures::Future; @@ -83,6 +83,7 @@ impl From> for MiddlewareAction { } pub struct Session> { + active: Arc, context: metadata::RequestContext, handler: Arc>, meta_extractor: Arc>, @@ -96,6 +97,7 @@ pub struct Session> { impl> Drop for Session { fn drop(&mut self) { + self.active.store(false, atomic::Ordering::SeqCst); self.stats.as_ref().map(|stats| stats.close_session(self.context.session_id)); } } @@ -107,11 +109,11 @@ impl> Session { fn verify_origin(&self, origin: Option<&[u8]>) -> Option { if !header_is_allowed(&self.allowed_origins, origin) { - warn!(target: "signer", "Blocked connection to Signer API from untrusted origin: {:?}", origin); - Some(forbidden( - "URL Blocked", - "Connection Origin has been rejected.", - )) + warn!( + "Blocked connection to WebSockets server from untrusted origin: {:?}", + origin.and_then(|s| std::str::from_utf8(s).ok()), + ); + Some(forbidden("URL Blocked", "Connection Origin has been rejected.")) } else { None } @@ -120,11 +122,11 @@ impl> Session { fn verify_host(&self, req: &ws::Request) -> Option { let host = req.header("host").map(|x| &x[..]); if !header_is_allowed(&self.allowed_hosts, host) { - warn!(target: "signer", "Blocked connection to Signer API with untrusted host: {:?}", host); - Some(forbidden( - "URL Blocked", - "Connection Host has been rejected.", - )) + warn!( + "Blocked connection to WebSockets server with untrusted host: {:?}", + host.and_then(|s| std::str::from_utf8(s).ok()), + ); + Some(forbidden("URL Blocked", "Connection Host has been rejected.")) } else { None } @@ -182,7 +184,7 @@ impl> ws::Handler for Session { if let Some(result) = response { let res = out.send(result); if let Err(e) = res { - warn!(target: "signer", "Error while sending response: {:?}", e); + warn!("Error while sending response: {:?}", e); } } }); @@ -232,13 +234,16 @@ impl> ws::Factory for Factory { fn connection_made(&mut self, sender: ws::Sender) -> Self::Handler { self.session_id += 1; self.stats.as_ref().map(|stats| stats.open_session(self.session_id)); + let active = Arc::new(atomic::AtomicBool::new(true)); Session { + active: active.clone(), context: metadata::RequestContext { session_id: self.session_id, origin: None, protocols: Vec::new(), - out: sender, + out: metadata::Sender::new(sender, active), + remote: self.remote.clone(), }, handler: self.handler.clone(), meta_extractor: self.meta_extractor.clone(),