Skip to content

Commit e21b31e

Browse files
committed
More beacon chain test fixes.
1 parent 492c1c6 commit e21b31e

File tree

4 files changed

+85
-17
lines changed

4 files changed

+85
-17
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, Prep
3434
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
3535
use crate::graffiti_calculator::GraffitiCalculator;
3636
use crate::head_tracker::{HeadTracker, HeadTrackerReader, SszHeadTracker};
37+
use crate::kzg_utils::reconstruct_blobs;
3738
use crate::light_client_finality_update_verification::{
3839
Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate,
3940
};
@@ -1260,6 +1261,44 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
12601261
self.store.get_data_columns(block_root).map_err(Error::from)
12611262
}
12621263

1264+
/// Returns the blobs at the given root, if any.
1265+
///
1266+
/// Uses the `block.epoch()` to determine whether to retrieve blobs or columns from the store.
1267+
///
1268+
/// If at least 50% of columns are retrieved, blobs will be reconstructed and returned,
1269+
/// otherwise an error `InsufficientColumnsToReconstructBlobs` is returned.
1270+
///
1271+
/// ## Errors
1272+
/// May return a database error.
1273+
pub fn get_or_reconstruct_blobs(
1274+
&self,
1275+
block_root: &Hash256,
1276+
) -> Result<Option<BlobSidecarList<T::EthSpec>>, Error> {
1277+
let Some(block) = self.store.get_blinded_block(block_root)? else {
1278+
return Ok(None);
1279+
};
1280+
1281+
if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
1282+
if let Some(columns) = self.store.get_data_columns(block_root)? {
1283+
let num_required_columns = self.spec.number_of_columns / 2;
1284+
let blobs_available = columns.len() >= num_required_columns as usize;
1285+
if blobs_available {
1286+
reconstruct_blobs(&self.kzg, &columns, None, &block, &self.spec)
1287+
.map(Some)
1288+
.map_err(Error::FailedToReconstructBlobs)
1289+
} else {
1290+
Err(Error::InsufficientColumnsToReconstructBlobs {
1291+
columns_found: columns.len(),
1292+
})
1293+
}
1294+
} else {
1295+
Ok(None)
1296+
}
1297+
} else {
1298+
self.get_blobs(block_root).map(|b| b.blobs())
1299+
}
1300+
}
1301+
12631302
/// Returns the data columns at the given root, if any.
12641303
///
12651304
/// ## Errors

beacon_node/beacon_chain/src/errors.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,10 @@ pub enum BeaconChainError {
226226
EmptyRpcCustodyColumns,
227227
AttestationError(AttestationError),
228228
AttestationCommitteeIndexNotSet,
229+
InsufficientColumnsToReconstructBlobs {
230+
columns_found: usize,
231+
},
232+
FailedToReconstructBlobs(String),
229233
}
230234

231235
easy_from_to!(SlotProcessingError, BeaconChainError);

beacon_node/beacon_chain/src/test_utils.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ pub struct Builder<T: BeaconChainTypes> {
231231
mock_execution_layer: Option<MockExecutionLayer<T::EthSpec>>,
232232
testing_slot_clock: Option<TestingSlotClock>,
233233
validator_monitor_config: Option<ValidatorMonitorConfig>,
234+
import_all_data_columns: bool,
234235
runtime: TestRuntime,
235236
log: Logger,
236237
}
@@ -373,6 +374,7 @@ where
373374
mock_execution_layer: None,
374375
testing_slot_clock: None,
375376
validator_monitor_config: None,
377+
import_all_data_columns: false,
376378
runtime,
377379
log,
378380
}
@@ -465,6 +467,11 @@ where
465467
self
466468
}
467469

470+
pub fn import_all_data_columns(mut self, import_all_data_columns: bool) -> Self {
471+
self.import_all_data_columns = import_all_data_columns;
472+
self
473+
}
474+
468475
pub fn execution_layer_from_url(mut self, url: &str) -> Self {
469476
assert!(
470477
self.execution_layer.is_none(),
@@ -582,6 +589,7 @@ where
582589
.expect("should build dummy backend")
583590
.shutdown_sender(shutdown_tx)
584591
.chain_config(chain_config)
592+
.import_all_data_columns(self.import_all_data_columns)
585593
.event_handler(Some(ServerSentEventHandler::new_with_capacity(
586594
log.clone(),
587595
5,

beacon_node/beacon_chain/tests/store_tests.rs

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#![cfg(not(debug_assertions))]
22

33
use beacon_chain::attestation_verification::Error as AttnError;
4-
use beacon_chain::block_verification_types::RpcBlock;
54
use beacon_chain::builder::BeaconChainBuilder;
65
use beacon_chain::data_availability_checker::AvailableBlock;
76
use beacon_chain::schema_change::migrate_schema;
@@ -82,13 +81,26 @@ fn get_harness(
8281
reconstruct_historic_states: true,
8382
..ChainConfig::default()
8483
};
85-
get_harness_generic(store, validator_count, chain_config)
84+
get_harness_generic(store, validator_count, chain_config, false)
85+
}
86+
87+
fn get_harness_import_all_data_columns(
88+
store: Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>,
89+
validator_count: usize,
90+
) -> TestHarness {
91+
// Most tests expect to retain historic states, so we use this as the default.
92+
let chain_config = ChainConfig {
93+
reconstruct_historic_states: true,
94+
..ChainConfig::default()
95+
};
96+
get_harness_generic(store, validator_count, chain_config, true)
8697
}
8798

8899
fn get_harness_generic(
89100
store: Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>,
90101
validator_count: usize,
91102
chain_config: ChainConfig,
103+
import_all_data_columns: bool,
92104
) -> TestHarness {
93105
let harness = TestHarness::builder(MinimalEthSpec)
94106
.spec(store.get_chain_spec().clone())
@@ -97,6 +109,7 @@ fn get_harness_generic(
97109
.fresh_disk_store(store)
98110
.mock_execution_layer()
99111
.chain_config(chain_config)
112+
.import_all_data_columns(import_all_data_columns)
100113
.build();
101114
harness.advance_slot();
102115
harness
@@ -2286,7 +2299,12 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
22862299

22872300
let temp1 = tempdir().unwrap();
22882301
let full_store = get_store(&temp1);
2289-
let harness = get_harness(full_store.clone(), LOW_VALIDATOR_COUNT);
2302+
2303+
// Run a supernode so the node has full blobs stored.
2304+
// This may not be required in the future if we end up implementing downloading checkpoint
2305+
// blobs from p2p peers:
2306+
// https://github.com/sigp/lighthouse/issues/6837
2307+
let harness = get_harness_import_all_data_columns(full_store.clone(), LOW_VALIDATOR_COUNT);
22902308

22912309
let all_validators = (0..LOW_VALIDATOR_COUNT).collect::<Vec<_>>();
22922310

@@ -2319,10 +2337,8 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
23192337
.unwrap();
23202338
let wss_blobs_opt = harness
23212339
.chain
2322-
.store
2323-
.get_blobs(&wss_block_root)
2324-
.unwrap()
2325-
.blobs();
2340+
.get_or_reconstruct_blobs(&wss_block_root)
2341+
.unwrap();
23262342
let wss_state = full_store
23272343
.get_state(&wss_state_root, Some(checkpoint_slot))
23282344
.unwrap()
@@ -2395,14 +2411,16 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
23952411
.await
23962412
.unwrap()
23972413
.unwrap();
2414+
// This test may break in the future if we no longer store the full checkpoint data columns.
23982415
let store_wss_blobs_opt = beacon_chain
2399-
.store
2400-
.get_blobs(&wss_block_root)
2401-
.unwrap()
2402-
.blobs();
2416+
.get_or_reconstruct_blobs(&wss_block_root)
2417+
.unwrap();
24032418

24042419
assert_eq!(store_wss_block, wss_block);
2405-
assert_eq!(store_wss_blobs_opt, wss_blobs_opt);
2420+
// TODO(fulu): Remove this condition once #6760 (PeerDAS checkpoint sync) is merged.
2421+
if !beacon_chain.spec.is_peer_das_scheduled() {
2422+
assert_eq!(store_wss_blobs_opt, wss_blobs_opt);
2423+
}
24062424

24072425
// Apply blocks forward to reach head.
24082426
let chain_dump = harness.chain.chain_dump().unwrap();
@@ -2418,15 +2436,15 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
24182436
.await
24192437
.unwrap()
24202438
.unwrap();
2421-
let blobs = harness.chain.get_blobs(&block_root).expect("blobs").blobs();
2439+
24222440
let slot = full_block.slot();
24232441
let state_root = full_block.state_root();
24242442

24252443
beacon_chain.slot_clock.set_slot(slot.as_u64());
24262444
beacon_chain
24272445
.process_block(
24282446
full_block.canonical_root(),
2429-
RpcBlock::new(Some(block_root), Arc::new(full_block), blobs).unwrap(),
2447+
harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)),
24302448
NotifyExecutionLayer::Yes,
24312449
BlockImportSource::Lookup,
24322450
|| Ok(()),
@@ -2480,13 +2498,12 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
24802498
.await
24812499
.expect("should get block")
24822500
.expect("should get block");
2483-
let blobs = harness.chain.get_blobs(&block_root).expect("blobs").blobs();
24842501

24852502
if let MaybeAvailableBlock::Available(block) = harness
24862503
.chain
24872504
.data_availability_checker
24882505
.verify_kzg_for_rpc_block(
2489-
RpcBlock::new(Some(block_root), Arc::new(full_block), blobs).unwrap(),
2506+
harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)),
24902507
)
24912508
.expect("should verify kzg")
24922509
{
@@ -2587,7 +2604,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() {
25872604
reconstruct_historic_states: false,
25882605
..ChainConfig::default()
25892606
};
2590-
let harness = get_harness_generic(store.clone(), LOW_VALIDATOR_COUNT, chain_config);
2607+
let harness = get_harness_generic(store.clone(), LOW_VALIDATOR_COUNT, chain_config, false);
25912608

25922609
let all_validators = (0..LOW_VALIDATOR_COUNT).collect::<Vec<_>>();
25932610

0 commit comments

Comments
 (0)