diff --git a/Cargo.lock b/Cargo.lock index fce0c0664..b4230bea2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7302,6 +7302,7 @@ dependencies = [ "clap", "compio", "console-subscriber", + "core_affinity", "crossbeam", "ctrlc", "dashmap", @@ -7346,7 +7347,6 @@ dependencies = [ "tempfile", "thiserror 2.0.12", "tokio", - "tokio-rustls", "tokio-util", "toml", "tower-http", diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index b171652df..59578b8e7 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -28,6 +28,10 @@ normal = ["tracing-appender"] [package.metadata.cargo-machete] ignored = ["rust-s3"] +[[bin]] +name = "iggy-server" +path = "src/main.rs" + [features] default = ["mimalloc"] tokio-console = ["dep:console-subscriber", "tokio/tracing"] @@ -37,6 +41,7 @@ mimalloc = ["dep:mimalloc"] [dependencies] ahash = { workspace = true } anyhow = { workspace = true } +async-channel = { workspace = true } async_zip = { workspace = true } axum = { workspace = true } axum-server = { workspace = true } @@ -47,6 +52,7 @@ bytes = { workspace = true } chrono = { workspace = true } clap = { workspace = true } console-subscriber = { workspace = true, optional = true } +core_affinity = "0.8.0" crossbeam = { workspace = true } ctrlc = { version = "3.4", features = ["termination"] } dashmap = { workspace = true } @@ -57,14 +63,12 @@ error_set = { version = "0.8.5", features = ["tracing"] } figlet-rs = { workspace = true } figment = { version = "0.10.19", features = ["toml", "env"] } flume = { workspace = true } -async-channel = { workspace = true } futures = { workspace = true } +hash32 = "1.0.0" human-repr = { workspace = true } iggy_common = { workspace = true } jsonwebtoken = "9.3.1" -socket2 = "0.5.10" lending-iterator = "0.1.7" -hash32 = "1.0.0" mimalloc = { workspace = true, optional = true } moka = { version = "0.12.10", features = ["future"] } nix = { version = "0.30", features = ["fs"] } @@ -99,8 +103,9 @@ rustls = { workspace = true } rustls-pemfile = "2.2.0" serde = { workspace = true } serde_with = { workspace = true } -static-toml = "1.3.0" sharded_queue = "2.0.1" +socket2 = "0.5.10" +static-toml = "1.3.0" strum = { workspace = true } sysinfo = { workspace = true } tempfile = { workspace = true } @@ -131,7 +136,3 @@ vergen-git2 = { version = "1.0.7", features = [ [dev-dependencies] mockall = { workspace = true } serial_test = { workspace = true } - -[[bin]] -name = "iggy-server" -path = "src/main.rs" diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs index 96bed3d5c..b3b2a361e 100644 --- a/core/server/src/binary/sender.rs +++ b/core/server/src/binary/sender.rs @@ -27,6 +27,7 @@ use bytes::BytesMut; use compio::buf::{IoBuf, IoBufMut}; use compio::io::{AsyncReadExt, AsyncWriteExt}; use compio::net::TcpStream; +use compio::tls::TlsStream; use iggy_common::IggyError; use nix::libc; use quinn::{RecvStream, SendStream}; @@ -85,9 +86,8 @@ impl SenderKind { Self::Tcp(TcpSender { stream }) } - pub fn get_tcp_tls_sender(stream: ()) -> Self { - todo!(); - //Self::TcpTls(TcpTlsSender { stream }) + pub fn get_tcp_tls_sender(stream: TlsStream) -> Self { + Self::TcpTls(TcpTlsSender { stream }) } pub fn get_quic_sender(send_stream: SendStream, recv_stream: RecvStream) -> Self { diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs index 16e2ef92b..b9829e20a 100644 --- a/core/server/src/server_error.rs +++ b/core/server/src/server_error.rs @@ -19,7 +19,7 @@ use error_set::error_set; use quinn::{ConnectionError as QuicConnectionError, ReadToEndError, WriteError}; use std::array::TryFromSliceError; -use tokio::io; +use std::io; error_set!( ServerError = ConfigError || ArchiverError || ConnectionError || LogError || CompatError || QuicError; diff --git a/core/server/src/tcp/tcp_server.rs b/core/server/src/tcp/tcp_server.rs index 6b299bff6..2d767504d 100644 --- a/core/server/src/tcp/tcp_server.rs +++ b/core/server/src/tcp/tcp_server.rs @@ -43,7 +43,7 @@ pub async fn spawn_tcp_server(shard: Rc) -> Result<(), IggyError> { info!("Initializing {server_name} server..."); // TODO: Fixme -- storing addr of the server inside of the config for integration tests... match shard.config.tcp.tls.enabled { - true => tcp_tls_listener::start(server_name, addr, socket, shard.clone()).await?, + true => tcp_tls_listener::start(server_name, addr.into(), socket, shard.clone()).await?, false => tcp_listener::start(server_name, addr, socket_config, shard.clone()).await?, }; diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs index b71ecfb84..0bc967d9f 100644 --- a/core/server/src/tcp/tcp_tls_listener.rs +++ b/core/server/src/tcp/tcp_tls_listener.rs @@ -16,8 +16,11 @@ * under the License. */ +use compio::net::{TcpListener, TcpOpts}; +use compio::tls::TlsAcceptor; + +use crate::configs::tcp::TcpSocketConfig; use crate::binary::sender::SenderKind; -use crate::configs::tcp::TcpTlsConfig; use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::streaming::clients::client_manager::Transport; @@ -27,7 +30,7 @@ use iggy_common::IggyError; use rustls::ServerConfig; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; use rustls_pemfile::{certs, private_key}; -use socket2::Socket; +use socket2::{SockAddr, Socket}; use std::io::BufReader; use std::net::SocketAddr; use std::rc::Rc; @@ -35,29 +38,61 @@ use std::sync::Arc; use std::time::Duration; use tracing::{error, info}; +async fn create_listener( + addr: SocketAddr, + config: &TcpSocketConfig, +) -> Result { + // Required by the thread-per-core model... + // We create bunch of sockets on different threads, that bind to exactly the same address and port. + let opts = TcpOpts::new().reuse_port(true).reuse_port(true); + let opts = if config.override_defaults { + let recv_buffer_size = config + .recv_buffer_size + .as_bytes_u64() + .try_into() + .expect("Failed to parse recv_buffer_size for TCP socket"); + + let send_buffer_size = config + .send_buffer_size + .as_bytes_u64() + .try_into() + .expect("Failed to parse send_buffer_size for TCP socket"); + + opts.recv_buffer_size(recv_buffer_size) + .send_buffer_size(send_buffer_size) + .keepalive(config.keepalive) + .linger(config.linger.get_duration()) + .nodelay(config.nodelay) + } else { + opts + }; + TcpListener::bind_with_options(addr, opts).await +} + pub(crate) async fn start( server_name: &'static str, addr: SocketAddr, + // Hmmmm... Does this argument need to be removed? socket: Socket, shard: Rc, ) -> Result<(), IggyError> { - /* let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); let config = &shard.config.tcp.tls; - - let (certs, key) = - if config.self_signed && !std::path::Path::new(&config.cert_file).exists() { - info!("Generating self-signed certificate for TCP TLS server"); - generate_self_signed_cert() - .unwrap_or_else(|e| panic!("Failed to generate self-signed certificate: {e}")) - } else { - info!( - "Loading certificates from cert_file: {}, key_file: {}", - config.cert_file, config.key_file - ); - load_certificates(&config.cert_file, &config.key_file) - .unwrap_or_else(|e| panic!("Failed to load certificates: {e}")) - }; + let sock_config = &shard.config.tcp.socket; + + let (certs, key) = + if config.self_signed && !std::path::Path::new(&config.cert_file).exists() { + info!("Generating self-signed certificate for TCP TLS server"); + generate_self_signed_cert() + .unwrap_or_else(|e| panic!("Failed to generate self-signed certificate: {e}")) + } else { + info!( + "Loading certificates from cert_file: {}, key_file: {}", + config.cert_file, config.key_file + ); + load_certificates(&config.cert_file, &config.key_file) + .unwrap_or_else(|e| panic!("Failed to load certificates: {e}")) + }; let server_config = ServerConfig::builder() .with_no_client_auth() @@ -65,13 +100,11 @@ pub(crate) async fn start( .unwrap_or_else(|e| panic!("Unable to create TLS server config: {e}")); let acceptor = TlsAcceptor::from(Arc::new(server_config)); + let acceptor = Arc::new(acceptor); - socket - .bind(&addr.into()) - .unwrap_or_else(|e| panic!("Unable to bind socket to address '{addr}': {e}",)); + let listener = create_listener(addr, sock_config).await + .unwrap_or_else(|e| panic!("Unable to bind socket to address '{addr:?}': {e}")); - let listener: std::net::TcpListener = socket.into(); - let listener = monoio::net::TcpListener::from_std(listener).unwrap(); info!("{server_name} server has started on: {:?}", addr); loop { @@ -80,7 +113,7 @@ pub(crate) async fn start( if shard.is_shutting_down() { return; } - monoio::time::sleep(Duration::from_millis(100)).await; + compio::time::sleep(Duration::from_millis(100)).await; } }; @@ -115,13 +148,16 @@ pub(crate) async fn start( let conn_stop_receiver = shard_clone.task_registry.add_connection(client_id); let shard_for_conn = shard_clone.clone(); + shard_clone.task_registry.spawn_tracked(async move { match acceptor.accept(stream).await { Ok(tls_stream) => { let mut sender = SenderKind::get_tcp_tls_sender(tls_stream.into()); + if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await { handle_error(error); } + shard_for_conn.task_registry.remove_connection(&client_id); if let Err(error) = sender.shutdown().await { @@ -146,7 +182,6 @@ pub(crate) async fn start( } } } - */ Ok(()) } diff --git a/core/server/src/tcp/tcp_tls_sender.rs b/core/server/src/tcp/tcp_tls_sender.rs index 8dbb3972b..def9e8847 100644 --- a/core/server/src/tcp/tcp_tls_sender.rs +++ b/core/server/src/tcp/tcp_tls_sender.rs @@ -24,39 +24,34 @@ use bytes::BytesMut; use compio::buf::IoBufMut; use compio::io::AsyncWrite; use compio::net::TcpStream; +use compio::tls::TlsStream; use error_set::ErrContext; use iggy_common::IggyError; -//use tokio_rustls::server::TlsStream; use nix::libc; #[derive(Debug)] pub struct TcpTlsSender { - pub(crate) stream: TcpStream, + pub(crate) stream: TlsStream, } impl Sender for TcpTlsSender { async fn read(&mut self, buffer: B) -> (Result, B) { - todo!(); sender::read(&mut self.stream, buffer).await } async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> { - todo!(); sender::send_empty_ok_response(&mut self.stream).await } async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError> { - todo!(); sender::send_ok_response(&mut self.stream, payload).await } async fn send_error_response(&mut self, error: IggyError) -> Result<(), IggyError> { - todo!(); sender::send_error_response(&mut self.stream, error).await } async fn shutdown(&mut self) -> Result<(), ServerError> { - todo!(); self.stream .shutdown() .await @@ -71,7 +66,6 @@ impl Sender for TcpTlsSender { length: &[u8], slices: Vec, ) -> Result<(), IggyError> { - todo!(); sender::send_ok_response_vectored(&mut self.stream, length, slices).await } }