diff --git a/sctp/src/packet.rs b/sctp/src/packet.rs
index fd0b14273..611c15e18 100644
--- a/sctp/src/packet.rs
+++ b/sctp/src/packet.rs
@@ -165,7 +165,7 @@ impl Packet {
         let padding_needed = get_padding_size(writer.len());
         if padding_needed != 0 {
             // padding needed if < 4 because we pad to 4
-            writer.extend_from_slice(&[0u8; 16][..padding_needed]);
+            writer.extend_from_slice(&[0u8; PADDING_MULTIPLE][..padding_needed]);
         }
     }
 
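A side note on why the tighter bound in `packet.rs` is safe: `get_padding_size` (defined in the `sctp/src/util.rs` hunk further down) always returns a value strictly less than `PADDING_MULTIPLE`, so the old `[0u8; 16]` scratch buffer was merely oversized rather than guarding against anything. A minimal standalone sketch of that invariant; `get_padding_size` and `PADDING_MULTIPLE` are copied from this diff, while the `main` driver and loop are illustrative only:

```rust
// Copied from sctp/src/util.rs as it reads after this diff.
const PADDING_MULTIPLE: usize = 4;

fn get_padding_size(len: usize) -> usize {
    (PADDING_MULTIPLE - (len % PADDING_MULTIPLE)) % PADDING_MULTIPLE
}

fn main() {
    for len in 0..=5 {
        let padding_needed = get_padding_size(len);
        // 0 <= padding_needed < PADDING_MULTIPLE, so slicing a
        // PADDING_MULTIPLE-sized zero buffer can never go out of bounds.
        assert!(padding_needed < PADDING_MULTIPLE);
        println!("len = {len}: pad with {padding_needed} zero byte(s)");
    }
}
```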
diff --git a/sctp/src/queue/pending_queue.rs b/sctp/src/queue/pending_queue.rs
index 78a25f0d9..65c29f190 100644
--- a/sctp/src/queue/pending_queue.rs
+++ b/sctp/src/queue/pending_queue.rs
@@ -1,18 +1,28 @@
+use tokio::sync::{Mutex, Semaphore};
+use util::sync::RwLock;
+
 use std::{
     collections::VecDeque,
     sync::atomic::{AtomicBool, AtomicUsize, Ordering},
 };
 
-use tokio::sync::{Mutex, Semaphore};
-use util::sync::RwLock;
-
 use crate::chunk::chunk_payload_data::ChunkPayloadData;
 
+// TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex
+
+// Some tests push a lot of data before starting to process any data...
+#[cfg(test)]
+const QUEUE_BYTES_LIMIT: usize = 128 * 1024 * 1024;
+/// Maximum size of the pending queue, in bytes.
+#[cfg(not(test))]
+const QUEUE_BYTES_LIMIT: usize = 128 * 1024;
+/// Total user data size beyond which a message's chunks are appended to the
+/// pending queue one by one instead of all at once.
+const QUEUE_APPEND_LARGE: usize = (QUEUE_BYTES_LIMIT * 2) / 3;
+
 /// Basic queue for either ordered or unordered chunks.
 pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>;
 
-// TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex
-
 /// A queue for both ordered and unordered chunks.
 #[derive(Debug)]
 pub(crate) struct PendingQueue {
@@ -39,14 +49,6 @@ impl Default for PendingQueue {
     }
 }
 
-// Some tests push a lot of data before starting to process any data...
-#[cfg(test)]
-const QUEUE_BYTES_LIMIT: usize = 128 * 1024 * 1024;
-#[cfg(not(test))]
-const QUEUE_BYTES_LIMIT: usize = 128 * 1024;
-
-const QUEUE_APPEND_LARGE: usize = (QUEUE_BYTES_LIMIT * 2) / 3;
-
 impl PendingQueue {
     pub(crate) fn new() -> Self {
         Self {
@@ -66,7 +68,7 @@ impl PendingQueue {
         let user_data_len = c.user_data.len();
 
         {
-            let sem_lock = self.semaphore_lock.lock().await;
+            let _sem_lock = self.semaphore_lock.lock().await;
             let permits = self.semaphore.acquire_many(user_data_len as u32).await;
             // unwrap ok because we never close the semaphore unless we have dropped self
             permits.unwrap().forget();
@@ -78,7 +80,6 @@ impl PendingQueue {
                 let mut ordered_queue = self.ordered_queue.write();
                 ordered_queue.push_back(c);
             }
-            drop(sem_lock);
         }
 
         self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst);
@@ -100,7 +101,7 @@ impl PendingQueue {
         if total_user_data_len >= QUEUE_APPEND_LARGE {
             self.append_large(chunks).await
         } else {
-            let sem_lock = self.semaphore_lock.lock().await;
+            let _sem_lock = self.semaphore_lock.lock().await;
             let permits = self
                 .semaphore
                 .acquire_many(total_user_data_len as u32)
@@ -108,14 +109,13 @@ impl PendingQueue {
                 .await;
             // unwrap ok because we never close the semaphore unless we have dropped self
             permits.unwrap().forget();
             self.append_unlimited(chunks, total_user_data_len);
-            drop(sem_lock);
         }
     }
 
     // If this is a very large message we append chunks one by one to allow progress while we are appending
     async fn append_large(&self, chunks: Vec<ChunkPayloadData>) {
         // lock this for the whole duration
-        let sem_lock = self.semaphore_lock.lock().await;
+        let _sem_lock = self.semaphore_lock.lock().await;
         for chunk in chunks.into_iter() {
             let user_data_len = chunk.user_data.len();
@@ -133,8 +133,6 @@ impl PendingQueue {
             self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst);
             self.queue_len.fetch_add(1, Ordering::SeqCst);
         }
-
-        drop(sem_lock);
     }
 
     /// Assumes that A) enough permits have been acquired and forget from the semaphore and that the semaphore_lock is held
diff --git a/sctp/src/util.rs b/sctp/src/util.rs
index 9dc730cf9..259803aab 100644
--- a/sctp/src/util.rs
+++ b/sctp/src/util.rs
@@ -1,7 +1,7 @@
 use bytes::Bytes;
 use crc::{Crc, CRC_32_ISCSI};
 
-const PADDING_MULTIPLE: usize = 4;
+pub(crate) const PADDING_MULTIPLE: usize = 4;
 
 pub(crate) fn get_padding_size(len: usize) -> usize {
     (PADDING_MULTIPLE - (len % PADDING_MULTIPLE)) % PADDING_MULTIPLE
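One remark on the recurring `sem_lock` → `_sem_lock` rename in `pending_queue.rs`: an underscore-*prefixed* identifier is still a real binding, so the `MutexGuard` lives until the end of its enclosing scope, and the explicit `drop(sem_lock)` calls at the end of each critical section become redundant rather than lost. A bare `let _ = ...` would behave differently and release the lock immediately. A minimal sketch of the distinction, assuming a `tokio` dependency with the `sync`, `macros`, and `rt` features; the names `lock` and `_guard` and the `()` payload are demo-only:

```rust
use tokio::sync::Mutex;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let lock = Mutex::new(());

    {
        // `_guard` is a real binding: the mutex stays locked until the end
        // of this block, mirroring the removed end-of-scope `drop(sem_lock)`.
        let _guard = lock.lock().await;
        assert!(lock.try_lock().is_err());
    } // `_guard` dropped here; the lock is released.

    // `_` is a wildcard pattern, not a binding: the guard is dropped
    // immediately, so the mutex is already free on the next line.
    let _ = lock.lock().await;
    assert!(lock.try_lock().is_ok());
}
```

The same reasoning applies to all three `_sem_lock` sites above: each guard is held for exactly the span the removed `drop` calls used to delimit.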