Skip to content

fix(data-channel): stuck closing in Chrome #480

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/examples/data-channels/data-channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ async fn main() -> Result<()> {
let d2 = Arc::clone(&d);
let d_label2 = d_label.clone();
let d_id2 = d_id;
d.on_close(Box::new(move || {
println!("Data channel closed");
Box::pin(async {})
}));

d.on_open(Box::new(move || {
println!("Data channel '{d_label2}'-'{d_id2}' open. Random messages will now be sent to any connected DataChannels every 5 seconds");

Expand Down
2 changes: 1 addition & 1 deletion interceptor/src/twcc/receiver/receiver_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async fn test_twcc_receiver_interceptor_different_delays_between_rtp_packets() -
)
.await;

let delays = vec![0, 10, 100, 200];
let delays = [0, 10, 100, 200];
for (i, d) in delays.iter().enumerate() {
tokio::time::advance(Duration::from_millis(*d)).await;

Expand Down
6 changes: 1 addition & 5 deletions media/src/audio/buffer/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,7 @@ impl<L> Copy for BufferInfo<L> {}

impl<L> Clone for BufferInfo<L> {
fn clone(&self) -> Self {
Self {
channels: self.channels,
frames: self.frames,
_phantom: PhantomData,
}
*self
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl Unmarshal for ReceiverEstimatedMaximumBitrate {
}

// REMB rules all around me
let mut unique_identifier = vec![0; 4];
let mut unique_identifier = [0; 4];
unique_identifier[0] = raw_packet.get_u8();
unique_identifier[1] = raw_packet.get_u8();
unique_identifier[2] = raw_packet.get_u8();
Expand Down
2 changes: 1 addition & 1 deletion rtp/src/codecs/vp8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl Payloader for Vp8Payloader {
let current_fragment_size =
std::cmp::min(max_fragment_size, payload_data_remaining) as usize;
let mut out = BytesMut::with_capacity(using_header_size + current_fragment_size);
let mut buf = vec![0u8; 4];
let mut buf = [0u8; 4];
if first {
buf[0] = 0x10;
first = false;
Expand Down
2 changes: 1 addition & 1 deletion rtp/src/codecs/vp9/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl Payloader for Vp9Payloader {
let current_fragment_size =
std::cmp::min(max_fragment_size as usize, payload_data_remaining);
let mut out = BytesMut::with_capacity(VP9HEADER_SIZE + current_fragment_size);
let mut buf = vec![0u8; VP9HEADER_SIZE];
let mut buf = [0u8; VP9HEADER_SIZE];
buf[0] = 0x90; // F=1 I=1
if payload_data_index == 0 {
buf[0] |= 0x08; // B=1
Expand Down
64 changes: 49 additions & 15 deletions sctp/src/association/association_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -975,9 +975,7 @@ impl AssociationInternal {
let rst_reqs: Vec<ParamOutgoingResetRequest> =
self.reconfig_requests.values().cloned().collect();
for rst_req in rst_reqs {
let resp = self.reset_streams_if_any(&rst_req);
log::debug!("[{}] RESET RESPONSE: {}", self.name, resp);
reply.push(resp);
self.reset_streams_if_any(&rst_req, false, &mut reply)?;
}
}

Expand Down Expand Up @@ -1630,15 +1628,11 @@ impl AssociationInternal {
let mut pp = vec![];

if let Some(param_a) = &c.param_a {
if let Some(p) = self.handle_reconfig_param(param_a).await? {
pp.push(p);
}
self.handle_reconfig_param(param_a, &mut pp).await?;
}

if let Some(param_b) = &c.param_b {
if let Some(p) = self.handle_reconfig_param(param_b).await? {
pp.push(p);
}
self.handle_reconfig_param(param_b, &mut pp).await?;
}

Ok(pp)
Expand Down Expand Up @@ -1751,26 +1745,35 @@ impl AssociationInternal {
async fn handle_reconfig_param(
&mut self,
raw: &Box<dyn Param + Send + Sync>,
) -> Result<Option<Packet>> {
reply: &mut Vec<Packet>,
) -> Result<()> {
if let Some(p) = raw.as_any().downcast_ref::<ParamOutgoingResetRequest>() {
self.reconfig_requests
.insert(p.reconfig_request_sequence_number, p.clone());
Ok(Some(self.reset_streams_if_any(p)))
self.reset_streams_if_any(p, true, reply)?;
Ok(())
} else if let Some(p) = raw.as_any().downcast_ref::<ParamReconfigResponse>() {
self.reconfigs.remove(&p.reconfig_response_sequence_number);
if self.reconfigs.is_empty() {
if let Some(treconfig) = &self.treconfig {
treconfig.stop().await;
}
}
Ok(None)
Ok(())
} else {
Err(Error::ErrParamterType)
}
}

fn reset_streams_if_any(&mut self, p: &ParamOutgoingResetRequest) -> Packet {
fn reset_streams_if_any(
&mut self,
p: &ParamOutgoingResetRequest,
respond: bool,
reply: &mut Vec<Packet>,
) -> Result<()> {
let mut result = ReconfigResult::SuccessPerformed;
let mut sis_to_reset = vec![];

if sna32lte(p.sender_last_tsn, self.peer_last_tsn) {
log::debug!(
"[{}] resetStream(): senderLastTSN={} <= peer_last_tsn={}",
Expand All @@ -1781,6 +1784,9 @@ impl AssociationInternal {
for id in &p.stream_identifiers {
if let Some(s) = self.streams.get(id) {
let stream_identifier = s.stream_identifier;
if respond {
sis_to_reset.push(*id);
}
self.unregister_stream(stream_identifier);
}
}
Expand All @@ -1796,13 +1802,41 @@ impl AssociationInternal {
result = ReconfigResult::InProgress;
}

self.create_packet(vec![Box::new(ChunkReconfig {
// Answer incoming reset requests with the same reset request, but with
// reconfig_response_sequence_number.
if !sis_to_reset.is_empty() {
let rsn = self.generate_next_rsn();
let tsn = self.my_next_tsn - 1;

let c = ChunkReconfig {
param_a: Some(Box::new(ParamOutgoingResetRequest {
reconfig_request_sequence_number: rsn,
reconfig_response_sequence_number: p.reconfig_request_sequence_number,
sender_last_tsn: tsn,
stream_identifiers: sis_to_reset,
})),
..Default::default()
};

self.reconfigs.insert(rsn, c.clone()); // store in the map for retransmission

let p = self.create_packet(vec![Box::new(c)]);
reply.push(p);
}

let packet = self.create_packet(vec![Box::new(ChunkReconfig {
param_a: Some(Box::new(ParamReconfigResponse {
reconfig_response_sequence_number: p.reconfig_request_sequence_number,
result,
})),
param_b: None,
})])
})]);

log::debug!("[{}] RESET RESPONSE: {}", self.name, packet);

reply.push(packet);

Ok(())
}

/// Move the chunk peeked with self.pending_queue.peek() to the inflight_queue.
Expand Down
4 changes: 2 additions & 2 deletions sctp/src/queue/queue_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn test_payload_queue_get_gap_ack_block() -> Result<()> {
pq.push(make_payload(5, 0), 0);
pq.push(make_payload(6, 0), 0);

let gab1 = vec![GapAckBlock { start: 1, end: 6 }];
let gab1 = [GapAckBlock { start: 1, end: 6 }];
let gab2 = pq.get_gap_ack_blocks(0);
assert!(!gab2.is_empty());
assert_eq!(gab2.len(), 1);
Expand All @@ -89,7 +89,7 @@ fn test_payload_queue_get_gap_ack_block() -> Result<()> {
pq.push(make_payload(8, 0), 0);
pq.push(make_payload(9, 0), 0);

let gab1 = vec![
let gab1 = [
GapAckBlock { start: 1, end: 6 },
GapAckBlock { start: 8, end: 9 },
];
Expand Down
6 changes: 2 additions & 4 deletions sctp/src/timer/timer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ mod test_rto_manager {
#[tokio::test]
async fn test_rto_manager_rto_calculation_small_rtt() -> Result<()> {
let mut m = RtoManager::new();
let exp = vec![
1800, 1500, 1275, 1106, 1000, // capped at RTO.Min
];
let exp = [1800, 1500, 1275, 1106, 1000];

for i in 0..5 {
m.set_new_rtt(600);
Expand All @@ -109,7 +107,7 @@ mod test_rto_manager {
#[tokio::test]
async fn test_rto_manager_rto_calculation_large_rtt() -> Result<()> {
let mut m = RtoManager::new();
let exp = vec![
let exp = [
60000, // capped at RTO.Max
60000, // capped at RTO.Max
60000, // capped at RTO.Max
Expand Down
2 changes: 1 addition & 1 deletion sdp/src/direction/direction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fn test_new_direction() {
("inactive", Direction::Inactive),
];

let failingtests = vec!["", "notadirection"];
let failingtests = ["", "notadirection"];

for (i, u) in passingtests.iter().enumerate() {
let dir = Direction::new(u.0);
Expand Down
2 changes: 1 addition & 1 deletion sdp/src/extmap/extmap_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn test_extmap() -> Result<()> {
let example_attr_extmap2_line = EXAMPLE_ATTR_EXTMAP2;
let failing_attr_extmap1_line = format!("{ATTRIBUTE_KEY}{FAILING_ATTR_EXTMAP1}{END_LINE}");
let failing_attr_extmap2_line = format!("{ATTRIBUTE_KEY}{FAILING_ATTR_EXTMAP2}{END_LINE}");
let passingtests = vec![
let passingtests = [
(EXAMPLE_ATTR_EXTMAP1, example_attr_extmap1_line),
(EXAMPLE_ATTR_EXTMAP2, example_attr_extmap2_line),
];
Expand Down
2 changes: 1 addition & 1 deletion stun/src/integrity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl MessageIntegrity {
// new_long_term_integrity returns new MessageIntegrity with key for long-term
// credentials. Password, username, and realm must be SASL-prepared.
pub fn new_long_term_integrity(username: String, realm: String, password: String) -> Self {
let s = vec![username, realm, password].join(CREDENTIALS_SEP);
let s = [username, realm, password].join(CREDENTIALS_SEP);

let mut h = Md5::new();
h.update(s.as_bytes());
Expand Down
2 changes: 1 addition & 1 deletion stun/src/xoraddr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl XorMappedAddress {
IpAddr::V6(ipv6) => (FAMILY_IPV6, IPV6LEN, ipv6.octets().to_vec()),
};

let mut value = vec![0; 32 + 128];
let mut value = [0; 32 + 128];
//value[0] = 0 // first 8 bits are zeroes
let mut xor_value = vec![0; IPV6LEN];
xor_value[4..].copy_from_slice(&m.transaction_id.0);
Expand Down
4 changes: 2 additions & 2 deletions util/src/vnet/router/router_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,8 @@ fn test_router_static_ips_static_ip_local_ip_mapping() -> Result<()> {
assert_eq!(lan.static_ips[2].to_string(), "1.2.3.3", "should match");

assert_eq!(3, lan.static_local_ips.len(), "should be 3");
let local_ips = vec!["192.168.0.1", "192.168.0.2", "192.168.0.3"];
let ips = vec!["1.2.3.1", "1.2.3.2", "1.2.3.3"];
let local_ips = ["192.168.0.1", "192.168.0.2", "192.168.0.3"];
let ips = ["1.2.3.1", "1.2.3.2", "1.2.3.3"];
for i in 0..3 {
let ext_ipstr = ips[i];
if let Some(loc_ip) = lan.static_local_ips.get(ext_ipstr) {
Expand Down
4 changes: 2 additions & 2 deletions webrtc/src/data_channel/data_channel_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ async fn test_data_channel_buffered_amount_set_before_open() -> Result<()> {
Box::pin(async move {
for _ in 0..10 {
assert!(
matches!(dc3.send(&buf).await, Ok(_)),
dc3.send(&buf).await.is_ok(),
"Failed to send string on data channel"
);
assert_eq!(
Expand Down Expand Up @@ -973,7 +973,7 @@ async fn test_data_channel_buffered_amount_set_after_open() -> Result<()> {

for _ in 0..10 {
assert!(
matches!(dc3.send(&buf).await, Ok(_)),
dc3.send(&buf).await.is_ok(),
"Failed to send string on data channel"
);
assert_eq!(
Expand Down