Skip to content

Commit 08b15c5

Browse files
committed
Sending and receiving packets
1 parent 2c0c177 commit 08b15c5

File tree

3 files changed

+89
-6
lines changed

3 files changed

+89
-6
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ stunclient = "0.1.1"
1717
turnclient = "0.1.0"
1818
spin_sleep = "0.3.7"
1919
tokio-timer = "0.2.10"
20+
byteorder = "1.3.1"
2021

2122
[replace]
2223
"stun_codec:0.1.10" = {path = "/mnt/src/git/stun_codec"}

src/main.rs

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ extern crate turnclient;
99
extern crate tokio;
1010
extern crate spin_sleep;
1111
extern crate tokio_timer;
12+
extern crate byteorder;
1213

1314
use std::time::{Duration,Instant};
1415
use std::net::SocketAddr;
1516
use structopt::StructOpt;
17+
use std::sync::Arc;
1618

1719
use futures::{Async, Future, Poll};
1820
use futures::{Stream, Sink};
@@ -54,7 +56,7 @@ struct Opt {
5456

5557
/// Packets per second
5658
#[structopt(long="pps", default_value="5")]
57-
delay_between_packets: u32,
59+
packets_per_second: u32,
5860

5961
/// Experiment duration, seconds
6062
#[structopt(short="d", long="duration", default_value="5")]
@@ -74,11 +76,67 @@ enum ServeTurnEventOrShutdown {
7476
Shutdown,
7577
}
7678

79+
fn sending_thread(
80+
udp: Arc<std::net::UdpSocket>,
81+
packet_size: usize,
82+
packets_per_second: u32,
83+
duration_seconds: u64,
84+
destinations: Vec<SocketAddr>,
85+
time_base: Instant,
86+
) {
87+
let sleeper = spin_sleep::SpinSleeper::default();
88+
sleeper.sleep_ns(500_000_000); // to allow receiver to warm up
89+
let start = Instant::now();
90+
let step = Duration::from_secs(1) / packets_per_second;
91+
let n = packets_per_second * (duration_seconds as u32);
92+
93+
use byteorder::{BE,ByteOrder};
94+
95+
let mut buf = vec![0; packet_size];
96+
97+
let mut totalctr : u32 = 0;
98+
99+
for i in 0..n {
100+
let deadline = start + step * i;
101+
let now = Instant::now();
102+
let delta = now - time_base;
103+
104+
BE::write_u64(&mut buf[0..8], delta.as_secs());
105+
BE::write_u32(&mut buf[8..12], delta.subsec_nanos());
106+
107+
if now < deadline {
108+
sleeper.sleep(deadline - now);
109+
}
110+
111+
let udp = &*udp;
112+
for addr in &destinations {
113+
BE::write_u32(&mut buf[12..16], totalctr);
114+
udp.send_to(&buf[..], addr).expect("UDP send_to failed");
115+
totalctr+=1;
116+
}
117+
}
118+
}
119+
120+
fn receiving_thread(
121+
udp: Arc<std::net::UdpSocket>,
122+
duration_seconds: u64,
123+
packet_size: usize,
124+
time_base: Instant,
125+
) {
126+
let mut buf = vec![0; packet_size];
127+
loop {
128+
let (_len, _addr) = udp.recv_from(&mut buf[..]).expect("Failed to receive packet");
129+
println!("Received a packet from {}", _addr);
130+
}
131+
}
132+
77133
fn main() -> Result<(), Error> {
78134
let opt = Opt::from_args();
79135

80136
let local_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
81137
let probing_udp = std::net::UdpSocket::bind(local_addr)?;
138+
let probing_udp = Arc::new(probing_udp);
139+
let time_base = Instant::now();
82140

83141
// Phase 1: Query my own external address
84142
let extaddr = stunclient::StunClient::new(opt.server)
@@ -91,6 +149,8 @@ fn main() -> Result<(), Error> {
91149
let k = opt.num_connections;
92150
let duration = opt.duration;
93151
let delay_after_stopping_sender = opt.delay_after_stopping_sender;
152+
let packet_size = opt.packet_size;
153+
let pps = opt.packets_per_second;
94154

95155
let clientstream = futures::stream::repeat::<_,Error>(
96156
(opt.server, opt.username, opt.password),
@@ -149,18 +209,39 @@ fn main() -> Result<(), Error> {
149209
let clienthandles = clienthandles.and_then(move |x|{
150210
let (init_handles, shutdown_handles) : (Vec<_>, Vec<_>) = x.into_iter().unzip();
151211
futures::future::join_all(init_handles)
152-
.and_then(|h| {
153-
eprintln!("Allocated {} TURN clients", h.len());
154-
futures::future::ok(())
155-
})
156212
.map_err(|_e|Error::from("Oneshot error"))
157-
.and_then(move |()| {
213+
.and_then(move |destinations| {
214+
eprintln!("Allocated {} TURN clients", destinations.len());
215+
// Phase 3: Starting sender and receiver
216+
217+
let probing_udp2 = probing_udp.clone();
218+
std::thread::spawn(move || {
219+
sending_thread(
220+
probing_udp2,
221+
packet_size,
222+
pps,
223+
duration,
224+
destinations,
225+
time_base,
226+
);
227+
});
228+
std::thread::spawn(move || {
229+
receiving_thread(
230+
probing_udp,
231+
duration,
232+
packet_size,
233+
time_base,
234+
);
235+
});
236+
158237
tokio_timer::Delay::new(
159238
Instant::now() +
160239
Duration::from_secs(
161240
duration + delay_after_stopping_sender
162241
)
163242
).and_then(|()| {
243+
// Phase 4: Stopping
244+
164245
eprintln!("Stopping TURN clients");
165246
for sh in shutdown_handles {
166247
sh.send(());

0 commit comments

Comments
 (0)