Skip to content

Commit c257866

Browse files
committed
Respond with ParamOutgoingResetRequest
1 parent 2f7df93 commit c257866

File tree

2 files changed

+55
-15
lines changed

2 files changed

+55
-15
lines changed

examples/examples/data-channels/data-channels.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,11 @@ async fn main() -> Result<()> {
126126
let d2 = Arc::clone(&d);
127127
let d_label2 = d_label.clone();
128128
let d_id2 = d_id;
129+
d.on_close(Box::new(move || {
130+
println!("Data channel closed");
131+
Box::pin(async {})
132+
}));
133+
129134
d.on_open(Box::new(move || {
130135
println!("Data channel '{d_label2}'-'{d_id2}' open. Random messages will now be sent to any connected DataChannels every 5 seconds");
131136

sctp/src/association/association_internal.rs

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -975,9 +975,7 @@ impl AssociationInternal {
975975
let rst_reqs: Vec<ParamOutgoingResetRequest> =
976976
self.reconfig_requests.values().cloned().collect();
977977
for rst_req in rst_reqs {
978-
let resp = self.reset_streams_if_any(&rst_req);
979-
log::debug!("[{}] RESET RESPONSE: {}", self.name, resp);
980-
reply.push(resp);
978+
self.reset_streams_if_any(&rst_req, false, &mut reply)?;
981979
}
982980
}
983981

@@ -1630,15 +1628,11 @@ impl AssociationInternal {
16301628
let mut pp = vec![];
16311629

16321630
if let Some(param_a) = &c.param_a {
1633-
if let Some(p) = self.handle_reconfig_param(param_a).await? {
1634-
pp.push(p);
1635-
}
1631+
self.handle_reconfig_param(param_a, &mut pp).await?;
16361632
}
16371633

16381634
if let Some(param_b) = &c.param_b {
1639-
if let Some(p) = self.handle_reconfig_param(param_b).await? {
1640-
pp.push(p);
1641-
}
1635+
self.handle_reconfig_param(param_b, &mut pp).await?;
16421636
}
16431637

16441638
Ok(pp)
@@ -1751,26 +1745,35 @@ impl AssociationInternal {
17511745
async fn handle_reconfig_param(
17521746
&mut self,
17531747
raw: &Box<dyn Param + Send + Sync>,
1754-
) -> Result<Option<Packet>> {
1748+
reply: &mut Vec<Packet>,
1749+
) -> Result<()> {
17551750
if let Some(p) = raw.as_any().downcast_ref::<ParamOutgoingResetRequest>() {
17561751
self.reconfig_requests
17571752
.insert(p.reconfig_request_sequence_number, p.clone());
1758-
Ok(Some(self.reset_streams_if_any(p)))
1753+
self.reset_streams_if_any(p, true, reply)?;
1754+
Ok(())
17591755
} else if let Some(p) = raw.as_any().downcast_ref::<ParamReconfigResponse>() {
17601756
self.reconfigs.remove(&p.reconfig_response_sequence_number);
17611757
if self.reconfigs.is_empty() {
17621758
if let Some(treconfig) = &self.treconfig {
17631759
treconfig.stop().await;
17641760
}
17651761
}
1766-
Ok(None)
1762+
Ok(())
17671763
} else {
17681764
Err(Error::ErrParamterType)
17691765
}
17701766
}
17711767

1772-
fn reset_streams_if_any(&mut self, p: &ParamOutgoingResetRequest) -> Packet {
1768+
fn reset_streams_if_any(
1769+
&mut self,
1770+
p: &ParamOutgoingResetRequest,
1771+
respond: bool,
1772+
reply: &mut Vec<Packet>,
1773+
) -> Result<()> {
17731774
let mut result = ReconfigResult::SuccessPerformed;
1775+
let mut sis_to_reset = vec![];
1776+
17741777
if sna32lte(p.sender_last_tsn, self.peer_last_tsn) {
17751778
log::debug!(
17761779
"[{}] resetStream(): senderLastTSN={} <= peer_last_tsn={}",
@@ -1781,6 +1784,9 @@ impl AssociationInternal {
17811784
for id in &p.stream_identifiers {
17821785
if let Some(s) = self.streams.get(id) {
17831786
let stream_identifier = s.stream_identifier;
1787+
if respond {
1788+
sis_to_reset.push(*id);
1789+
}
17841790
self.unregister_stream(stream_identifier);
17851791
}
17861792
}
@@ -1796,13 +1802,42 @@ impl AssociationInternal {
17961802
result = ReconfigResult::InProgress;
17971803
}
17981804

1799-
self.create_packet(vec![Box::new(ChunkReconfig {
1805+
// Answer incoming reset requests with the same reset request, but with
1806+
// reconfig_response_sequence_number.
1807+
if !sis_to_reset.is_empty() {
1808+
let rsn = self.generate_next_rsn();
1809+
let tsn = self.my_next_tsn - 1;
1810+
1811+
let c = ChunkReconfig {
1812+
param_a: Some(Box::new(ParamOutgoingResetRequest {
1813+
reconfig_request_sequence_number: rsn,
1814+
reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1815+
sender_last_tsn: tsn,
1816+
stream_identifiers: sis_to_reset,
1817+
..Default::default()
1818+
})),
1819+
..Default::default()
1820+
};
1821+
1822+
self.reconfigs.insert(rsn, c.clone()); // store in the map for retransmission
1823+
1824+
let p = self.create_packet(vec![Box::new(c)]);
1825+
reply.push(p);
1826+
}
1827+
1828+
let packet = self.create_packet(vec![Box::new(ChunkReconfig {
18001829
param_a: Some(Box::new(ParamReconfigResponse {
18011830
reconfig_response_sequence_number: p.reconfig_request_sequence_number,
18021831
result,
18031832
})),
18041833
param_b: None,
1805-
})])
1834+
})]);
1835+
1836+
log::debug!("[{}] RESET RESPONSE: {}", self.name, packet);
1837+
1838+
reply.push(packet);
1839+
1840+
Ok(())
18061841
}
18071842

18081843
/// Move the chunk peeked with self.pending_queue.peek() to the inflight_queue.

0 commit comments

Comments
 (0)