@@ -83,7 +83,9 @@ use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, R
83
83
use futures_io:: { AsyncRead , AsyncWrite } ;
84
84
use futures_lite:: stream:: { self , Stream } ;
85
85
use futures_lite:: { future, pin, ready} ;
86
- use socket2:: { Domain , Protocol , SockAddr , Socket , Type } ;
86
+
87
+ use rustix:: io as rio;
88
+ use rustix:: net as rn;
87
89
88
90
use crate :: reactor:: { Reactor , Registration , Source } ;
89
91
@@ -1487,10 +1489,15 @@ impl Async<TcpStream> {
1487
1489
/// # std::io::Result::Ok(()) });
1488
1490
/// ```
1489
1491
pub async fn connect < A : Into < SocketAddr > > ( addr : A ) -> io:: Result < Async < TcpStream > > {
1490
- // Begin async connect .
1492
+ // Figure out how to handle this address .
1491
1493
let addr = addr. into ( ) ;
1492
- let domain = Domain :: for_address ( addr) ;
1493
- let socket = connect ( addr. into ( ) , domain, Some ( Protocol :: TCP ) ) ?;
1494
+ let ( domain, sock_addr) = match addr {
1495
+ SocketAddr :: V4 ( v4) => ( rn:: AddressFamily :: INET , rn:: SocketAddrAny :: V4 ( v4) ) ,
1496
+ SocketAddr :: V6 ( v6) => ( rn:: AddressFamily :: INET6 , rn:: SocketAddrAny :: V6 ( v6) ) ,
1497
+ } ;
1498
+
1499
+ // Begin async connect.
1500
+ let socket = connect ( sock_addr, domain, Some ( rn:: ipproto:: TCP ) ) ?;
1494
1501
let stream = Async :: new ( TcpStream :: from ( socket) ) ?;
1495
1502
1496
1503
// The stream becomes writable when connected.
@@ -1819,7 +1826,11 @@ impl Async<UnixStream> {
1819
1826
/// ```
1820
1827
pub async fn connect < P : AsRef < Path > > ( path : P ) -> io:: Result < Async < UnixStream > > {
1821
1828
// Begin async connect.
1822
- let socket = connect ( SockAddr :: unix ( path) ?, Domain :: UNIX , None ) ?;
1829
+ let socket = connect (
1830
+ rn:: SocketAddrUnix :: new ( path. as_ref ( ) ) ?. into ( ) ,
1831
+ rn:: AddressFamily :: UNIX ,
1832
+ None ,
1833
+ ) ?;
1823
1834
let stream = Async :: new ( UnixStream :: from ( socket) ) ?;
1824
1835
1825
1836
// The stream becomes writable when connected.
@@ -2029,8 +2040,11 @@ async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()>
2029
2040
. await
2030
2041
}
2031
2042
2032
- fn connect ( addr : SockAddr , domain : Domain , protocol : Option < Protocol > ) -> io:: Result < Socket > {
2033
- let sock_type = Type :: STREAM ;
2043
+ fn connect (
2044
+ addr : rn:: SocketAddrAny ,
2045
+ domain : rn:: AddressFamily ,
2046
+ protocol : Option < rn:: Protocol > ,
2047
+ ) -> io:: Result < rustix:: fd:: OwnedFd > {
2034
2048
#[ cfg( any(
2035
2049
target_os = "android" ,
2036
2050
target_os = "dragonfly" ,
@@ -2041,10 +2055,13 @@ fn connect(addr: SockAddr, domain: Domain, protocol: Option<Protocol>) -> io::Re
2041
2055
target_os = "netbsd" ,
2042
2056
target_os = "openbsd"
2043
2057
) ) ]
2044
- // If we can, set nonblocking at socket creation for unix
2045
- let sock_type = sock_type. nonblocking ( ) ;
2046
- // This automatically handles cloexec on unix, no_inherit on windows and nosigpipe on macos
2047
- let socket = Socket :: new ( domain, sock_type, protocol) ?;
2058
+ let socket = rn:: socket_with (
2059
+ domain,
2060
+ rn:: SocketType :: STREAM ,
2061
+ rn:: SocketFlags :: CLOEXEC | rn:: SocketFlags :: NONBLOCK ,
2062
+ protocol,
2063
+ ) ?;
2064
+
2048
2065
#[ cfg( not( any(
2049
2066
target_os = "android" ,
2050
2067
target_os = "dragonfly" ,
@@ -2055,14 +2072,62 @@ fn connect(addr: SockAddr, domain: Domain, protocol: Option<Protocol>) -> io::Re
2055
2072
target_os = "netbsd" ,
2056
2073
target_os = "openbsd"
2057
2074
) ) ) ]
2058
- // If the current platform doesn't support nonblocking at creation, enable it after creation
2059
- socket. set_nonblocking ( true ) ?;
2060
- match socket. connect ( & addr) {
2075
+ let socket = {
2076
+ #[ cfg( not( any(
2077
+ target_os = "macos" ,
2078
+ target_os = "ios" ,
2079
+ target_os = "tvos" ,
2080
+ target_os = "watchos" ,
2081
+ windows,
2082
+ target_os = "aix" ,
2083
+ target_os = "haiku"
2084
+ ) ) ) ]
2085
+ let flags = rn:: SocketFlags :: CLOEXEC ;
2086
+ #[ cfg( any(
2087
+ target_os = "macos" ,
2088
+ target_os = "ios" ,
2089
+ target_os = "tvos" ,
2090
+ target_os = "watchos" ,
2091
+ windows,
2092
+ target_os = "aix" ,
2093
+ target_os = "haiku"
2094
+ ) ) ]
2095
+ let flags = rn:: SocketFlags :: empty ( ) ;
2096
+
2097
+ // Create the socket.
2098
+ let socket = rn:: socket_with ( domain, rn:: SocketFlags :: STREAM , flags, protocol) ?;
2099
+
2100
+ // Set cloexec if necessary.
2101
+ #[ cfg( any(
2102
+ target_os = "macos" ,
2103
+ target_os = "ios" ,
2104
+ target_os = "tvos" ,
2105
+ target_os = "watchos"
2106
+ ) ) ]
2107
+ rio:: fcntl_setfd ( & socket, rio:: fcntl_getfd ( & socket) ? | rio:: FdFlags :: CLOEXEC ) ?;
2108
+
2109
+ // Set non-blocking mode.
2110
+ rio:: ioctl_fionbio ( & socket, true ) ?;
2111
+
2112
+ socket
2113
+ } ;
2114
+
2115
+ // Set nosigpipe if necessary.
2116
+ #[ cfg( any(
2117
+ target_os = "macos" ,
2118
+ target_os = "ios" ,
2119
+ target_os = "tvos" ,
2120
+ target_os = "watchos" ,
2121
+ target_os = "freebsd"
2122
+ ) ) ]
2123
+ rn:: sockopt:: set_socket_nosigpipe ( & socket, true ) ?;
2124
+
2125
+ match rn:: connect_any ( & socket, & addr) {
2061
2126
Ok ( _) => { }
2062
2127
#[ cfg( unix) ]
2063
- Err ( err ) if err . raw_os_error ( ) == Some ( rustix :: io :: Errno :: INPROGRESS . raw_os_error ( ) ) => { }
2064
- Err ( err ) if err . kind ( ) == io :: ErrorKind :: WouldBlock => { }
2065
- Err ( err) => return Err ( err) ,
2128
+ Err ( rio :: Errno :: INPROGRESS ) => { }
2129
+ Err ( rio :: Errno :: AGAIN ) => { }
2130
+ Err ( err) => return Err ( err. into ( ) ) ,
2066
2131
}
2067
2132
Ok ( socket)
2068
2133
}
0 commit comments