Skip to content

Commit 73ce2b8

Browse files
authored
fix(data-channel): stuck closing in Chrome (#480)
* Respond with ParamOutgoingResetRequest * chore: resolve clippy warnings
1 parent 2f7df93 commit 73ce2b8

File tree

15 files changed

+71
-38
lines changed

15 files changed

+71
-38
lines changed

examples/examples/data-channels/data-channels.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,11 @@ async fn main() -> Result<()> {
126126
let d2 = Arc::clone(&d);
127127
let d_label2 = d_label.clone();
128128
let d_id2 = d_id;
129+
d.on_close(Box::new(move || {
130+
println!("Data channel closed");
131+
Box::pin(async {})
132+
}));
133+
129134
d.on_open(Box::new(move || {
130135
println!("Data channel '{d_label2}'-'{d_id2}' open. Random messages will now be sent to any connected DataChannels every 5 seconds");
131136

interceptor/src/twcc/receiver/receiver_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ async fn test_twcc_receiver_interceptor_different_delays_between_rtp_packets() -
119119
)
120120
.await;
121121

122-
let delays = vec![0, 10, 100, 200];
122+
let delays = [0, 10, 100, 200];
123123
for (i, d) in delays.iter().enumerate() {
124124
tokio::time::advance(Duration::from_millis(*d)).await;
125125

media/src/audio/buffer/info.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,7 @@ impl<L> Copy for BufferInfo<L> {}
4747

4848
impl<L> Clone for BufferInfo<L> {
4949
fn clone(&self) -> Self {
50-
Self {
51-
channels: self.channels,
52-
frames: self.frames,
53-
_phantom: PhantomData,
54-
}
50+
*self
5551
}
5652
}
5753

rtcp/src/payload_feedbacks/receiver_estimated_maximum_bitrate/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ impl Unmarshal for ReceiverEstimatedMaximumBitrate {
228228
}
229229

230230
// REMB rules all around me
231-
let mut unique_identifier = vec![0; 4];
231+
let mut unique_identifier = [0; 4];
232232
unique_identifier[0] = raw_packet.get_u8();
233233
unique_identifier[1] = raw_packet.get_u8();
234234
unique_identifier[2] = raw_packet.get_u8();

rtp/src/codecs/vp8/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl Payloader for Vp8Payloader {
6767
let current_fragment_size =
6868
std::cmp::min(max_fragment_size, payload_data_remaining) as usize;
6969
let mut out = BytesMut::with_capacity(using_header_size + current_fragment_size);
70-
let mut buf = vec![0u8; 4];
70+
let mut buf = [0u8; 4];
7171
if first {
7272
buf[0] = 0x10;
7373
first = false;

rtp/src/codecs/vp9/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl Payloader for Vp9Payloader {
106106
let current_fragment_size =
107107
std::cmp::min(max_fragment_size as usize, payload_data_remaining);
108108
let mut out = BytesMut::with_capacity(VP9HEADER_SIZE + current_fragment_size);
109-
let mut buf = vec![0u8; VP9HEADER_SIZE];
109+
let mut buf = [0u8; VP9HEADER_SIZE];
110110
buf[0] = 0x90; // F=1 I=1
111111
if payload_data_index == 0 {
112112
buf[0] |= 0x08; // B=1

sctp/src/association/association_internal.rs

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -975,9 +975,7 @@ impl AssociationInternal {
975975
let rst_reqs: Vec<ParamOutgoingResetRequest> =
976976
self.reconfig_requests.values().cloned().collect();
977977
for rst_req in rst_reqs {
978-
let resp = self.reset_streams_if_any(&rst_req);
979-
log::debug!("[{}] RESET RESPONSE: {}", self.name, resp);
980-
reply.push(resp);
978+
self.reset_streams_if_any(&rst_req, false, &mut reply)?;
981979
}
982980
}
983981

@@ -1630,15 +1628,11 @@ impl AssociationInternal {
16301628
let mut pp = vec![];
16311629

16321630
if let Some(param_a) = &c.param_a {
1633-
if let Some(p) = self.handle_reconfig_param(param_a).await? {
1634-
pp.push(p);
1635-
}
1631+
self.handle_reconfig_param(param_a, &mut pp).await?;
16361632
}
16371633

16381634
if let Some(param_b) = &c.param_b {
1639-
if let Some(p) = self.handle_reconfig_param(param_b).await? {
1640-
pp.push(p);
1641-
}
1635+
self.handle_reconfig_param(param_b, &mut pp).await?;
16421636
}
16431637

16441638
Ok(pp)
@@ -1751,26 +1745,35 @@ impl AssociationInternal {
17511745
async fn handle_reconfig_param(
17521746
&mut self,
17531747
raw: &Box<dyn Param + Send + Sync>,
1754-
) -> Result<Option<Packet>> {
1748+
reply: &mut Vec<Packet>,
1749+
) -> Result<()> {
17551750
if let Some(p) = raw.as_any().downcast_ref::<ParamOutgoingResetRequest>() {
17561751
self.reconfig_requests
17571752
.insert(p.reconfig_request_sequence_number, p.clone());
1758-
Ok(Some(self.reset_streams_if_any(p)))
1753+
self.reset_streams_if_any(p, true, reply)?;
1754+
Ok(())
17591755
} else if let Some(p) = raw.as_any().downcast_ref::<ParamReconfigResponse>() {
17601756
self.reconfigs.remove(&p.reconfig_response_sequence_number);
17611757
if self.reconfigs.is_empty() {
17621758
if let Some(treconfig) = &self.treconfig {
17631759
treconfig.stop().await;
17641760
}
17651761
}
1766-
Ok(None)
1762+
Ok(())
17671763
} else {
17681764
Err(Error::ErrParamterType)
17691765
}
17701766
}
17711767

1772-
fn reset_streams_if_any(&mut self, p: &ParamOutgoingResetRequest) -> Packet {
1768+
fn reset_streams_if_any(
1769+
&mut self,
1770+
p: &ParamOutgoingResetRequest,
1771+
respond: bool,
1772+
reply: &mut Vec<Packet>,
1773+
) -> Result<()> {
17731774
let mut result = ReconfigResult::SuccessPerformed;
1775+
let mut sis_to_reset = vec![];
1776+
17741777
if sna32lte(p.sender_last_tsn, self.peer_last_tsn) {
17751778
log::debug!(
17761779
"[{}] resetStream(): senderLastTSN={} <= peer_last_tsn={}",
@@ -1781,6 +1784,9 @@ impl AssociationInternal {
17811784
for id in &p.stream_identifiers {
17821785
if let Some(s) = self.streams.get(id) {
17831786
let stream_identifier = s.stream_identifier;
1787+
if respond {
1788+
sis_to_reset.push(*id);
1789+
}
17841790
self.unregister_stream(stream_identifier);
17851791
}
17861792
}
@@ -1796,13 +1802,41 @@ impl AssociationInternal {
17961802
result = ReconfigResult::InProgress;
17971803
}
17981804

1799-
self.create_packet(vec![Box::new(ChunkReconfig {
1805+
// Answer incoming reset requests with the same reset request, but with
1806+
// reconfig_response_sequence_number.
1807+
if !sis_to_reset.is_empty() {
1808+
let rsn = self.generate_next_rsn();
1809+
let tsn = self.my_next_tsn - 1;
1810+
1811+
let c = ChunkReconfig {
1812+
param_a: Some(Box::new(ParamOutgoingResetRequest {
1813+
reconfig_request_sequence_number: rsn,
1814+
reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1815+
sender_last_tsn: tsn,
1816+
stream_identifiers: sis_to_reset,
1817+
})),
1818+
..Default::default()
1819+
};
1820+
1821+
self.reconfigs.insert(rsn, c.clone()); // store in the map for retransmission
1822+
1823+
let p = self.create_packet(vec![Box::new(c)]);
1824+
reply.push(p);
1825+
}
1826+
1827+
let packet = self.create_packet(vec![Box::new(ChunkReconfig {
18001828
param_a: Some(Box::new(ParamReconfigResponse {
18011829
reconfig_response_sequence_number: p.reconfig_request_sequence_number,
18021830
result,
18031831
})),
18041832
param_b: None,
1805-
})])
1833+
})]);
1834+
1835+
log::debug!("[{}] RESET RESPONSE: {}", self.name, packet);
1836+
1837+
reply.push(packet);
1838+
1839+
Ok(())
18061840
}
18071841

18081842
/// Move the chunk peeked with self.pending_queue.peek() to the inflight_queue.

sctp/src/queue/queue_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ fn test_payload_queue_get_gap_ack_block() -> Result<()> {
7878
pq.push(make_payload(5, 0), 0);
7979
pq.push(make_payload(6, 0), 0);
8080

81-
let gab1 = vec![GapAckBlock { start: 1, end: 6 }];
81+
let gab1 = [GapAckBlock { start: 1, end: 6 }];
8282
let gab2 = pq.get_gap_ack_blocks(0);
8383
assert!(!gab2.is_empty());
8484
assert_eq!(gab2.len(), 1);
@@ -89,7 +89,7 @@ fn test_payload_queue_get_gap_ack_block() -> Result<()> {
8989
pq.push(make_payload(8, 0), 0);
9090
pq.push(make_payload(9, 0), 0);
9191

92-
let gab1 = vec![
92+
let gab1 = [
9393
GapAckBlock { start: 1, end: 6 },
9494
GapAckBlock { start: 8, end: 9 },
9595
];

sctp/src/timer/timer_test.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,7 @@ mod test_rto_manager {
9393
#[tokio::test]
9494
async fn test_rto_manager_rto_calculation_small_rtt() -> Result<()> {
9595
let mut m = RtoManager::new();
96-
let exp = vec![
97-
1800, 1500, 1275, 1106, 1000, // capped at RTO.Min
98-
];
96+
let exp = [1800, 1500, 1275, 1106, 1000];
9997

10098
for i in 0..5 {
10199
m.set_new_rtt(600);
@@ -109,7 +107,7 @@ mod test_rto_manager {
109107
#[tokio::test]
110108
async fn test_rto_manager_rto_calculation_large_rtt() -> Result<()> {
111109
let mut m = RtoManager::new();
112-
let exp = vec![
110+
let exp = [
113111
60000, // capped at RTO.Max
114112
60000, // capped at RTO.Max
115113
60000, // capped at RTO.Max

sdp/src/direction/direction_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ fn test_new_direction() {
1111
("inactive", Direction::Inactive),
1212
];
1313

14-
let failingtests = vec!["", "notadirection"];
14+
let failingtests = ["", "notadirection"];
1515

1616
for (i, u) in passingtests.iter().enumerate() {
1717
let dir = Direction::new(u.0);

sdp/src/extmap/extmap_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ fn test_extmap() -> Result<()> {
1818
let example_attr_extmap2_line = EXAMPLE_ATTR_EXTMAP2;
1919
let failing_attr_extmap1_line = format!("{ATTRIBUTE_KEY}{FAILING_ATTR_EXTMAP1}{END_LINE}");
2020
let failing_attr_extmap2_line = format!("{ATTRIBUTE_KEY}{FAILING_ATTR_EXTMAP2}{END_LINE}");
21-
let passingtests = vec![
21+
let passingtests = [
2222
(EXAMPLE_ATTR_EXTMAP1, example_attr_extmap1_line),
2323
(EXAMPLE_ATTR_EXTMAP2, example_attr_extmap2_line),
2424
];

stun/src/integrity.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ impl MessageIntegrity {
6868
// new_long_term_integrity returns new MessageIntegrity with key for long-term
6969
// credentials. Password, username, and realm must be SASL-prepared.
7070
pub fn new_long_term_integrity(username: String, realm: String, password: String) -> Self {
71-
let s = vec![username, realm, password].join(CREDENTIALS_SEP);
71+
let s = [username, realm, password].join(CREDENTIALS_SEP);
7272

7373
let mut h = Md5::new();
7474
h.update(s.as_bytes());

stun/src/xoraddr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ impl XorMappedAddress {
119119
IpAddr::V6(ipv6) => (FAMILY_IPV6, IPV6LEN, ipv6.octets().to_vec()),
120120
};
121121

122-
let mut value = vec![0; 32 + 128];
122+
let mut value = [0; 32 + 128];
123123
//value[0] = 0 // first 8 bits are zeroes
124124
let mut xor_value = vec![0; IPV6LEN];
125125
xor_value[4..].copy_from_slice(&m.transaction_id.0);

util/src/vnet/router/router_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -641,8 +641,8 @@ fn test_router_static_ips_static_ip_local_ip_mapping() -> Result<()> {
641641
assert_eq!(lan.static_ips[2].to_string(), "1.2.3.3", "should match");
642642

643643
assert_eq!(3, lan.static_local_ips.len(), "should be 3");
644-
let local_ips = vec!["192.168.0.1", "192.168.0.2", "192.168.0.3"];
645-
let ips = vec!["1.2.3.1", "1.2.3.2", "1.2.3.3"];
644+
let local_ips = ["192.168.0.1", "192.168.0.2", "192.168.0.3"];
645+
let ips = ["1.2.3.1", "1.2.3.2", "1.2.3.3"];
646646
for i in 0..3 {
647647
let ext_ipstr = ips[i];
648648
if let Some(loc_ip) = lan.static_local_ips.get(ext_ipstr) {

webrtc/src/data_channel/data_channel_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,7 @@ async fn test_data_channel_buffered_amount_set_before_open() -> Result<()> {
872872
Box::pin(async move {
873873
for _ in 0..10 {
874874
assert!(
875-
matches!(dc3.send(&buf).await, Ok(_)),
875+
dc3.send(&buf).await.is_ok(),
876876
"Failed to send string on data channel"
877877
);
878878
assert_eq!(
@@ -973,7 +973,7 @@ async fn test_data_channel_buffered_amount_set_after_open() -> Result<()> {
973973

974974
for _ in 0..10 {
975975
assert!(
976-
matches!(dc3.send(&buf).await, Ok(_)),
976+
dc3.send(&buf).await.is_ok(),
977977
"Failed to send string on data channel"
978978
);
979979
assert_eq!(

0 commit comments

Comments
 (0)