Skip to content

1936 fix tcp tls server #1983

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: io_uring_tpc
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 9 additions & 8 deletions core/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ normal = ["tracing-appender"]
[package.metadata.cargo-machete]
ignored = ["rust-s3"]

[[bin]]
name = "iggy-server"
path = "src/main.rs"

[features]
default = ["mimalloc"]
tokio-console = ["dep:console-subscriber", "tokio/tracing"]
Expand All @@ -37,6 +41,7 @@ mimalloc = ["dep:mimalloc"]
[dependencies]
ahash = { workspace = true }
anyhow = { workspace = true }
async-channel = { workspace = true }
async_zip = { workspace = true }
axum = { workspace = true }
axum-server = { workspace = true }
Expand All @@ -47,6 +52,7 @@ bytes = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
console-subscriber = { workspace = true, optional = true }
core_affinity = "0.8.0"
crossbeam = { workspace = true }
ctrlc = { version = "3.4", features = ["termination"] }
dashmap = { workspace = true }
Expand All @@ -57,14 +63,12 @@ error_set = { version = "0.8.5", features = ["tracing"] }
figlet-rs = { workspace = true }
figment = { version = "0.10.19", features = ["toml", "env"] }
flume = { workspace = true }
async-channel = { workspace = true }
futures = { workspace = true }
hash32 = "1.0.0"
human-repr = { workspace = true }
iggy_common = { workspace = true }
jsonwebtoken = "9.3.1"
socket2 = "0.5.10"
lending-iterator = "0.1.7"
hash32 = "1.0.0"
mimalloc = { workspace = true, optional = true }
moka = { version = "0.12.10", features = ["future"] }
nix = { version = "0.30", features = ["fs"] }
Expand Down Expand Up @@ -99,8 +103,9 @@ rustls = { workspace = true }
rustls-pemfile = "2.2.0"
serde = { workspace = true }
serde_with = { workspace = true }
static-toml = "1.3.0"
sharded_queue = "2.0.1"
socket2 = "0.5.10"
static-toml = "1.3.0"
strum = { workspace = true }
sysinfo = { workspace = true }
tempfile = { workspace = true }
Expand Down Expand Up @@ -131,7 +136,3 @@ vergen-git2 = { version = "1.0.7", features = [
[dev-dependencies]
mockall = { workspace = true }
serial_test = { workspace = true }

[[bin]]
name = "iggy-server"
path = "src/main.rs"
6 changes: 3 additions & 3 deletions core/server/src/binary/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use bytes::BytesMut;
use compio::buf::{IoBuf, IoBufMut};
use compio::io::{AsyncReadExt, AsyncWriteExt};
use compio::net::TcpStream;
use compio::tls::TlsStream;
use iggy_common::IggyError;
use nix::libc;
use quinn::{RecvStream, SendStream};
Expand Down Expand Up @@ -85,9 +86,8 @@ impl SenderKind {
Self::Tcp(TcpSender { stream })
}

pub fn get_tcp_tls_sender(stream: ()) -> Self {
todo!();
//Self::TcpTls(TcpTlsSender { stream })
pub fn get_tcp_tls_sender(stream: TlsStream<TcpStream>) -> Self {
Self::TcpTls(TcpTlsSender { stream })
}

pub fn get_quic_sender(send_stream: SendStream, recv_stream: RecvStream) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion core/server/src/server_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use error_set::error_set;
use quinn::{ConnectionError as QuicConnectionError, ReadToEndError, WriteError};
use std::array::TryFromSliceError;
use tokio::io;
use std::io;

error_set!(
ServerError = ConfigError || ArchiverError || ConnectionError || LogError || CompatError || QuicError;
Expand Down
2 changes: 1 addition & 1 deletion core/server/src/tcp/tcp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub async fn spawn_tcp_server(shard: Rc<IggyShard>) -> Result<(), IggyError> {
info!("Initializing {server_name} server...");
// TODO: Fixme -- storing addr of the server inside of the config for integration tests...
match shard.config.tcp.tls.enabled {
true => tcp_tls_listener::start(server_name, addr, socket, shard.clone()).await?,
true => tcp_tls_listener::start(server_name, addr.into(), socket, shard.clone()).await?,
false => tcp_listener::start(server_name, addr, socket_config, shard.clone()).await?,
};

Expand Down
83 changes: 59 additions & 24 deletions core/server/src/tcp/tcp_tls_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
* under the License.
*/

use compio::net::{TcpListener, TcpOpts};
use compio::tls::TlsAcceptor;

use crate::configs::tcp::TcpSocketConfig;
use crate::binary::sender::SenderKind;
use crate::configs::tcp::TcpTlsConfig;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::streaming::clients::client_manager::Transport;
Expand All @@ -27,51 +30,81 @@ use iggy_common::IggyError;
use rustls::ServerConfig;
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use rustls_pemfile::{certs, private_key};
use socket2::Socket;
use socket2::{SockAddr, Socket};
use std::io::BufReader;
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info};

async fn create_listener(
addr: SocketAddr,
config: &TcpSocketConfig,
) -> Result<TcpListener, std::io::Error> {
// Required by the thread-per-core model...
// We create bunch of sockets on different threads, that bind to exactly the same address and port.
let opts = TcpOpts::new().reuse_port(true).reuse_port(true);
let opts = if config.override_defaults {
let recv_buffer_size = config
.recv_buffer_size
.as_bytes_u64()
.try_into()
.expect("Failed to parse recv_buffer_size for TCP socket");

let send_buffer_size = config
.send_buffer_size
.as_bytes_u64()
.try_into()
.expect("Failed to parse send_buffer_size for TCP socket");

opts.recv_buffer_size(recv_buffer_size)
.send_buffer_size(send_buffer_size)
.keepalive(config.keepalive)
.linger(config.linger.get_duration())
.nodelay(config.nodelay)
} else {
opts
};
TcpListener::bind_with_options(addr, opts).await
}

pub(crate) async fn start(
server_name: &'static str,
addr: SocketAddr,
// Hmmmm... Does this argument need to be removed?
socket: Socket,
shard: Rc<IggyShard>,
) -> Result<(), IggyError> {
/*
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let config = &shard.config.tcp.tls;

let (certs, key) =
if config.self_signed && !std::path::Path::new(&config.cert_file).exists() {
info!("Generating self-signed certificate for TCP TLS server");
generate_self_signed_cert()
.unwrap_or_else(|e| panic!("Failed to generate self-signed certificate: {e}"))
} else {
info!(
"Loading certificates from cert_file: {}, key_file: {}",
config.cert_file, config.key_file
);
load_certificates(&config.cert_file, &config.key_file)
.unwrap_or_else(|e| panic!("Failed to load certificates: {e}"))
};
let sock_config = &shard.config.tcp.socket;

let (certs, key) =
if config.self_signed && !std::path::Path::new(&config.cert_file).exists() {
info!("Generating self-signed certificate for TCP TLS server");
generate_self_signed_cert()
.unwrap_or_else(|e| panic!("Failed to generate self-signed certificate: {e}"))
} else {
info!(
"Loading certificates from cert_file: {}, key_file: {}",
config.cert_file, config.key_file
);
load_certificates(&config.cert_file, &config.key_file)
.unwrap_or_else(|e| panic!("Failed to load certificates: {e}"))
};

let server_config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
.unwrap_or_else(|e| panic!("Unable to create TLS server config: {e}"));

let acceptor = TlsAcceptor::from(Arc::new(server_config));
let acceptor = Arc::new(acceptor);

socket
.bind(&addr.into())
.unwrap_or_else(|e| panic!("Unable to bind socket to address '{addr}': {e}",));
let listener = create_listener(addr, sock_config).await
.unwrap_or_else(|e| panic!("Unable to bind socket to address '{addr:?}': {e}"));

let listener: std::net::TcpListener = socket.into();
let listener = monoio::net::TcpListener::from_std(listener).unwrap();
info!("{server_name} server has started on: {:?}", addr);

loop {
Expand All @@ -80,7 +113,7 @@ pub(crate) async fn start(
if shard.is_shutting_down() {
return;
}
monoio::time::sleep(Duration::from_millis(100)).await;
compio::time::sleep(Duration::from_millis(100)).await;
}
};

Expand Down Expand Up @@ -115,13 +148,16 @@ pub(crate) async fn start(
let conn_stop_receiver = shard_clone.task_registry.add_connection(client_id);

let shard_for_conn = shard_clone.clone();

shard_clone.task_registry.spawn_tracked(async move {
match acceptor.accept(stream).await {
Ok(tls_stream) => {
let mut sender = SenderKind::get_tcp_tls_sender(tls_stream.into());

if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await {
handle_error(error);
}

shard_for_conn.task_registry.remove_connection(&client_id);

if let Err(error) = sender.shutdown().await {
Expand All @@ -146,7 +182,6 @@ pub(crate) async fn start(
}
}
}
*/
Ok(())
}

Expand Down
10 changes: 2 additions & 8 deletions core/server/src/tcp/tcp_tls_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,34 @@ use bytes::BytesMut;
use compio::buf::IoBufMut;
use compio::io::AsyncWrite;
use compio::net::TcpStream;
use compio::tls::TlsStream;
use error_set::ErrContext;
use iggy_common::IggyError;
//use tokio_rustls::server::TlsStream;
use nix::libc;

#[derive(Debug)]
pub struct TcpTlsSender {
pub(crate) stream: TcpStream,
pub(crate) stream: TlsStream<TcpStream>,
}

impl Sender for TcpTlsSender {
async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, IggyError>, B) {
todo!();
sender::read(&mut self.stream, buffer).await
}

async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
todo!();
sender::send_empty_ok_response(&mut self.stream).await
}

async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError> {
todo!();
sender::send_ok_response(&mut self.stream, payload).await
}

async fn send_error_response(&mut self, error: IggyError) -> Result<(), IggyError> {
todo!();
sender::send_error_response(&mut self.stream, error).await
}

async fn shutdown(&mut self) -> Result<(), ServerError> {
todo!();
self.stream
.shutdown()
.await
Expand All @@ -71,7 +66,6 @@ impl Sender for TcpTlsSender {
length: &[u8],
slices: Vec<PooledBuffer>,
) -> Result<(), IggyError> {
todo!();
sender::send_ok_response_vectored(&mut self.stream, length, slices).await
}
}