Skip to content

Make socket reuseable when binding to avoid errors #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
mod config;
mod server;

use log::{error, info};
use std::env;
use std::io::{Result};
use std::io::Result;
use std::io::{Error, ErrorKind::Other};
use std::process::exit;
use std::sync::{Arc, mpsc};
use std::sync::atomic::AtomicBool;
use log::{error, info};

use std::sync::{mpsc, Arc};

fn main() -> Result<()> {
env::set_var("RUST_LOG", "debug");
Expand All @@ -19,23 +18,33 @@ fn main() -> Result<()> {
}
return Ok(());
}
if config::ARGS.lock().unwrap().interface.len() < 1 {
if config::ARGS.lock().unwrap().interface.is_empty() {
error!("No interface specified");
exit(1);
}

let list = server::ADDR_LIST.iter()
.map(|(n, a)| format!("\"{}\": {}", n, a.to_string()))
let list = server::ADDR_LIST
.iter()
.map(|(n, a)| format!("\"{}\": {}", n, a))
.collect::<Vec<_>>()
.join("\n");
info!("The following mdns information for the network card is repeated:\n{}", list);
info!(
"The following mdns information for the network card is repeated:\n{}",
list
);

if server::ADDR_LIST.len() == 1 {
error!("Only one ip addr found!");
exit(1);
}

let server_done = Arc::new(AtomicBool::new(false));

let mut threads = Vec::new();
let (tx, rx) = mpsc::channel();
match server::receiver(tx, Arc::clone(&server_done)) {
Ok(handler) => threads.push(handler),
Err(err) => Err(Error::new(Other, err))?
Err(err) => Err(Error::new(Other, err))?,
}
match server::announcer(rx, Arc::clone(&server_done)) {
Ok(handler) => threads.push(handler),
Expand All @@ -49,4 +58,4 @@ fn main() -> Result<()> {
thread.join().unwrap();
}
Ok(())
}
}
73 changes: 45 additions & 28 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use std::net::{Ipv4Addr, SocketAddr, IpAddr};
use std::io::{Result};
use crate::config;
use if_addrs2::get_if_addrs;
use lazy_static::lazy_static;
use log::{error, info};
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use std::io::Result;
use std::io::{Error, ErrorKind::Other};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{Receiver, Sender};
use std::{thread};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::thread;
use std::thread::JoinHandle;
use if_addrs2::get_if_addrs;
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use lazy_static::lazy_static;
use log::{error, info};
use crate::config;

lazy_static! {
static ref MDNS_ADDR: SocketAddr = "224.0.0.251:5353".parse::<SocketAddr>().unwrap();
Expand All @@ -23,7 +23,7 @@ lazy_static! {
fn bind_multicast(socket: &Socket, addr: SocketAddr) -> Result<()> {
let addr = match addr {
SocketAddr::V4(_addr) => addr,
SocketAddr::V6(_addr) => Err(Error::new(Other, "IPv6 is not supported"))?
SocketAddr::V6(_addr) => Err(Error::new(Other, "IPv6 is not supported"))?,
};
socket.bind(&socket2::SockAddr::from(addr))
}
Expand All @@ -34,6 +34,7 @@ fn bind_multicast(socket: &Socket, addr: SocketAddr) -> Result<()> {
/// On unixes we bind to the multicast address, which causes multicast packets to be filtered
#[cfg(unix)]
fn bind_multicast(socket: &Socket, addr: SocketAddr) -> Result<()> {
let _ = socket.set_reuse_port(true);
socket.bind(&socket2::SockAddr::from(addr))
}

Expand All @@ -56,6 +57,7 @@ fn get_address_list() -> Result<Vec<(String, IpAddr)>> {
.iter()
.filter(|iface| !iface.is_loopback())
.filter(|iface| -> bool { config::ARGS.lock().unwrap().interface.contains(&iface.name) })
.filter(|iface| iface.ip().is_ipv4())
.map(|iface| (iface.name.clone(), iface.ip()))
.collect())
}
Expand All @@ -75,10 +77,13 @@ fn join_multicast(socket: &Socket, multiaddr: &Ipv4Addr) -> Result<()> {
}

// receive mdns message
pub(crate) fn receiver(tx: Sender<(Box<[u8]>, SockAddr)>, server_done: Arc<AtomicBool>) -> Result<JoinHandle<()>> {
pub(crate) fn receiver(
tx: Sender<(Box<[u8]>, SockAddr)>,
server_done: Arc<AtomicBool>,
) -> Result<JoinHandle<()>> {
let socket = new_socket()?;
match *MDNS_ADDR {
SocketAddr::V4(addr) => join_multicast(&socket, &addr.ip())?,
SocketAddr::V4(addr) => join_multicast(&socket, addr.ip())?,
_ => Err(Error::new(Other, "IPv6 is not supported"))?,
};
socket.set_reuse_address(true)?;
Expand All @@ -91,9 +96,13 @@ pub(crate) fn receiver(tx: Sender<(Box<[u8]>, SockAddr)>, server_done: Arc<Atomi
match socket.recv_from(&mut buf) {
Ok((len, remote_addr)) => {
let data = buf[..len].to_vec().into_boxed_slice();
tx.send((data, remote_addr)).unwrap_or_else(|err| {
error!("send msg to chan failed: {err}")
});
if let Err(err) = tx.send((data, remote_addr)) {
error!("send msg to chan failed: {err}");
if err.to_string().contains("closed") {
server_done.store(true, std::sync::atomic::Ordering::Relaxed);
return;
}
}
}
Err(err) => {
error!("recv msg error: {err}");
Expand All @@ -108,10 +117,13 @@ pub(crate) fn receiver(tx: Sender<(Box<[u8]>, SockAddr)>, server_done: Arc<Atomi
}

// send mdns message to other interface
pub(crate) fn announcer(rx: Receiver<(Box<[u8]>, SockAddr)>, server_done: Arc<AtomicBool>) -> Result<JoinHandle<()>> {
pub(crate) fn announcer(
rx: Receiver<(Box<[u8]>, SockAddr)>,
server_done: Arc<AtomicBool>,
) -> Result<JoinHandle<()>> {
let socket = new_socket()?;
match *MDNS_ADDR {
SocketAddr::V4(addr) => join_multicast(&socket, &addr.ip())?,
SocketAddr::V4(addr) => join_multicast(&socket, addr.ip())?,
_ => Err(Error::new(Other, "IPv6 is not supported"))?,
};
socket.set_reuse_address(true)?;
Expand All @@ -129,9 +141,15 @@ pub(crate) fn announcer(rx: Receiver<(Box<[u8]>, SockAddr)>, server_done: Arc<At
break;
}
};
if !data
.windows(8)
.any(|window| window == [98, 108, 105, 122, 122, 97, 114, 100])
{
continue;
}
info!(
"[server]: got data: {} from: {}",
String::from_utf8_lossy(&*data),
data.len(),
remote_addr.as_std().unwrap()
);
for (_, addr) in &*ADDR_LIST {
Expand All @@ -153,24 +171,23 @@ pub(crate) fn announcer(rx: Receiver<(Box<[u8]>, SockAddr)>, server_done: Arc<At
}
match if_addrs {
IpAddr::V4(_addr) => {
socket.set_multicast_if_v4(_addr).unwrap();
if let Err(err) = socket.set_multicast_if_v4(_addr) {
error!("socket set_multicast_if_v4 err: {}", err);
break;
}
}
IpAddr::V6(_) => {
error!("server error");
IpAddr::V6(_addr) => {
error!("ip v6 addr is found: {}", _addr);
break;
}
}
match socket.send_to(&*data, &SockAddr::from(*MDNS_ADDR)) {
Err(_err) => {
error!("server error {_err}");
break;
}
_ => ()
if let Err(_err) = socket.send_to(&data, &SockAddr::from(*MDNS_ADDR)) {
error!("server error {_err}");
break;
};
}
}
server_done.store(true, std::sync::atomic::Ordering::Relaxed)
});
Ok(handler)
}