@@ -975,9 +975,7 @@ impl AssociationInternal {
975
975
let rst_reqs: Vec < ParamOutgoingResetRequest > =
976
976
self . reconfig_requests . values ( ) . cloned ( ) . collect ( ) ;
977
977
for rst_req in rst_reqs {
978
- let resp = self . reset_streams_if_any ( & rst_req) ;
979
- log:: debug!( "[{}] RESET RESPONSE: {}" , self . name, resp) ;
980
- reply. push ( resp) ;
978
+ self . reset_streams_if_any ( & rst_req, false , & mut reply) ?;
981
979
}
982
980
}
983
981
@@ -1630,15 +1628,11 @@ impl AssociationInternal {
1630
1628
let mut pp = vec ! [ ] ;
1631
1629
1632
1630
if let Some ( param_a) = & c. param_a {
1633
- if let Some ( p) = self . handle_reconfig_param ( param_a) . await ? {
1634
- pp. push ( p) ;
1635
- }
1631
+ self . handle_reconfig_param ( param_a, & mut pp) . await ?;
1636
1632
}
1637
1633
1638
1634
if let Some ( param_b) = & c. param_b {
1639
- if let Some ( p) = self . handle_reconfig_param ( param_b) . await ? {
1640
- pp. push ( p) ;
1641
- }
1635
+ self . handle_reconfig_param ( param_b, & mut pp) . await ?;
1642
1636
}
1643
1637
1644
1638
Ok ( pp)
@@ -1751,26 +1745,35 @@ impl AssociationInternal {
1751
1745
async fn handle_reconfig_param (
1752
1746
& mut self ,
1753
1747
raw : & Box < dyn Param + Send + Sync > ,
1754
- ) -> Result < Option < Packet > > {
1748
+ reply : & mut Vec < Packet > ,
1749
+ ) -> Result < ( ) > {
1755
1750
if let Some ( p) = raw. as_any ( ) . downcast_ref :: < ParamOutgoingResetRequest > ( ) {
1756
1751
self . reconfig_requests
1757
1752
. insert ( p. reconfig_request_sequence_number , p. clone ( ) ) ;
1758
- Ok ( Some ( self . reset_streams_if_any ( p) ) )
1753
+ self . reset_streams_if_any ( p, true , reply) ?;
1754
+ Ok ( ( ) )
1759
1755
} else if let Some ( p) = raw. as_any ( ) . downcast_ref :: < ParamReconfigResponse > ( ) {
1760
1756
self . reconfigs . remove ( & p. reconfig_response_sequence_number ) ;
1761
1757
if self . reconfigs . is_empty ( ) {
1762
1758
if let Some ( treconfig) = & self . treconfig {
1763
1759
treconfig. stop ( ) . await ;
1764
1760
}
1765
1761
}
1766
- Ok ( None )
1762
+ Ok ( ( ) )
1767
1763
} else {
1768
1764
Err ( Error :: ErrParamterType )
1769
1765
}
1770
1766
}
1771
1767
1772
- fn reset_streams_if_any ( & mut self , p : & ParamOutgoingResetRequest ) -> Packet {
1768
+ fn reset_streams_if_any (
1769
+ & mut self ,
1770
+ p : & ParamOutgoingResetRequest ,
1771
+ respond : bool ,
1772
+ reply : & mut Vec < Packet > ,
1773
+ ) -> Result < ( ) > {
1773
1774
let mut result = ReconfigResult :: SuccessPerformed ;
1775
+ let mut sis_to_reset = vec ! [ ] ;
1776
+
1774
1777
if sna32lte ( p. sender_last_tsn , self . peer_last_tsn ) {
1775
1778
log:: debug!(
1776
1779
"[{}] resetStream(): senderLastTSN={} <= peer_last_tsn={}" ,
@@ -1781,6 +1784,11 @@ impl AssociationInternal {
1781
1784
for id in & p. stream_identifiers {
1782
1785
if let Some ( s) = self . streams . get ( id) {
1783
1786
let stream_identifier = s. stream_identifier ;
1787
+ if respond {
1788
+ sis_to_reset. push ( * id) ;
1789
+ }
1790
+ // Someon
1791
+ // https://github.com/pion/sctp/blob/master/association.go#L2045
1784
1792
self . unregister_stream ( stream_identifier) ;
1785
1793
}
1786
1794
}
@@ -1796,13 +1804,42 @@ impl AssociationInternal {
1796
1804
result = ReconfigResult :: InProgress ;
1797
1805
}
1798
1806
1799
- self . create_packet ( vec ! [ Box :: new( ChunkReconfig {
1807
+ // Answer incoming reset requests with the same reset request, but with
1808
+ // reconfig_response_sequence_number.
1809
+ if !sis_to_reset. is_empty ( ) {
1810
+ let rsn = self . generate_next_rsn ( ) ;
1811
+ let tsn = self . my_next_tsn - 1 ;
1812
+
1813
+ let c = ChunkReconfig {
1814
+ param_a : Some ( Box :: new ( ParamOutgoingResetRequest {
1815
+ reconfig_request_sequence_number : rsn,
1816
+ reconfig_response_sequence_number : p. reconfig_request_sequence_number ,
1817
+ sender_last_tsn : tsn,
1818
+ stream_identifiers : sis_to_reset,
1819
+ ..Default :: default ( )
1820
+ } ) ) ,
1821
+ ..Default :: default ( )
1822
+ } ;
1823
+
1824
+ self . reconfigs . insert ( rsn, c. clone ( ) ) ; // store in the map for retransmission
1825
+
1826
+ let p = self . create_packet ( vec ! [ Box :: new( c) ] ) ;
1827
+ reply. push ( p) ;
1828
+ }
1829
+
1830
+ let packet = self . create_packet ( vec ! [ Box :: new( ChunkReconfig {
1800
1831
param_a: Some ( Box :: new( ParamReconfigResponse {
1801
1832
reconfig_response_sequence_number: p. reconfig_request_sequence_number,
1802
1833
result,
1803
1834
} ) ) ,
1804
1835
param_b: None ,
1805
- } ) ] )
1836
+ } ) ] ) ;
1837
+
1838
+ log:: debug!( "[{}] RESET RESPONSE: {}" , self . name, packet) ;
1839
+
1840
+ reply. push ( packet) ;
1841
+
1842
+ Ok ( ( ) )
1806
1843
}
1807
1844
1808
1845
/// Move the chunk peeked with self.pending_queue.peek() to the inflight_queue.
0 commit comments