Skip to content

Commit 225cec0

Browse files
authored
sctp: improve payload queue push performance (#365)
As discussed in #360 Gathering packets to send is a big chunk of the work the Association::write_loop is doing while in a critical section. This PR improves the performance of this by making the payload queue more performant to push to. Previously this did a full mergesort (O(n*log(n))) on all the in-flight TSNs, now it does a binary search (O(log(n)))
1 parent 58b4feb commit 225cec0

File tree

2 files changed

+31
-24
lines changed

2 files changed

+31
-24
lines changed

sctp/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## Unreleased
44

55
* Performance improvements
6+
* improve algorithm used to push to pending queue from O(n*log(n)) to O(log(n)) [#365](https://github.com/webrtc-rs/webrtc/pull/365)
67
* reuse as many allocations as possible when marshaling [#364](https://github.com/webrtc-rs/webrtc/pull/364)
78
* The lock for the internal association was contended badly because marshaling was done while still in a critical section and also tokio was scheduling tasks badly [#363](https://github.com/webrtc-rs/webrtc/pull/363)
89

sctp/src/queue/payload_queue.rs

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@ use crate::chunk::chunk_payload_data::ChunkPayloadData;
22
use crate::chunk::chunk_selective_ack::GapAckBlock;
33
use crate::util::*;
44

5-
use std::collections::HashMap;
5+
use std::collections::{HashMap, VecDeque};
66
use std::sync::atomic::{AtomicUsize, Ordering};
77
use std::sync::Arc;
88

99
#[derive(Default, Debug)]
1010
pub(crate) struct PayloadQueue {
1111
pub(crate) length: Arc<AtomicUsize>,
1212
pub(crate) chunk_map: HashMap<u32, ChunkPayloadData>,
13-
pub(crate) sorted: Vec<u32>,
13+
pub(crate) sorted: VecDeque<u32>,
1414
pub(crate) dup_tsn: Vec<u32>,
1515
pub(crate) n_bytes: usize,
1616
}
@@ -24,26 +24,37 @@ impl PayloadQueue {
2424
}
2525
}
2626

27-
pub(crate) fn update_sorted_keys(&mut self) {
28-
self.sorted.sort_by(|a, b| {
29-
if sna32lt(*a, *b) {
30-
std::cmp::Ordering::Less
31-
} else {
32-
std::cmp::Ordering::Greater
33-
}
34-
});
35-
}
36-
3727
pub(crate) fn can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool {
3828
!(self.chunk_map.contains_key(&p.tsn) || sna32lte(p.tsn, cumulative_tsn))
3929
}
4030

4131
pub(crate) fn push_no_check(&mut self, p: ChunkPayloadData) {
32+
let tsn = p.tsn;
4233
self.n_bytes += p.user_data.len();
43-
self.sorted.push(p.tsn);
44-
self.chunk_map.insert(p.tsn, p);
34+
self.chunk_map.insert(tsn, p);
4535
self.length.fetch_add(1, Ordering::SeqCst);
46-
self.update_sorted_keys();
36+
37+
if self.sorted.is_empty() || sna32gt(tsn, *self.sorted.back().unwrap()) {
38+
self.sorted.push_back(tsn);
39+
} else if sna32lt(tsn, *self.sorted.front().unwrap()) {
40+
self.sorted.push_front(tsn);
41+
} else {
42+
fn compare_tsn(a: u32, b: u32) -> std::cmp::Ordering {
43+
if sna32lt(a, b) {
44+
std::cmp::Ordering::Less
45+
} else {
46+
std::cmp::Ordering::Greater
47+
}
48+
}
49+
let pos = match self
50+
.sorted
51+
.binary_search_by(|element| compare_tsn(*element, tsn))
52+
{
53+
Ok(pos) => pos,
54+
Err(pos) => pos,
55+
};
56+
self.sorted.insert(pos, tsn);
57+
}
4758
}
4859

4960
/// push pushes a payload data. If the payload data is already in our queue or
@@ -57,19 +68,14 @@ impl PayloadQueue {
5768
return false;
5869
}
5970

60-
self.n_bytes += p.user_data.len();
61-
self.sorted.push(p.tsn);
62-
self.chunk_map.insert(p.tsn, p);
63-
self.length.fetch_add(1, Ordering::SeqCst);
64-
self.update_sorted_keys();
65-
71+
self.push_no_check(p);
6672
true
6773
}
6874

6975
/// pop pops only if the oldest chunk's TSN matches the given TSN.
7076
pub(crate) fn pop(&mut self, tsn: u32) -> Option<ChunkPayloadData> {
71-
if !self.sorted.is_empty() && tsn == self.sorted[0] {
72-
self.sorted.remove(0);
77+
if Some(&tsn) == self.sorted.front() {
78+
self.sorted.pop_front();
7379
if let Some(c) = self.chunk_map.remove(&tsn) {
7480
self.length.fetch_sub(1, Ordering::SeqCst);
7581
self.n_bytes -= c.user_data.len();
@@ -149,7 +155,7 @@ impl PayloadQueue {
149155
}
150156

151157
pub(crate) fn get_last_tsn_received(&self) -> Option<&u32> {
152-
self.sorted.last()
158+
self.sorted.back()
153159
}
154160

155161
pub(crate) fn mark_all_to_retrasmit(&mut self) {

0 commit comments

Comments
 (0)