Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use std::io;
use std::net::SocketAddr;
use std::sync::Arc;

const DEFAULT_ADDR: &str = "127.0.0.1:6142";

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};
Expand Down Expand Up @@ -70,7 +72,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:6142".to_string());
.unwrap_or_else(|| DEFAULT_ADDR.to_string());

// Bind a TCP listener to the socket address.
//
Expand All @@ -88,9 +90,9 @@ async fn main() -> Result<(), Box<dyn Error>> {

// Spawn our handler to be run asynchronously.
tokio::spawn(async move {
tracing::debug!("accepted connection");
tracing::debug!("accepted connection from {}", addr);
if let Err(e) = process(state, stream, addr).await {
tracing::info!("an error occurred; error = {:?}", e);
tracing::warn!("Connection from {} failed: {:?}", addr, e);
}
});
}
Expand Down Expand Up @@ -138,12 +140,26 @@ impl Shared {

/// Send a `LineCodec` encoded message to every peer, except
/// for the sender.
///
/// This function also cleans up disconnected peers automatically.
async fn broadcast(&mut self, sender: SocketAddr, message: &str) {
for peer in self.peers.iter_mut() {
if *peer.0 != sender {
let _ = peer.1.send(message.into());
let mut failed_peers = Vec::new();
let message = message.to_string(); // Clone once for all sends

for (addr, tx) in self.peers.iter() {
if *addr != sender {
if tx.send(message.clone()).is_err() {
// Receiver has been dropped, mark for removal
failed_peers.push(*addr);
}
}
}

// Clean up disconnected peers
for addr in failed_peers {
self.peers.remove(&addr);
tracing::debug!("Removed disconnected peer: {}", addr);
}
}
}

Expand Down Expand Up @@ -203,7 +219,10 @@ async fn process(
tokio::select! {
// A message was received from a peer. Send it to the current user.
Some(msg) = peer.rx.recv() => {
peer.lines.send(&msg).await?;
if let Err(e) = peer.lines.send(&msg).await {
tracing::error!("Failed to send message to {}: {:?}", username, e);
break;
}
}
result = peer.lines.next() => match result {
// A message was received from the current user, we should
Expand All @@ -221,6 +240,7 @@ async fn process(
username,
e
);
break;
}
// The stream has been exhausted.
None => break,
Expand Down
2 changes: 1 addition & 1 deletion examples/connect-tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub async fn connect(
//BytesMut into Bytes
Ok(i) => future::ready(Some(i.freeze())),
Err(e) => {
println!("failed to read from socket; error={e}");
eprintln!("failed to read from socket; error={e}");
future::ready(None)
}
})
Expand Down
37 changes: 22 additions & 15 deletions examples/echo-tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ use tokio::net::TcpListener;
use std::env;
use std::error::Error;

const DEFAULT_ADDR: &str = "127.0.0.1:8080";
const BUFFER_SIZE: usize = 4096;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Allow passing an address to listen on as the first argument of this
// program, but otherwise we'll just set up our TCP listener on
// 127.0.0.1:8080 for connections.
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());
.unwrap_or_else(|| DEFAULT_ADDR.to_string());

// Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined
Expand All @@ -44,7 +47,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

loop {
// Asynchronously wait for an inbound socket.
let (mut socket, _) = listener.accept().await?;
let (mut socket, addr) = listener.accept().await?;

// And this is where much of the magic of this server happens. We
// crucially want all clients to make progress concurrently, rather than
Expand All @@ -55,23 +58,27 @@ async fn main() -> Result<(), Box<dyn Error>> {
// which will allow all of our clients to be processed concurrently.

tokio::spawn(async move {
let mut buf = vec![0; 1024];
let mut buf = vec![0; BUFFER_SIZE];

// In a loop, read data from the socket and write the data back.
loop {
let n = socket
.read(&mut buf)
.await
.expect("failed to read data from socket");

if n == 0 {
return;
match socket.read(&mut buf).await {
Ok(0) => {
// Connection closed by peer
return;
}
Ok(n) => {
// Write the data back. If writing fails, log the error and exit.
if let Err(e) = socket.write_all(&buf[0..n]).await {
eprintln!("Failed to write to socket {}: {}", addr, e);
return;
}
}
Err(e) => {
eprintln!("Failed to read from socket {}: {}", addr, e);
return;
}
}

socket
.write_all(&buf[0..n])
.await
.expect("failed to write data to socket");
}
});
}
Expand Down