Skip to content

Commit 6f99fa3

Browse files
authored
Add load tests and prevent partial message read (#20)
* Add streaming API for addRoutes * Add list routes and list peers streaming APIs * Implement basic load test * Implement logger * Fix load test * Prevent partial message read * Make load test realistic * Support RibType for ListRoutes API
1 parent 18bc551 commit 6f99fa3

37 files changed

+2789
-426
lines changed

Cargo.lock

Lines changed: 95 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ members = [
33
"core",
44
"daemon",
55
"cli",
6+
"loadtests",
67
]
78
resolver = "2"
89

Makefile

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
.PHONY: all build clean run test fmt release setup
1+
.PHONY: all build clean run test fmt release setup loadtest
22

33
all: build
44

@@ -19,7 +19,12 @@ run: setup
1919

2020
test: setup
2121
cargo clippy --all-targets --all-features -- -D warnings
22-
cargo test
22+
cargo test --workspace --exclude loadtests
2323

2424
fmt:
2525
cargo fmt
26+
27+
loadtest: setup
28+
@echo "Building bgpggd and running load tests..."
29+
cargo build --bin bgpggd
30+
cargo test -p loadtests --release -- --nocapture --test-threads=1

cli/src/commands/global.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,10 @@ pub async fn handle(addr: String, cmd: GlobalCommands) -> Result<(), Box<dyn std
102102
},
103103

104104
GlobalCommands::Info => {
105-
let (listen_addr, listen_port) = client.get_server_info().await?;
105+
let (listen_addr, listen_port, num_routes) = client.get_server_info().await?;
106106
println!("BGP Server Information:");
107107
println!(" Listen Address: {}:{}", listen_addr, listen_port);
108+
println!(" Routes in RIB: {}", num_routes);
108109
}
109110

110111
GlobalCommands::Summary => {

config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ asn: 65000
22
listen_addr: "127.0.0.1:1790"
33
router_id: "1.1.1.1"
44
grpc_listen_addr: "[::1]:50051"
5+
log_level: "info"

core/src/bgp/msg.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ pub async fn read_bgp_message<R: AsyncReadExt + Unpin>(
134134
mut stream: R,
135135
) -> Result<BgpMessage, ParserError> {
136136
let mut header_buffer = [0u8; BGP_HEADER_SIZE_BYTES];
137+
137138
stream
138139
.read_exact(&mut header_buffer)
139140
.await

core/src/bgp/msg_notification.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use crate::warn;
16-
1715
use super::msg::{Message, MessageType};
1816
use super::utils::ParserError;
1917

@@ -171,28 +169,40 @@ impl BgpError {
171169
ErrorCode::MessageHeaderError => {
172170
let err = MessageHeaderError::from(err_sub_code);
173171
if matches!(err, MessageHeaderError::Unknown(_)) {
174-
warn!("received NOTIFICATION with unknown MessageHeaderError subcode", "subcode" => err_sub_code);
172+
eprintln!(
173+
"Warning: received NOTIFICATION with unknown MessageHeaderError subcode, subcode={:?}",
174+
err_sub_code
175+
);
175176
}
176177
BgpError::MessageHeaderError(err)
177178
}
178179
ErrorCode::OpenMessageError => {
179180
let err = OpenMessageError::from(err_sub_code);
180181
if matches!(err, OpenMessageError::Unknown(_)) {
181-
warn!("received NOTIFICATION with unknown OpenMessageError subcode", "subcode" => err_sub_code);
182+
eprintln!(
183+
"Warning: received NOTIFICATION with unknown OpenMessageError subcode, subcode={:?}",
184+
err_sub_code
185+
);
182186
}
183187
BgpError::OpenMessageError(err)
184188
}
185189
ErrorCode::UpdateMessageError => {
186190
let err = UpdateMessageError::from(err_sub_code);
187191
if matches!(err, UpdateMessageError::Unknown(_)) {
188-
warn!("received NOTIFICATION with unknown UpdateMessageError subcode", "subcode" => err_sub_code);
192+
eprintln!(
193+
"Warning: received NOTIFICATION with unknown UpdateMessageError subcode, subcode={:?}",
194+
err_sub_code
195+
);
189196
}
190197
BgpError::UpdateMessageError(err)
191198
}
192199
ErrorCode::HoldTimerExpired => BgpError::HoldTimerExpired,
193200
ErrorCode::FiniteStateMachineError => BgpError::FiniteStateMachineError,
194201
ErrorCode::Unknown => {
195-
warn!("received NOTIFICATION with unknown error code", "code" => err_code);
202+
eprintln!(
203+
"Warning: received NOTIFICATION with unknown error code, code={:?}",
204+
err_code
205+
);
196206
BgpError::Unknown
197207
}
198208
}
@@ -280,7 +290,10 @@ impl NotificationMessage {
280290
// RFC 4271: errors in NOTIFICATION messages cannot be reported back via NOTIFICATION,
281291
// so we log locally and return Unknown for malformed messages.
282292
if bytes.len() < 2 {
283-
warn!("received malformed NOTIFICATION message", "len" => bytes.len());
293+
eprintln!(
294+
"Warning: received malformed NOTIFICATION message, len={:?}",
295+
bytes.len()
296+
);
284297
return NotificationMessage {
285298
error: BgpError::Unknown,
286299
data: bytes,

core/src/bgp/utils.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use crate::warn;
1615
use std::error::Error;
1716
use std::fmt::{Display, Formatter};
1817
use std::net::{Ipv4Addr, Ipv6Addr};
@@ -99,7 +98,10 @@ pub fn parse_nlri_list(bytes: &[u8]) -> Result<Vec<IpNetwork>, ParserError> {
9998

10099
// Semantic check: skip multicast prefixes (224.0.0.0/4)
101100
if net.is_multicast() {
102-
warn!("ignoring multicast NLRI prefix", "prefix" => format!("{:?}", net));
101+
eprintln!(
102+
"Warning: ignoring multicast NLRI prefix, prefix={:?}",
103+
format!("{:?}", net)
104+
);
103105
cursor += byte_len;
104106
continue;
105107
}

core/src/bmp/destination.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
// limitations under the License.
1414

1515
use super::msg::BmpMessage;
16+
use crate::log::Logger;
1617
use crate::{error, info};
1718
use std::net::SocketAddr;
19+
use std::sync::Arc;
1820
use std::time::Duration;
1921
use tokio::io::AsyncWriteExt;
2022
use tokio::net::TcpStream;
@@ -24,14 +26,16 @@ pub struct BmpTcpClient {
2426
addr: SocketAddr,
2527
conn: Option<TcpStream>,
2628
reconnect_delay: Duration,
29+
logger: Arc<Logger>,
2730
}
2831

2932
impl BmpTcpClient {
30-
pub fn new(addr: SocketAddr) -> Self {
33+
pub fn new(addr: SocketAddr, logger: Arc<Logger>) -> Self {
3134
Self {
3235
addr,
3336
conn: None,
3437
reconnect_delay: Duration::from_secs(1),
38+
logger,
3539
}
3640
}
3741

@@ -46,13 +50,13 @@ impl BmpTcpClient {
4650

4751
match TcpStream::connect(self.addr).await {
4852
Ok(stream) => {
49-
info!("BMP: Connected to collector", "addr" => &self.addr);
53+
info!(&self.logger, "BMP: Connected to collector", "addr" => &self.addr);
5054
self.conn = Some(stream);
5155
self.reconnect_delay = Duration::from_secs(1); // Reset delay
5256
true
5357
}
5458
Err(e) => {
55-
error!("BMP: Failed to connect to collector", "addr" => &self.addr, "error" => &e.to_string());
59+
error!(&self.logger, "BMP: Failed to connect to collector", "addr" => &self.addr, "error" => &e.to_string());
5660
// Exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s (max)
5761
self.reconnect_delay =
5862
std::cmp::min(self.reconnect_delay * 2, Duration::from_secs(30));
@@ -70,7 +74,7 @@ impl BmpTcpClient {
7074
match conn.write_all(data).await {
7175
Ok(_) => true,
7276
Err(e) => {
73-
error!("BMP: Write failed", "addr" => &self.addr, "error" => &e.to_string());
77+
error!(&self.logger, "BMP: Write failed", "addr" => &self.addr, "error" => &e.to_string());
7478
self.conn = None; // Trigger reconnect
7579
false
7680
}

0 commit comments

Comments
 (0)