Skip to content

sctp: remove unnecessary drops and use precise padding #381

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sctp/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl Packet {
let padding_needed = get_padding_size(writer.len());
if padding_needed != 0 {
// padding needed if < 4 because we pad to 4
writer.extend_from_slice(&[0u8; 16][..padding_needed]);
writer.extend_from_slice(&[0u8; PADDING_MULTIPLE][..padding_needed]);
}
}

Expand Down
38 changes: 18 additions & 20 deletions sctp/src/queue/pending_queue.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
use tokio::sync::{Mutex, Semaphore};
use util::sync::RwLock;

use std::{
collections::VecDeque,
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
};

use tokio::sync::{Mutex, Semaphore};
use util::sync::RwLock;

use crate::chunk::chunk_payload_data::ChunkPayloadData;

// TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal>

// Some tests push a lot of data before starting to process any data...
#[cfg(test)]
const QUEUE_BYTES_LIMIT: usize = 128 * 1024 * 1024;
/// Maximum size of the pending queue, in bytes.
#[cfg(not(test))]
const QUEUE_BYTES_LIMIT: usize = 128 * 1024;
/// Total user data size, beyond which the packet will be split into chunks. The chunks will be
/// added to the pending queue one by one.
const QUEUE_APPEND_LARGE: usize = (QUEUE_BYTES_LIMIT * 2) / 3;

/// Basic queue for either ordered or unordered chunks.
pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>;

// TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal>

/// A queue for both ordered and unordered chunks.
#[derive(Debug)]
pub(crate) struct PendingQueue {
Expand All @@ -39,14 +49,6 @@ impl Default for PendingQueue {
}
}

// Some tests push a lot of data before starting to process any data...
#[cfg(test)]
const QUEUE_BYTES_LIMIT: usize = 128 * 1024 * 1024;
#[cfg(not(test))]
const QUEUE_BYTES_LIMIT: usize = 128 * 1024;

const QUEUE_APPEND_LARGE: usize = (QUEUE_BYTES_LIMIT * 2) / 3;

impl PendingQueue {
pub(crate) fn new() -> Self {
Self {
Expand All @@ -66,7 +68,7 @@ impl PendingQueue {
let user_data_len = c.user_data.len();

{
let sem_lock = self.semaphore_lock.lock().await;
let _sem_lock = self.semaphore_lock.lock().await;
let permits = self.semaphore.acquire_many(user_data_len as u32).await;
// unwrap ok because we never close the semaphore unless we have dropped self
permits.unwrap().forget();
Expand All @@ -78,7 +80,6 @@ impl PendingQueue {
let mut ordered_queue = self.ordered_queue.write();
ordered_queue.push_back(c);
}
drop(sem_lock);
}

self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst);
Expand All @@ -100,22 +101,21 @@ impl PendingQueue {
if total_user_data_len >= QUEUE_APPEND_LARGE {
self.append_large(chunks).await
} else {
let sem_lock = self.semaphore_lock.lock().await;
let _sem_lock = self.semaphore_lock.lock().await;
let permits = self
.semaphore
.acquire_many(total_user_data_len as u32)
.await;
// unwrap ok because we never close the semaphore unless we have dropped self
permits.unwrap().forget();
self.append_unlimited(chunks, total_user_data_len);
drop(sem_lock);
}
}

// If this is a very large message we append chunks one by one to allow progress while we are appending
async fn append_large(&self, chunks: Vec<ChunkPayloadData>) {
// lock this for the whole duration
let sem_lock = self.semaphore_lock.lock().await;
let _sem_lock = self.semaphore_lock.lock().await;

for chunk in chunks.into_iter() {
let user_data_len = chunk.user_data.len();
Expand All @@ -133,8 +133,6 @@ impl PendingQueue {
self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst);
self.queue_len.fetch_add(1, Ordering::SeqCst);
}

drop(sem_lock);
}

/// Assumes that A) enough permits have been acquired and forgotten from the semaphore and that the semaphore_lock is held
Expand Down
2 changes: 1 addition & 1 deletion sctp/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bytes::Bytes;
use crc::{Crc, CRC_32_ISCSI};

const PADDING_MULTIPLE: usize = 4;
pub(crate) const PADDING_MULTIPLE: usize = 4;

pub(crate) fn get_padding_size(len: usize) -> usize {
(PADDING_MULTIPLE - (len % PADDING_MULTIPLE)) % PADDING_MULTIPLE
Expand Down