Skip to content

Commit 8cf686f

Browse files
authored
Add test for ActiveSamplingRequest (#6307)
* Add test for ActiveSamplingRequest * Fix the column_indexes field from the requested ones to the responded ones * Fix clippy errors * Move tests to tests.rs * Fix unused import * Fix clippy error * Merge branch 'unstable' into fork/add-test-for-active-sampling-request # Conflicts: # beacon_node/network/Cargo.toml # beacon_node/network/src/sync/sampling.rs * Merge branch 'unstable' into fork/add-test-for-active-sampling-request
1 parent f3a5e25 commit 8cf686f

File tree

5 files changed

+145
-1
lines changed

5 files changed

+145
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/network/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ eth2 = { workspace = true }
1515
gossipsub = { workspace = true }
1616
eth2_network_config = { workspace = true }
1717
kzg = { workspace = true }
18+
bls = { workspace = true }
1819

1920
[dependencies]
2021
alloy-primitives = { workspace = true }

beacon_node/network/src/sync/block_lookups/tests.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,13 @@ impl TestRig {
310310
);
311311
}
312312

313+
fn expect_active_sampling(&mut self, block_root: &Hash256) {
314+
assert!(self
315+
.sync_manager
316+
.active_sampling_requests()
317+
.contains(block_root));
318+
}
319+
313320
fn expect_clean_finished_sampling(&mut self) {
314321
self.expect_empty_network();
315322
self.expect_sampling_result_work();
@@ -1090,6 +1097,11 @@ impl TestRig {
10901097
.unwrap_or_else(|e| panic!("Expected sampling result work: {e}"))
10911098
}
10921099

1100+
fn expect_no_work_event(&mut self) {
1101+
self.drain_processor_rx();
1102+
assert!(self.network_rx_queue.is_empty());
1103+
}
1104+
10931105
fn expect_no_penalty_for(&mut self, peer_id: PeerId) {
10941106
self.drain_network_rx();
10951107
let downscore_events = self
@@ -1290,6 +1302,16 @@ impl TestRig {
12901302
imported: false,
12911303
});
12921304
}
1305+
1306+
fn assert_sampling_request_status(
1307+
&self,
1308+
block_root: Hash256,
1309+
ongoing: &Vec<ColumnIndex>,
1310+
no_peers: &Vec<ColumnIndex>,
1311+
) {
1312+
self.sync_manager
1313+
.assert_sampling_request_status(block_root, ongoing, no_peers)
1314+
}
12931315
}
12941316

12951317
#[test]
@@ -2023,6 +2045,76 @@ fn sampling_avoid_retrying_same_peer() {
20232045
r.expect_empty_network();
20242046
}
20252047

2048+
#[test]
2049+
fn sampling_batch_requests() {
2050+
let Some(mut r) = TestRig::test_setup_after_peerdas() else {
2051+
return;
2052+
};
2053+
let _supernode = r.new_connected_supernode_peer();
2054+
let (block, data_columns) = r.rand_block_and_data_columns();
2055+
let block_root = block.canonical_root();
2056+
r.trigger_sample_block(block_root, block.slot());
2057+
2058+
// Retrieve the sample request, which should be batched.
2059+
let (sync_request_id, column_indexes) = r
2060+
.expect_only_data_columns_by_root_requests(block_root, 1)
2061+
.pop()
2062+
.unwrap();
2063+
assert_eq!(column_indexes.len(), SAMPLING_REQUIRED_SUCCESSES);
2064+
r.assert_sampling_request_status(block_root, &column_indexes, &vec![]);
2065+
2066+
// Resolve the request.
2067+
r.complete_valid_sampling_column_requests(
2068+
vec![(sync_request_id, column_indexes.clone())],
2069+
data_columns,
2070+
);
2071+
r.expect_clean_finished_sampling();
2072+
}
2073+
2074+
#[test]
2075+
fn sampling_batch_requests_not_enough_responses_returned() {
2076+
let Some(mut r) = TestRig::test_setup_after_peerdas() else {
2077+
return;
2078+
};
2079+
let _supernode = r.new_connected_supernode_peer();
2080+
let (block, data_columns) = r.rand_block_and_data_columns();
2081+
let block_root = block.canonical_root();
2082+
r.trigger_sample_block(block_root, block.slot());
2083+
2084+
// Retrieve the sample request, which should be batched.
2085+
let (sync_request_id, column_indexes) = r
2086+
.expect_only_data_columns_by_root_requests(block_root, 1)
2087+
.pop()
2088+
.unwrap();
2089+
assert_eq!(column_indexes.len(), SAMPLING_REQUIRED_SUCCESSES);
2090+
2091+
// The request status should be set to Sampling.
2092+
r.assert_sampling_request_status(block_root, &column_indexes, &vec![]);
2093+
2094+
// Split the indexes to simulate the case where the supernode doesn't have the requested column.
2095+
let (_column_indexes_supernode_does_not_have, column_indexes_to_complete) =
2096+
column_indexes.split_at(1);
2097+
2098+
// Complete the requests but only partially, so a NotEnoughResponsesReturned error occurs.
2099+
let data_columns_to_complete = data_columns
2100+
.iter()
2101+
.filter(|d| column_indexes_to_complete.contains(&d.index))
2102+
.cloned()
2103+
.collect::<Vec<_>>();
2104+
r.complete_data_columns_by_root_request(
2105+
(sync_request_id, column_indexes.clone()),
2106+
&data_columns_to_complete,
2107+
);
2108+
2109+
// The request status should be set to NoPeers since the supernode, the only peer, returned not enough responses.
2110+
r.assert_sampling_request_status(block_root, &vec![], &column_indexes);
2111+
2112+
// The sampling request stalls.
2113+
r.expect_empty_network();
2114+
r.expect_no_work_event();
2115+
r.expect_active_sampling(&block_root);
2116+
}
2117+
20262118
#[test]
20272119
fn custody_lookup_happy_path() {
20282120
let Some(mut r) = TestRig::test_setup_after_peerdas() else {

beacon_node/network/src/sync/manager.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ use std::time::Duration;
7171
use tokio::sync::mpsc;
7272
use types::{BlobSidecar, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot};
7373

74+
#[cfg(test)]
75+
use types::ColumnIndex;
76+
7477
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
7578
/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a
7679
/// fully sync'd peer.
@@ -334,6 +337,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
334337
self.sampling.active_sampling_requests()
335338
}
336339

340+
#[cfg(test)]
341+
pub(crate) fn assert_sampling_request_status(
342+
&self,
343+
block_root: Hash256,
344+
ongoing: &Vec<ColumnIndex>,
345+
no_peers: &Vec<ColumnIndex>,
346+
) {
347+
self.sampling
348+
.assert_sampling_request_status(block_root, ongoing, no_peers);
349+
}
350+
337351
fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
338352
self.network.network_globals()
339353
}

beacon_node/network/src/sync/peer_sampling.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,18 @@ impl<T: BeaconChainTypes> Sampling<T> {
4242
self.requests.values().map(|r| r.block_root).collect()
4343
}
4444

45+
#[cfg(test)]
46+
pub fn assert_sampling_request_status(
47+
&self,
48+
block_root: Hash256,
49+
ongoing: &Vec<ColumnIndex>,
50+
no_peers: &Vec<ColumnIndex>,
51+
) {
52+
let requester = SamplingRequester::ImportedBlock(block_root);
53+
let active_sampling_request = self.requests.get(&requester).unwrap();
54+
active_sampling_request.assert_sampling_request_status(ongoing, no_peers);
55+
}
56+
4557
/// Create a new sampling request for a known block
4658
///
4759
/// ### Returns
@@ -220,6 +232,21 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
220232
}
221233
}
222234

235+
#[cfg(test)]
236+
pub fn assert_sampling_request_status(
237+
&self,
238+
ongoing: &Vec<ColumnIndex>,
239+
no_peers: &Vec<ColumnIndex>,
240+
) {
241+
for idx in ongoing {
242+
assert!(self.column_requests.get(idx).unwrap().is_ongoing());
243+
}
244+
245+
for idx in no_peers {
246+
assert!(self.column_requests.get(idx).unwrap().is_no_peers());
247+
}
248+
}
249+
223250
/// Insert a downloaded column into an active sampling request. Then make progress on the
224251
/// entire request.
225252
///
@@ -253,10 +280,14 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
253280

254281
match resp {
255282
Ok((mut resp_data_columns, seen_timestamp)) => {
283+
let resp_column_indexes = resp_data_columns
284+
.iter()
285+
.map(|r| r.index)
286+
.collect::<Vec<_>>();
256287
debug!(self.log,
257288
"Sample download success";
258289
"block_root" => %self.block_root,
259-
"column_indexes" => ?column_indexes,
290+
"column_indexes" => ?resp_column_indexes,
260291
"count" => resp_data_columns.len()
261292
);
262293
metrics::inc_counter_vec(&metrics::SAMPLE_DOWNLOAD_RESULT, &[metrics::SUCCESS]);
@@ -598,6 +629,11 @@ mod request {
598629
}
599630
}
600631

632+
#[cfg(test)]
633+
pub(crate) fn is_no_peers(&self) -> bool {
634+
matches!(self.status, Status::NoPeers)
635+
}
636+
601637
pub(crate) fn choose_peer<T: BeaconChainTypes>(
602638
&mut self,
603639
cx: &SyncNetworkContext<T>,

0 commit comments

Comments
 (0)