@@ -51,6 +51,9 @@ use std::time::Duration;
5151
5252static ID_COUNTER : AtomicU64 = AtomicU64 :: new ( 0 ) ;
5353
54+ const CONNECT_OUTBOUND_TIMEOUT : u64 = 10 ;
55+ const SOCKS5_CONNECT_OUTBOUND_TIMEOUT : u64 = 30 ;
56+
5457// We only need to select over multiple futures in one place, and taking on the full `tokio/macros`
5558// dependency tree in order to do so (which has broken our MSRV before) is excessive. Instead, we
5659// define a trivial two- and three- select macro with the specific types we need and just use that.
@@ -462,13 +465,118 @@ where
462465 PM :: Target : APeerManager < Descriptor = SocketDescriptor > ,
463466{
464467 let connect_fut = async { TcpStream :: connect ( & addr) . await . map ( |s| s. into_std ( ) . unwrap ( ) ) } ;
465- if let Ok ( Ok ( stream) ) = time:: timeout ( Duration :: from_secs ( 10 ) , connect_fut) . await {
468+ if let Ok ( Ok ( stream) ) =
469+ time:: timeout ( Duration :: from_secs ( CONNECT_OUTBOUND_TIMEOUT ) , connect_fut) . await
470+ {
471+ Some ( setup_outbound ( peer_manager, their_node_id, stream) )
472+ } else {
473+ None
474+ }
475+ }
476+
477+ /// Same as [`connect_outbound`], using a SOCKS5 proxy
478+ pub async fn socks5_connect_outbound < PM : Deref + ' static + Send + Sync + Clone > (
479+ peer_manager : PM , their_node_id : PublicKey , socks5_proxy_addr : SocketAddr , addr : SocketAddress ,
480+ ) -> Option < impl std:: future:: Future < Output = ( ) > >
481+ where
482+ PM :: Target : APeerManager < Descriptor = SocketDescriptor > ,
483+ {
484+ let connect_fut =
485+ async { socks5_connect ( socks5_proxy_addr, addr) . await . map ( |s| s. into_std ( ) . unwrap ( ) ) } ;
486+ if let Ok ( Ok ( stream) ) =
487+ time:: timeout ( Duration :: from_secs ( SOCKS5_CONNECT_OUTBOUND_TIMEOUT ) , connect_fut) . await
488+ {
466489 Some ( setup_outbound ( peer_manager, their_node_id, stream) )
467490 } else {
468491 None
469492 }
470493}
471494
495+ async fn socks5_connect (
496+ socks5_proxy_addr : SocketAddr , addr : SocketAddress ,
497+ ) -> Result < TcpStream , ( ) > {
498+ use tokio:: io:: AsyncReadExt ;
499+ use tokio:: io:: AsyncWriteExt ;
500+
501+ // Constants defined in RFC 1928
502+ const VERSION : u8 = 5 ;
503+ const NMETHODS : u8 = 1 ;
504+ const NO_AUTH : u8 = 0 ;
505+ const METHOD_SELECT_RES_LEN : usize = 2 ;
506+ const CMD_CONNECT : u8 = 1 ;
507+ const RSV : u8 = 0 ;
508+ const ATYP_IPV4 : u8 = 1 ;
509+ const ATYP_DOMAINNAME : u8 = 3 ;
510+ const ATYP_IPV6 : u8 = 4 ;
511+ const SUCCEEDED : u8 = 0 ;
512+
513+ // The size of a socks5 request for an onion V3 address
514+ const SOCKS5_REQUEST_CAPACITY : usize = 69 ;
515+ // The minimum size of a socks5 response, also the size of the response from a Tor proxy
516+ const SOCKS5_REPLY_MIN_LEN : usize = 10 ;
517+
518+ if let SocketAddress :: OnionV2 { .. } = addr {
519+ return Err ( ( ) ) ;
520+ }
521+
522+ let method_selection_message = [ VERSION , NMETHODS , NO_AUTH ] ;
523+ let mut tcp_stream = TcpStream :: connect ( & socks5_proxy_addr) . await . map_err ( |_| ( ) ) ?;
524+ tcp_stream. write_all ( & method_selection_message) . await . map_err ( |_| ( ) ) ?;
525+
526+ let mut method_selection_response = [ 0u8 ; METHOD_SELECT_RES_LEN ] ;
527+ let n_read = tcp_stream. read_exact ( & mut method_selection_response) . await . map_err ( |_| ( ) ) ?;
528+ if n_read != METHOD_SELECT_RES_LEN || method_selection_response != [ VERSION , NO_AUTH ] {
529+ return Err ( ( ) ) ;
530+ }
531+
532+ let mut socks5_request: Vec < u8 > = Vec :: with_capacity ( SOCKS5_REQUEST_CAPACITY ) ;
533+
534+ socks5_request. push ( VERSION ) ;
535+ socks5_request. push ( CMD_CONNECT ) ;
536+ socks5_request. push ( RSV ) ;
537+
538+ socks5_request. push ( match addr {
539+ SocketAddress :: TcpIpV4 { .. } => ATYP_IPV4 ,
540+ SocketAddress :: TcpIpV6 { .. } => ATYP_IPV6 ,
541+ SocketAddress :: OnionV3 { .. } | SocketAddress :: Hostname { .. } => ATYP_DOMAINNAME ,
542+ SocketAddress :: OnionV2 { .. } => unreachable ! ( ) ,
543+ } ) ;
544+
545+ match addr {
546+ SocketAddress :: TcpIpV4 { addr, port } => {
547+ socks5_request. extend_from_slice ( & addr) ;
548+ socks5_request. extend_from_slice ( & port. to_be_bytes ( ) ) ;
549+ } ,
550+ SocketAddress :: TcpIpV6 { addr, port } => {
551+ socks5_request. extend_from_slice ( & addr) ;
552+ socks5_request. extend_from_slice ( & port. to_be_bytes ( ) ) ;
553+ } ,
554+ onion_v3 @ SocketAddress :: OnionV3 { port, .. } => {
555+ let onion_v3_url = onion_v3. to_string ( ) ;
556+ let hostname = onion_v3_url. split_once ( ':' ) . ok_or ( ( ) ) ?. 0 . as_bytes ( ) ;
557+ socks5_request. extend_from_slice ( & [ hostname. len ( ) as u8 ] ) ;
558+ socks5_request. extend_from_slice ( hostname) ;
559+ socks5_request. extend_from_slice ( & port. to_be_bytes ( ) ) ;
560+ } ,
561+ SocketAddress :: Hostname { hostname, port } => {
562+ socks5_request. extend_from_slice ( & [ hostname. len ( ) as u8 ] ) ;
563+ socks5_request. extend_from_slice ( hostname. as_bytes ( ) ) ;
564+ socks5_request. extend_from_slice ( & port. to_be_bytes ( ) ) ;
565+ } ,
566+ SocketAddress :: OnionV2 { .. } => unreachable ! ( ) ,
567+ } ;
568+
569+ tcp_stream. write_all ( & socks5_request) . await . map_err ( |_| ( ) ) ?;
570+
571+ let mut buffer: Vec < u8 > = Vec :: with_capacity ( SOCKS5_REPLY_MIN_LEN ) ;
572+ let n_read = tcp_stream. read_to_end ( & mut buffer) . await . map_err ( |_| ( ) ) ?;
573+ if n_read < SOCKS5_REPLY_MIN_LEN || buffer[ ..3 ] != [ VERSION , SUCCEEDED , RSV ] {
574+ return Err ( ( ) ) ;
575+ }
576+
577+ Ok ( tcp_stream)
578+ }
579+
472580const SOCK_WAKER_VTABLE : task:: RawWakerVTable = task:: RawWakerVTable :: new (
473581 clone_socket_waker,
474582 wake_socket_waker,
0 commit comments