@@ -51,6 +51,9 @@ use std::time::Duration;
5151
5252static ID_COUNTER : AtomicU64 = AtomicU64 :: new ( 0 ) ;
5353
54+ const CONNECT_OUTBOUND_TIMEOUT : u64 = 10 ;
55+ const SOCKS5_CONNECT_OUTBOUND_TIMEOUT : u64 = 30 ;
56+
5457// We only need to select over multiple futures in one place, and taking on the full `tokio/macros`
5558// dependency tree in order to do so (which has broken our MSRV before) is excessive. Instead, we
5659// define a trivial two- and three- select macro with the specific types we need and just use that.
@@ -462,13 +465,156 @@ where
462465 PM :: Target : APeerManager < Descriptor = SocketDescriptor > ,
463466{
464467 let connect_fut = async { TcpStream :: connect ( & addr) . await . map ( |s| s. into_std ( ) . unwrap ( ) ) } ;
465- if let Ok ( Ok ( stream) ) = time:: timeout ( Duration :: from_secs ( 10 ) , connect_fut) . await {
468+ if let Ok ( Ok ( stream) ) =
469+ time:: timeout ( Duration :: from_secs ( CONNECT_OUTBOUND_TIMEOUT ) , connect_fut) . await
470+ {
466471 Some ( setup_outbound ( peer_manager, their_node_id, stream) )
467472 } else {
468473 None
469474 }
470475}
471476
477+ /// Same as [`connect_outbound`], using a SOCKS5 proxy
478+ pub async fn socks5_connect_outbound < PM : Deref + ' static + Send + Sync + Clone > (
479+ peer_manager : PM , their_node_id : PublicKey , socks5_proxy_addr : SocketAddr , addr : SocketAddress ,
480+ user_pass : Option < ( & str , & str ) > ,
481+ ) -> Option < impl std:: future:: Future < Output = ( ) > >
482+ where
483+ PM :: Target : APeerManager < Descriptor = SocketDescriptor > ,
484+ {
485+ let connect_fut = async {
486+ socks5_connect ( socks5_proxy_addr, addr, user_pass) . await . map ( |s| s. into_std ( ) . unwrap ( ) )
487+ } ;
488+ if let Ok ( Ok ( stream) ) =
489+ time:: timeout ( Duration :: from_secs ( SOCKS5_CONNECT_OUTBOUND_TIMEOUT ) , connect_fut) . await
490+ {
491+ Some ( setup_outbound ( peer_manager, their_node_id, stream) )
492+ } else {
493+ None
494+ }
495+ }
496+
497+ async fn socks5_connect (
498+ socks5_proxy_addr : SocketAddr , addr : SocketAddress , user_pass : Option < ( & str , & str ) > ,
499+ ) -> Result < TcpStream , ( ) > {
500+ use std:: io:: { Cursor , Write } ;
501+ use tokio:: io:: AsyncReadExt ;
502+
503+ const IPV4_ADDR_LEN : usize = 4 ;
504+ const IPV6_ADDR_LEN : usize = 16 ;
505+ const HOSTNAME_MAX_LEN : usize = 255 ;
506+
507+ // Constants defined in RFC 1928 and RFC 1929
508+ const VERSION : u8 = 5 ;
509+ const NMETHODS : u8 = 1 ;
510+ const NO_AUTH : u8 = 0 ;
511+ const USERNAME_PASSWORD_AUTH : u8 = 2 ;
512+ const METHOD_SELECT_REPLY_LEN : usize = 2 ;
513+ const USERNAME_PASSWORD_VERSION : u8 = 1 ;
514+ const USERNAME_PASSWORD_REPLY_LEN : usize = 2 ;
515+ const CMD_CONNECT : u8 = 1 ;
516+ const RSV : u8 = 0 ;
517+ const ATYP_IPV4 : u8 = 1 ;
518+ const ATYP_DOMAINNAME : u8 = 3 ;
519+ const ATYP_IPV6 : u8 = 4 ;
520+ const SUCCESS : u8 = 0 ;
521+ const USERNAME_MAX_LEN : usize = 255 ;
522+ const PASSWORD_MAX_LEN : usize = 255 ;
523+
524+ const USERNAME_PASSWORD_REQUEST_MAX_LEN : usize = 1 /* VER */ + 1 /* ULEN */ + USERNAME_MAX_LEN /* UNAME max len */
525+ + 1 /* PLEN */ + PASSWORD_MAX_LEN /* PASSWD max len */ ;
526+ const SOCKS5_REQUEST_MAX_LEN : usize = 1 /* VER */ + 1 /* CMD */ + 1 /* RSV */ + 1 /* ATYP */
527+ + 1 /* HOSTNAME len */ + HOSTNAME_MAX_LEN /* HOSTNAME */ + 2 /* PORT */ ;
528+
529+ let selected_auth = if user_pass. is_some ( ) { USERNAME_PASSWORD_AUTH } else { NO_AUTH } ;
530+ let method_selection_request = [ VERSION , NMETHODS , selected_auth] ;
531+ let mut tcp_stream = TcpStream :: connect ( & socks5_proxy_addr) . await . map_err ( |_| ( ) ) ?;
532+ tokio:: io:: AsyncWriteExt :: write_all ( & mut tcp_stream, & method_selection_request)
533+ . await
534+ . map_err ( |_| ( ) ) ?;
535+
536+ let mut method_selection_reply = [ 0u8 ; METHOD_SELECT_REPLY_LEN ] ;
537+ tcp_stream. read_exact ( & mut method_selection_reply) . await . map_err ( |_| ( ) ) ?;
538+ if method_selection_reply != [ VERSION , selected_auth] {
539+ return Err ( ( ) ) ;
540+ }
541+
542+ if let Some ( ( username, password) ) = user_pass {
543+ if username. len ( ) > USERNAME_MAX_LEN || password. len ( ) > PASSWORD_MAX_LEN {
544+ return Err ( ( ) ) ;
545+ }
546+
547+ let mut username_password_request = [ 0u8 ; USERNAME_PASSWORD_REQUEST_MAX_LEN ] ;
548+ let mut writer = Cursor :: new ( & mut username_password_request[ ..] ) ;
549+ writer. write_all ( & [ USERNAME_PASSWORD_VERSION , username. len ( ) as u8 ] ) . map_err ( |_| ( ) ) ?;
550+ writer. write_all ( username. as_bytes ( ) ) . map_err ( |_| ( ) ) ?;
551+ writer. write_all ( & [ password. len ( ) as u8 ] ) . map_err ( |_| ( ) ) ?;
552+ writer. write_all ( password. as_bytes ( ) ) . map_err ( |_| ( ) ) ?;
553+ let pos = writer. position ( ) as usize ;
554+ tokio:: io:: AsyncWriteExt :: write_all ( & mut tcp_stream, & username_password_request[ ..pos] )
555+ . await
556+ . map_err ( |_| ( ) ) ?;
557+
558+ let mut username_password_reply = [ 0u8 ; USERNAME_PASSWORD_REPLY_LEN ] ;
559+ tcp_stream. read_exact ( & mut username_password_reply) . await . map_err ( |_| ( ) ) ?;
560+ if username_password_reply != [ USERNAME_PASSWORD_VERSION , SUCCESS ] {
561+ return Err ( ( ) ) ;
562+ }
563+ }
564+
565+ let mut socks5_request = [ 0u8 ; SOCKS5_REQUEST_MAX_LEN ] ;
566+ let mut writer = Cursor :: new ( & mut socks5_request[ ..] ) ;
567+ writer. write_all ( & [ VERSION , CMD_CONNECT , RSV ] ) . map_err ( |_| ( ) ) ?;
568+ match addr {
569+ SocketAddress :: TcpIpV4 { addr, port } => {
570+ writer. write_all ( & [ ATYP_IPV4 ] ) . map_err ( |_| ( ) ) ?;
571+ writer. write_all ( & addr) . map_err ( |_| ( ) ) ?;
572+ writer. write_all ( & port. to_be_bytes ( ) ) . map_err ( |_| ( ) ) ?;
573+ } ,
574+ SocketAddress :: TcpIpV6 { addr, port } => {
575+ writer. write_all ( & [ ATYP_IPV6 ] ) . map_err ( |_| ( ) ) ?;
576+ writer. write_all ( & addr) . map_err ( |_| ( ) ) ?;
577+ writer. write_all ( & port. to_be_bytes ( ) ) . map_err ( |_| ( ) ) ?;
578+ } ,
579+ ref onion_v3 @ SocketAddress :: OnionV3 { port, .. } => {
580+ let onion_v3_url = onion_v3. to_string ( ) ;
581+ let hostname = onion_v3_url. split_once ( ':' ) . ok_or ( ( ) ) ?. 0 . as_bytes ( ) ;
582+ writer. write_all ( & [ ATYP_DOMAINNAME , hostname. len ( ) as u8 ] ) . map_err ( |_| ( ) ) ?;
583+ writer. write_all ( hostname) . map_err ( |_| ( ) ) ?;
584+ writer. write_all ( & port. to_be_bytes ( ) ) . map_err ( |_| ( ) ) ?;
585+ } ,
586+ SocketAddress :: Hostname { hostname, port } => {
587+ writer. write_all ( & [ ATYP_DOMAINNAME , hostname. len ( ) ] ) . map_err ( |_| ( ) ) ?;
588+ writer. write_all ( hostname. as_bytes ( ) ) . map_err ( |_| ( ) ) ?;
589+ writer. write_all ( & port. to_be_bytes ( ) ) . map_err ( |_| ( ) ) ?;
590+ } ,
591+ SocketAddress :: OnionV2 { .. } => return Err ( ( ) ) ,
592+ } ;
593+ let pos = writer. position ( ) as usize ;
594+ tokio:: io:: AsyncWriteExt :: write_all ( & mut tcp_stream, & socks5_request[ ..pos] )
595+ . await
596+ . map_err ( |_| ( ) ) ?;
597+
598+ let mut reply_buffer = [ 0u8 ; 4 ] ;
599+ tcp_stream. read_exact ( & mut reply_buffer) . await . map_err ( |_| ( ) ) ?;
600+ if reply_buffer[ ..3 ] != [ VERSION , SUCCESS , RSV ] {
601+ return Err ( ( ) ) ;
602+ }
603+ match reply_buffer[ 3 ] {
604+ ATYP_IPV4 => tcp_stream. read_exact ( & mut [ 0u8 ; IPV4_ADDR_LEN ] ) . await . map_err ( |_| ( ) ) ?,
605+ ATYP_DOMAINNAME => {
606+ let hostname_len = tcp_stream. read_u8 ( ) . await . map_err ( |_| ( ) ) ? as usize ;
607+ let mut hostname_buffer = [ 0u8 ; HOSTNAME_MAX_LEN ] ;
608+ tcp_stream. read_exact ( & mut hostname_buffer[ ..hostname_len] ) . await . map_err ( |_| ( ) ) ?
609+ } ,
610+ ATYP_IPV6 => tcp_stream. read_exact ( & mut [ 0u8 ; IPV6_ADDR_LEN ] ) . await . map_err ( |_| ( ) ) ?,
611+ _ => return Err ( ( ) ) ,
612+ } ;
613+ tcp_stream. read_u16 ( ) . await . map_err ( |_| ( ) ) ?;
614+
615+ Ok ( tcp_stream)
616+ }
617+
472618const SOCK_WAKER_VTABLE : task:: RawWakerVTable = task:: RawWakerVTable :: new (
473619 clone_socket_waker,
474620 wake_socket_waker,
@@ -608,6 +754,9 @@ impl Hash for SocketDescriptor {
608754
609755#[ cfg( test) ]
610756mod tests {
757+ #[ cfg( tor_socks5) ]
758+ use super :: socks5_connect;
759+
611760 use bitcoin:: constants:: ChainHash ;
612761 use bitcoin:: secp256k1:: { PublicKey , Secp256k1 , SecretKey } ;
613762 use bitcoin:: Network ;
@@ -621,6 +770,8 @@ mod tests {
621770 use tokio:: sync:: mpsc;
622771
623772 use std:: mem;
773+ #[ cfg( tor_socks5) ]
774+ use std:: net:: SocketAddr ;
624775 use std:: sync:: atomic:: { AtomicBool , Ordering } ;
625776 use std:: sync:: { Arc , Mutex } ;
626777 use std:: time:: Duration ;
@@ -941,4 +1092,70 @@ mod tests {
9411092 async fn unthreaded_race_disconnect_accept ( ) {
9421093 race_disconnect_accept ( ) . await ;
9431094 }
1095+
1096+ #[ cfg( tor_socks5) ]
1097+ #[ tokio:: test]
1098+ async fn test_socks5_connect ( ) {
1099+ // Set TOR_SOCKS5_PROXY=127.0.0.1:9050
1100+ let socks5_proxy_addr: SocketAddr = std:: env!( "TOR_SOCKS5_PROXY" ) . parse ( ) . unwrap ( ) ;
1101+
1102+ // Success cases
1103+
1104+ for ( addr_str, user_pass) in [
1105+ // google.com
1106+ ( "142.250.189.196:80" , None ) ,
1107+ // google.com
1108+ ( "[2607:f8b0:4005:813::2004]:80" , None ) ,
1109+ // torproject.org
1110+ ( "torproject.org:80" , None ) ,
1111+ // torproject.org
1112+ ( "2gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80" , None ) ,
1113+ // Same vectors as above, with a username and password
1114+ ( "142.250.189.196:80" , Some ( ( "<torS0X>0" , "" ) ) ) ,
1115+ ( "[2607:f8b0:4005:813::2004]:80" , Some ( ( "<torS0X>0" , "123" ) ) ) ,
1116+ ( "torproject.org:80" , Some ( ( "<torS0X>1abc" , "" ) ) ) ,
1117+ (
1118+ "2gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80" ,
1119+ Some ( ( "<torS0X>1abc" , "123" ) ) ,
1120+ ) ,
1121+ ] {
1122+ let addr: SocketAddress = addr_str. parse ( ) . unwrap ( ) ;
1123+ let tcp_stream = socks5_connect ( socks5_proxy_addr, addr, user_pass) . await . unwrap ( ) ;
1124+ assert_eq ! (
1125+ tcp_stream. try_read( & mut [ 0u8 ; 1 ] ) . unwrap_err( ) . kind( ) ,
1126+ std:: io:: ErrorKind :: WouldBlock
1127+ ) ;
1128+ }
1129+
1130+ // Failure cases
1131+
1132+ for ( addr_str, user_pass) in [
1133+ // google.com, with some invalid port
1134+ ( "142.250.189.196:1234" , None ) ,
1135+ // google.com, with some invalid port
1136+ ( "[2607:f8b0:4005:813::2004]:1234" , None ) ,
1137+ // torproject.org, with some invalid port
1138+ ( "torproject.org:1234" , None ) ,
1139+ // torproject.org, with a typo
1140+ ( "3gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80" , None ) ,
1141+ // Same vectors as above, with a username and password
1142+ ( "142.250.189.196:1234" , Some ( ( "<torS0X>0" , "" ) ) ) ,
1143+ ( "[2607:f8b0:4005:813::2004]:1234" , Some ( ( "<torS0X>0" , "123" ) ) ) ,
1144+ ( "torproject.org:1234" , Some ( ( "<torS0X>1abc" , "" ) ) ) ,
1145+ (
1146+ "3gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80" ,
1147+ Some ( ( "<torS0X>1abc" , "123" ) ) ,
1148+ ) ,
1149+ /* TODO: Uncomment when format types 30 and 31 land in tor stable, see https://spec.torproject.org/socks-extensions.html,
1150+ these are invalid usernames according to those standards.
1151+ ("142.250.189.196:80", Some(("<torS0X>0abc", "123"))),
1152+ ("[2607:f8b0:4005:813::2004]:80", Some(("<torS0X>1", "123"))),
1153+ ("torproject.org:80", Some(("<torS0X>9", "123"))),
1154+ ("2gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80", Some(("<torS0X>", "123"))),
1155+ */
1156+ ] {
1157+ let addr: SocketAddress = addr_str. parse ( ) . unwrap ( ) ;
1158+ assert ! ( socks5_connect( socks5_proxy_addr, addr, user_pass) . await . is_err( ) ) ;
1159+ }
1160+ }
9441161}
0 commit comments