@@ -11,22 +11,31 @@ use super::Connection;
1111use crate :: { error:: Error , utils:: UdpRelayMode } ;
1212
1313impl Connection {
14- pub async fn handle_uni_stream ( self , recv : RecvStream , _reg : Register ) {
14+ pub async fn handle_uni_stream ( self , recv : RecvStream , reg : Register ) {
1515 debug ! (
1616 "[{id:#010x}] [{addr}] [{user}] incoming unidirectional stream" ,
1717 id = self . id( ) ,
1818 addr = self . inner. remote_address( ) ,
1919 user = self . auth,
2020 ) ;
2121
22- let max = self . max_concurrent_uni_streams . load ( Ordering :: Relaxed ) ;
23-
24- if self . remote_uni_stream_cnt . count ( ) as u32 == max {
25- self . max_concurrent_uni_streams
26- . store ( max * 2 , Ordering :: Relaxed ) ;
22+ let current_max = self . max_concurrent_uni_streams . load ( Ordering :: Relaxed ) ;
2723
24+ if self . remote_uni_stream_cnt . count ( ) as u32 >= current_max - 1
25+ && let Ok ( _) = self . max_concurrent_uni_streams . compare_exchange (
26+ current_max,
27+ current_max * 2 ,
28+ Ordering :: AcqRel ,
29+ Ordering :: Acquire ,
30+ )
31+ {
32+ debug ! (
33+ "[{id:#010x}] reached max concurrent uni_streams, setting bigger limitation={num}" ,
34+ id = self . id( ) ,
35+ num = current_max * 2
36+ ) ;
2837 self . inner
29- . set_max_concurrent_uni_streams ( VarInt :: from ( max * 2 ) ) ;
38+ . set_max_concurrent_uni_streams ( VarInt :: from ( current_max * 2 ) ) ;
3039 }
3140
3241 let pre_process = async {
@@ -71,24 +80,34 @@ impl Connection {
7180 self . close ( ) ;
7281 }
7382 }
83+ drop ( reg) ;
7484 }
7585
76- pub async fn handle_bi_stream ( self , ( send, recv) : ( SendStream , RecvStream ) , _reg : Register ) {
86+ pub async fn handle_bi_stream ( self , ( send, recv) : ( SendStream , RecvStream ) , reg : Register ) {
7787 debug ! (
7888 "[{id:#010x}] [{addr}] [{user}] incoming bidirectional stream" ,
7989 id = self . id( ) ,
8090 addr = self . inner. remote_address( ) ,
8191 user = self . auth,
8292 ) ;
8393
84- let max = self . max_concurrent_bi_streams . load ( Ordering :: Relaxed ) ;
85-
86- if self . remote_bi_stream_cnt . count ( ) as u32 == max {
87- self . max_concurrent_bi_streams
88- . store ( max * 2 , Ordering :: Relaxed ) ;
94+ let current_max = self . max_concurrent_bi_streams . load ( Ordering :: Relaxed ) ;
8995
96+ if self . remote_bi_stream_cnt . count ( ) as u32 >= current_max - 1
97+ && let Ok ( _) = self . max_concurrent_bi_streams . compare_exchange (
98+ current_max,
99+ current_max * 2 ,
100+ Ordering :: AcqRel ,
101+ Ordering :: Acquire ,
102+ )
103+ {
104+ debug ! (
105+ "[{id:#010x}] reached max concurrent bi_streams, setting bigger limitation={num}" ,
106+ id = self . id( ) ,
107+ num = current_max * 2
108+ ) ;
90109 self . inner
91- . set_max_concurrent_bi_streams ( VarInt :: from ( max * 2 ) ) ;
110+ . set_max_concurrent_bi_streams ( VarInt :: from ( current_max * 2 ) ) ;
92111 }
93112
94113 let pre_process = async {
@@ -121,6 +140,7 @@ impl Connection {
121140 self . close ( ) ;
122141 }
123142 }
143+ drop ( reg) ;
124144 }
125145
126146 pub async fn handle_datagram ( self , dg : Bytes ) {
0 commit comments