@@ -10,13 +10,15 @@ use crate::{
1010
1111use async_trait:: async_trait;
1212use std:: {
13+ collections:: { HashMap , hash_map} ,
1314 io, mem,
1415 net:: SocketAddr ,
15- os:: fd:: { AsFd , AsRawFd } ,
16+ os:: fd:: AsRawFd ,
1617 sync:: Arc ,
1718} ;
18- use tokio:: net:: TcpListener ;
19+ use tokio:: { net:: TcpListener , time :: Instant } ;
1920use tracing:: { trace, warn} ;
21+ use unix_udp_sock:: UdpSocket ;
2022
2123pub struct TproxyInbound {
2224 addr : SocketAddr ,
@@ -141,14 +143,29 @@ impl InboundHandlerTrait for TproxyInbound {
141143 . await
142144 }
143145}
146+ fn bind_nonlocal_socket ( src_addr : SocketAddr ) -> io:: Result < UdpSocket > {
147+ let domain = if src_addr. is_ipv4 ( ) {
148+ socket2:: Domain :: IPV4
149+ } else {
150+ socket2:: Domain :: IPV6
151+ } ;
152+ let socket = socket2:: Socket :: new ( domain, socket2:: Type :: DGRAM , None ) ?;
153+ if src_addr. is_ipv4 ( ) {
154+ socket. set_ip_transparent_v4 ( true ) ?;
155+ }
156+ socket. bind ( & src_addr. into ( ) ) ?;
157+
158+ let socket = UdpSocket :: from_std ( socket. into ( ) ) ?;
159+ Ok ( socket)
160+ }
144161
145162async fn handle_inbound_datagram (
146163 allow_lan : bool ,
147164 socket : Arc < unix_udp_sock:: UdpSocket > ,
148165 dispatcher : Arc < Dispatcher > ,
149166) -> std:: io:: Result < ( ) > {
150167 // dispatcher <-> tproxy communications
151- let ( l_tx, mut l_rx) = tokio:: sync:: mpsc:: channel ( 32 ) ;
168+ let ( l_tx, l_rx) = tokio:: sync:: mpsc:: channel ( 32 ) ;
152169
153170 // forward packets from tproxy to dispatcher
154171 let ( d_tx, d_rx) = tokio:: sync:: mpsc:: channel ( 32 ) ;
@@ -164,28 +181,12 @@ async fn handle_inbound_datagram(
164181 ..Default :: default ( )
165182 } ;
166183
167- let closer = dispatcher
184+ let closer: tokio :: sync :: oneshot :: Sender < u8 > = dispatcher
168185 . dispatch_datagram ( sess, Box :: new ( udp_stream) )
169186 . await ;
170187
171188 // dispatcher -> tproxy
172- let responder = socket. clone ( ) ;
173- let fut1 = tokio:: spawn ( async move {
174- while let Some ( pkt) = l_rx. recv ( ) . await {
175- trace ! ( "tproxy <- dispatcher: {:?}" , pkt) ;
176-
177- // remote -> local
178- match responder
179- . send_to ( & pkt. data [ ..] , pkt. dst_addr . must_into_socket_addr ( ) )
180- . await
181- {
182- Ok ( _) => { }
183- Err ( e) => {
184- warn ! ( "failed to send udp packet to proxy: {}" , e) ;
185- }
186- }
187- }
188- } ) ;
189+ let fut1 = tokio:: spawn ( handle_packet_from_dispatcher ( l_rx) ) ;
189190
190191 // tproxy -> dispatcher
191192 let fut2 = tokio:: spawn ( async move {
@@ -278,3 +279,50 @@ fn set_ip_recv_orig_dstaddr(
278279
279280 Ok ( ( ) )
280281}
282+
283+ async fn handle_packet_from_dispatcher (
284+ mut l_rx : tokio:: sync:: mpsc:: Receiver < UdpPacket > ,
285+ ) {
286+ let mut responder_map = HashMap :: < SocketAddr , ( Arc < UdpSocket > , Instant ) > :: new ( ) ;
287+ while let Some ( pkt) = l_rx. recv ( ) . await {
288+ trace ! ( "tproxy <- dispatcher: {:?}" , pkt) ;
289+
290+ // We must modify set the src address for the outgoing packet by binding
291+ // address to src_addr
292+ // To avoid repeated creating sockets, sockets need be cached here
293+ // We can't use tproxy listen socket, because the bound address is localhost
294+ // remote -> local
295+ let now = Instant :: now ( ) ;
296+ let src_addr = pkt. src_addr . must_into_socket_addr ( ) ;
297+ let responder = match responder_map. entry ( src_addr) {
298+ hash_map:: Entry :: Occupied ( mut occupied_entry) => {
299+ occupied_entry. get_mut ( ) . 1 = now;
300+ occupied_entry. get ( ) . 0 . clone ( )
301+ }
302+ hash_map:: Entry :: Vacant ( vacant_entry) => {
303+ let socket = match bind_nonlocal_socket ( src_addr) {
304+ Ok ( x) => Arc :: new ( x) ,
305+ Err ( e) => {
306+ tracing:: error!(
307+ "failed to bind nonlocal socket for tproxy:{}" ,
308+ e
309+ ) ;
310+ continue ;
311+ }
312+ } ;
313+ vacant_entry. insert ( ( socket. clone ( ) , now) ) ;
314+ socket
315+ }
316+ } ;
317+ match responder
318+ . send_to ( & pkt. data [ ..] , pkt. dst_addr . must_into_socket_addr ( ) )
319+ . await
320+ {
321+ Ok ( _) => { }
322+ Err ( e) => {
323+ warn ! ( "failed to send udp packet to proxy: {}" , e) ;
324+ }
325+ }
326+ responder_map. retain ( |_k, v| now. duration_since ( v. 1 ) . as_secs ( ) < 60 ) ;
327+ }
328+ }
0 commit comments