diff --git a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs index 50dc6bbe38b..357eb31b639 100644 --- a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs +++ b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs @@ -204,7 +204,7 @@ impl Command { .map_err(|_| BlockValidationError::SenderRecoveryError)?, None, )?; - block_state.write_to_db(provider_rw.tx_ref(), OriginalValuesKnown::No)?; + block_state.write_to_storage(provider_rw.tx_ref(), None, OriginalValuesKnown::No)?; let storage_lists = provider_rw.changed_storages_with_range(block.number..=block.number)?; let storages = provider_rw.plain_state_storages(storage_lists)?; provider_rw.insert_storage_for_hashing(storages)?; diff --git a/crates/node-core/src/init.rs b/crates/node-core/src/init.rs index de75bf9d548..6930d117105 100644 --- a/crates/node-core/src/init.rs +++ b/crates/node-core/src/init.rs @@ -153,7 +153,7 @@ pub fn insert_genesis_state( 0, ); - bundle.write_to_db(tx, OriginalValuesKnown::Yes)?; + bundle.write_to_storage(tx, None, OriginalValuesKnown::Yes)?; Ok(()) } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 42225b1a093..1d5c6260e49 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -15,13 +15,15 @@ use reth_primitives::{ stage::{ CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint, StageCheckpoint, StageId, }, - BlockNumber, Header, PruneModes, U256, + BlockNumber, Header, PruneModes, SnapshotSegment, U256, }; use reth_provider::{ + providers::{SnapshotProviderRWRefMut, SnapshotWriter}, BlockReader, DatabaseProviderRW, ExecutorFactory, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError, TransactionVariant, }; use std::{ + cmp::Ordering, ops::RangeInclusive, time::{Duration, Instant}, }; @@ -121,6 +123,12 @@ impl ExecutionStage { let max_block = input.target(); let prune_modes = 
self.adjust_prune_modes(provider, start_block, max_block)?; + // We only use static files for Receipts, if there is no receipt pruning of any kind. + let mut snapshotter = None; + if self.prune_modes.receipts.is_none() && self.prune_modes.receipts_log_filter.is_empty() { + snapshotter = Some(prepare_snapshotter(provider, start_block)?); + } + // Build executor let mut executor = self.executor_factory.with_state(LatestStateProviderRef::new(provider.tx_ref())); @@ -195,7 +203,7 @@ impl ExecutionStage { let time = Instant::now(); // write output - state.write_to_db(provider.tx_ref(), OriginalValuesKnown::Yes)?; + state.write_to_storage(provider.tx_ref(), snapshotter, OriginalValuesKnown::Yes)?; let db_write_duration = time.elapsed(); debug!( target: "sync::stages::execution", @@ -421,17 +429,37 @@ impl Stage for ExecutionStage { let mut stage_checkpoint = input.checkpoint.execution_stage_checkpoint(); // Unwind all receipts for transactions in the block range - let mut cursor = tx.cursor_write::()?; - let mut reverse_walker = cursor.walk_back(None)?; + if self.prune_modes.receipts.is_none() && self.prune_modes.receipts_log_filter.is_empty() { + // We only use static files for Receipts, if there is no receipt pruning of any kind. - while let Some(Ok((tx_number, receipt))) = reverse_walker.next() { - if tx_number < first_tx_num { - break - } - reverse_walker.delete_current()?; + // prepare_snapshotter does a consistency check that will unwind static files if the + // expected highest receipt in the files is higher than the database. Which is + // essentially what happens here when we unwind this stage. + let _snapshotter = prepare_snapshotter(provider, *range.start())?; + // Update the checkpoint. if let Some(stage_checkpoint) = stage_checkpoint.as_mut() { - stage_checkpoint.progress.processed -= receipt.cumulative_gas_used; + for block_number in range { + stage_checkpoint.progress.processed -= provider + .block_by_number(block_number)? 
+ .ok_or_else(|| ProviderError::BlockNotFound(block_number.into()))? + .gas_used; + } + } + } else { + // We use the database for Receipts, if there is any kind of receipt pruning/filtering. + let mut cursor = tx.cursor_write::()?; + let mut reverse_walker = cursor.walk_back(None)?; + + while let Some(Ok((tx_number, receipt))) = reverse_walker.next() { + if tx_number < first_tx_num { + break + } + reverse_walker.delete_current()?; + + if let Some(stage_checkpoint) = stage_checkpoint.as_mut() { + stage_checkpoint.progress.processed -= receipt.cumulative_gas_used; + } } } @@ -494,21 +522,81 @@ impl ExecutionStageThresholds { } } +/// Returns a `SnapshotProviderRWRefMut` snapshotter after performing a consistency check. +/// +/// This function compares the highest receipt number recorded in the database with that in the +/// static file to detect any discrepancies due to unexpected shutdowns or database rollbacks. **If +/// the height in the static file is higher**, it rolls back (unwinds) the static file. +/// **Conversely, if the height in the database is higher**, it triggers a rollback in the database +/// (by returning [`StageError`]) until the heights in both the database and static file match. +fn prepare_snapshotter<'a, 'b, DB: Database>( + provider: &'b DatabaseProviderRW, + start_block: u64, +) -> Result, StageError> +where + 'b: 'a, +{ + // Get next expected receipt number + let tx = provider.tx_ref(); + let next_receipt_num = tx + .cursor_read::()? + .seek_exact(start_block)? 
+ .map(|(_, value)| value.first_tx_num) + .unwrap_or(0); + + // Get next expected receipt number in static files + let snapshot_provider = provider.snapshot_provider().expect("should exist"); + let mut snapshotter = snapshot_provider.get_writer(start_block, SnapshotSegment::Receipts)?; + let next_snapshot_receipt_num = snapshotter + .get_highest_snapshot_tx(SnapshotSegment::Receipts) + .map(|num| num + 1) + .unwrap_or(0); + + // Check if we had any unexpected shutdown after committing to static files, but + // NOT committing to database. + match next_snapshot_receipt_num.cmp(&next_receipt_num) { + Ordering::Greater => snapshotter.prune_receipts( + next_snapshot_receipt_num - next_receipt_num, + start_block.saturating_sub(1), + )?, + Ordering::Less => { + let last_block = snapshot_provider + .get_highest_snapshot_block(SnapshotSegment::Receipts) + .unwrap_or(0); + + let missing_block = Box::new( + tx.get::(last_block + 1)?.unwrap_or_default().seal_slow(), + ); + + return Err(StageError::MissingSnapshotData { + block: missing_block, + segment: SnapshotSegment::Receipts, + }) + } + Ordering::Equal => {} + } + Ok(snapshotter) +} + #[cfg(test)] mod tests { use super::*; use crate::test_utils::TestStageDB; use alloy_rlp::Decodable; use assert_matches::assert_matches; - use reth_db::{models::AccountBeforeTx, test_utils::create_test_rw_db}; + use reth_db::{ + models::AccountBeforeTx, + test_utils::{create_test_rw_db, create_test_snapshots_dir}, + }; use reth_interfaces::executor::BlockValidationError; use reth_primitives::{ - address, hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode, - ChainSpecBuilder, PruneModes, SealedBlock, StorageEntry, B256, MAINNET, U256, + address, hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Address, + Bytecode, ChainSpecBuilder, PruneMode, PruneModes, ReceiptsLogPruneConfig, SealedBlock, + StorageEntry, B256, MAINNET, U256, }; use reth_provider::{AccountReader, BlockWriter, ProviderFactory, 
ReceiptProvider}; use reth_revm::EvmProcessorFactory; - use std::sync::Arc; + use std::{collections::BTreeMap, sync::Arc}; fn stage() -> ExecutionStage { let executor_factory = EvmProcessorFactory::new(Arc::new( @@ -665,7 +753,9 @@ mod tests { // TODO cleanup the setup after https://github.com/paradigmxyz/reth/issues/332 // is merged as it has similar framework let state_db = create_test_rw_db(); - let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone()); + let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone()) + .with_snapshots(create_test_snapshots_dir()) + .unwrap(); let provider = factory.provider_rw().unwrap(); let input = ExecInput { target: Some(1), checkpoint: None }; let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); @@ -700,69 +790,101 @@ mod tests { db_tx.put::(code_hash, Bytecode::new_raw(code.to_vec().into())).unwrap(); provider.commit().unwrap(); - let provider = factory.provider_rw().unwrap(); - let mut execution_stage: 
ExecutionStage = stage(); - let output = execution_stage.execute(&provider, input).unwrap(); - provider.commit().unwrap(); - assert_matches!(output, ExecOutput { - checkpoint: StageCheckpoint { - block_number: 1, - stage_checkpoint: Some(StageUnitCheckpoint::Execution(ExecutionCheckpoint { - block_range: CheckpointBlockRange { - from: 1, - to: 1, - }, - progress: EntitiesCheckpoint { - processed, - total - } - })) - }, - done: true - } if processed == total && total == block.gas_used); - - let provider = factory.provider().unwrap(); - - // check post state - let account1 = address!("1000000000000000000000000000000000000000"); - let account1_info = - Account { balance: U256::ZERO, nonce: 0x00, bytecode_hash: Some(code_hash) }; - let account2 = address!("2adc25665018aa1fe0e6bc666dac8fc2697ff9ba"); - let account2_info = Account { - balance: U256::from(0x1bc16d674ece94bau128), - nonce: 0x00, - bytecode_hash: None, - }; - let account3 = address!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b"); - let account3_info = Account { - balance: U256::from(0x3635c9adc5de996b46u128), - nonce: 0x01, - bytecode_hash: None, - }; + // execute - // assert accounts - assert_eq!( - provider.basic_account(account1), - Ok(Some(account1_info)), - "Post changed of a account" - ); - assert_eq!( - provider.basic_account(account2), - Ok(Some(account2_info)), - "Post changed of a account" - ); - assert_eq!( - provider.basic_account(account3), - Ok(Some(account3_info)), - "Post changed of a account" - ); - // assert storage - // Get on dupsort would return only first value. This is good enough for this test. - assert_eq!( - provider.tx_ref().get::(account1), - Ok(Some(StorageEntry { key: B256::with_last_byte(1), value: U256::from(2) })), - "Post changed of a account" - ); + // If there is a pruning configuration, then it's forced to use the database. + // This way we test both cases. 
+ let modes = [None, Some(PruneModes::none())]; + let random_filter = + ReceiptsLogPruneConfig(BTreeMap::from([(Address::random(), PruneMode::Full)])); + + // Tests node with database and node with static files + for mut mode in modes { + let provider = factory.provider_rw().unwrap(); + + if let Some(mode) = &mut mode { + // Simulating a full node where we write receipts to database + mode.receipts_log_filter = random_filter.clone(); + } + + let mut execution_stage: ExecutionStage = stage(); + execution_stage.prune_modes = mode.clone().unwrap_or_default(); + + let output = execution_stage.execute(&provider, input).unwrap(); + provider.commit().unwrap(); + + assert_matches!(output, ExecOutput { + checkpoint: StageCheckpoint { + block_number: 1, + stage_checkpoint: Some(StageUnitCheckpoint::Execution(ExecutionCheckpoint { + block_range: CheckpointBlockRange { + from: 1, + to: 1, + }, + progress: EntitiesCheckpoint { + processed, + total + } + })) + }, + done: true + } if processed == total && total == block.gas_used); + + let provider = factory.provider().unwrap(); + + // check post state + let account1 = address!("1000000000000000000000000000000000000000"); + let account1_info = + Account { balance: U256::ZERO, nonce: 0x00, bytecode_hash: Some(code_hash) }; + let account2 = address!("2adc25665018aa1fe0e6bc666dac8fc2697ff9ba"); + let account2_info = Account { + balance: U256::from(0x1bc16d674ece94bau128), + nonce: 0x00, + bytecode_hash: None, + }; + let account3 = address!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b"); + let account3_info = Account { + balance: U256::from(0x3635c9adc5de996b46u128), + nonce: 0x01, + bytecode_hash: None, + }; + + // assert accounts + assert_eq!( + provider.basic_account(account1), + Ok(Some(account1_info)), + "Post changed of a account" + ); + assert_eq!( + provider.basic_account(account2), + Ok(Some(account2_info)), + "Post changed of a account" + ); + assert_eq!( + provider.basic_account(account3), + Ok(Some(account3_info)), + "Post 
changed of a account" + ); + // assert storage + // Get on dupsort would return only first value. This is good enough for this test. + assert_eq!( + provider.tx_ref().get::(account1), + Ok(Some(StorageEntry { key: B256::with_last_byte(1), value: U256::from(2) })), + "Post changed of a account" + ); + + let provider = factory.provider_rw().unwrap(); + let mut stage = stage(); + stage.prune_modes = mode.unwrap_or_default(); + + let _result = stage + .unwind( + &provider, + UnwindInput { checkpoint: output.checkpoint, unwind_to: 0, bad_block: None }, + ) + .unwrap(); + provider.commit().unwrap(); + } } #[tokio::test] @@ -771,7 +893,9 @@ mod tests { // is merged as it has similar framework let state_db = create_test_rw_db(); - let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone()); + let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone()) + .with_snapshots(create_test_snapshots_dir()) + .unwrap(); let provider = factory.provider_rw().unwrap(); let input = ExecInput { target: Some(1), checkpoint: None }; let mut genesis_rlp = 
hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); @@ -801,44 +925,77 @@ mod tests { provider.commit().unwrap(); // execute - let provider = factory.provider_rw().unwrap(); - let mut execution_stage = stage(); - let result = execution_stage.execute(&provider, input).unwrap(); - provider.commit().unwrap(); - - let provider = factory.provider_rw().unwrap(); - let mut stage = stage(); - let result = stage - .unwind( - &provider, - UnwindInput { checkpoint: result.checkpoint, unwind_to: 0, bad_block: None }, - ) - .unwrap(); - - assert_matches!(result, UnwindOutput { - checkpoint: StageCheckpoint { - block_number: 0, - stage_checkpoint: Some(StageUnitCheckpoint::Execution(ExecutionCheckpoint { - block_range: CheckpointBlockRange { - from: 1, - to: 1, - }, - progress: EntitiesCheckpoint { - processed: 0, - total - } - })) + let mut provider = factory.provider_rw().unwrap(); + + // If there is a pruning configuration, then it's forced to use the database. + // This way we test both cases. 
+ let modes = [None, Some(PruneModes::none())]; + let random_filter = + ReceiptsLogPruneConfig(BTreeMap::from([(Address::random(), PruneMode::Full)])); + + // Tests node with database and node with static files + for mut mode in modes { + if let Some(mode) = &mut mode { + // Simulating a full node where we write receipts to database + mode.receipts_log_filter = random_filter.clone(); } - } if total == block.gas_used); - // assert unwind stage - assert_eq!(provider.basic_account(acc1), Ok(Some(acc1_info)), "Pre changed of a account"); - assert_eq!(provider.basic_account(acc2), Ok(Some(acc2_info)), "Post changed of a account"); + // Test Execution + let mut execution_stage = stage(); + execution_stage.prune_modes = mode.clone().unwrap_or_default(); + + let result = execution_stage.execute(&provider, input).unwrap(); + provider.commit().unwrap(); - let miner_acc = address!("2adc25665018aa1fe0e6bc666dac8fc2697ff9ba"); - assert_eq!(provider.basic_account(miner_acc), Ok(None), "Third account should be unwound"); + // Test Unwind + provider = factory.provider_rw().unwrap(); + let mut stage = stage(); + stage.prune_modes = mode.unwrap_or_default(); - assert_eq!(provider.receipt(0), Ok(None), "First receipt should be unwound"); + let result = stage + .unwind( + &provider, + UnwindInput { checkpoint: result.checkpoint, unwind_to: 0, bad_block: None }, + ) + .unwrap(); + + assert_matches!(result, UnwindOutput { + checkpoint: StageCheckpoint { + block_number: 0, + stage_checkpoint: Some(StageUnitCheckpoint::Execution(ExecutionCheckpoint { + block_range: CheckpointBlockRange { + from: 1, + to: 1, + }, + progress: EntitiesCheckpoint { + processed: 0, + total + } + })) + } + } if total == block.gas_used); + + // assert unwind stage + assert_eq!( + provider.basic_account(acc1), + Ok(Some(acc1_info)), + "Pre changed of a account" + ); + assert_eq!( + provider.basic_account(acc2), + Ok(Some(acc2_info)), + "Post changed of a account" + ); + + let miner_acc = 
address!("2adc25665018aa1fe0e6bc666dac8fc2697ff9ba"); + assert_eq!( + provider.basic_account(miner_acc), + Ok(None), + "Third account should be unwound" + ); + + assert_eq!(provider.receipt(0), Ok(None), "First receipt should be unwound"); + } } #[tokio::test] diff --git a/crates/storage/db/src/snapshot/cursor.rs b/crates/storage/db/src/snapshot/cursor.rs index f778b39a03e..7a896c8d799 100644 --- a/crates/storage/db/src/snapshot/cursor.rs +++ b/crates/storage/db/src/snapshot/cursor.rs @@ -28,6 +28,10 @@ impl<'a> SnapshotCursor<'a> { key_or_num: KeyOrNumber<'_>, mask: usize, ) -> ProviderResult>> { + if self.jar().rows() == 0 { + return Ok(None) + } + let row = match key_or_num { KeyOrNumber::Key(k) => self.row_by_key_with_cols(k, mask), KeyOrNumber::Number(n) => { diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs index 05efa8c1df8..7aaf9b6eccd 100644 --- a/crates/storage/nippy-jar/src/lib.rs +++ b/crates/storage/nippy-jar/src/lib.rs @@ -207,6 +207,11 @@ impl NippyJar { &self.user_header } + /// Gets total rows in jar. 
+ pub fn rows(&self) -> usize { + self.rows + } + /// Returns the size of inclusion filter pub fn filter_size(&self) -> usize { self.size() diff --git a/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs b/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs index 72bba494916..72b84150915 100644 --- a/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs +++ b/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs @@ -1,15 +1,15 @@ -use crate::{StateChanges, StateReverts}; +use crate::{providers::SnapshotProviderRWRefMut, StateChanges, StateReverts}; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, tables, transaction::{DbTx, DbTxMut}, }; -use reth_interfaces::db::DatabaseError; +use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_primitives::{ logs_bloom, revm::compat::{into_reth_acc, into_revm_acc}, - Account, Address, BlockNumber, Bloom, Bytecode, Log, Receipt, Receipts, StorageEntry, B256, - U256, + Account, Address, BlockNumber, Bloom, Bytecode, Log, Receipt, Receipts, SnapshotSegment, + StorageEntry, B256, U256, }; use reth_trie::HashedPostState; use revm::{ @@ -285,15 +285,21 @@ impl BundleStateWithReceipts { std::mem::swap(&mut self.bundle, &mut other) } - /// Write bundle state to database. + /// Write bundle state to database and receipts to either database or static files if + /// `snapshotter` is `Some`. It should be none if there is any kind of pruning/filtering over + /// the receipts. /// /// `omit_changed_check` should be set to true of bundle has some of it data /// detached, This would make some original values not known. 
- pub fn write_to_db( + pub fn write_to_storage( self, tx: &TX, + mut snapshotter: Option>, is_value_known: OriginalValuesKnown, - ) -> Result<(), DatabaseError> { + ) -> ProviderResult<()> + where + TX: DbTxMut + DbTx, + { let (plain_state, reverts) = self.bundle.into_plain_state_and_reverts(is_value_known); StateReverts(reverts).write_to_db(tx, self.first_block)?; @@ -303,15 +309,22 @@ impl BundleStateWithReceipts { let mut receipts_cursor = tx.cursor_write::()?; for (idx, receipts) in self.receipts.into_iter().enumerate() { - if !receipts.is_empty() { - let block_number = self.first_block + idx as u64; - let (_, body_indices) = - bodies_cursor.seek_exact(block_number)?.unwrap_or_else(|| { - let last_available = bodies_cursor.last().ok().flatten().map(|(number, _)| number); - panic!("body indices for block {block_number} must exist. last available block number: {last_available:?}"); - }); - - let first_tx_index = body_indices.first_tx_num(); + let block_number = self.first_block + idx as u64; + let first_tx_index = bodies_cursor + .seek_exact(block_number)? + .map(|(_, indices)| indices.first_tx_num()) + .ok_or_else(|| ProviderError::BlockBodyIndicesNotFound(block_number))?; + + if let Some(snapshotter) = &mut snapshotter { + // Increment block on static file header. 
+ snapshotter.increment_block(SnapshotSegment::Receipts)?; + + for (tx_idx, receipt) in receipts.into_iter().enumerate() { + let receipt = receipt + .expect("receipt should not be filtered when saving to static files."); + snapshotter.append_receipt(first_tx_index + tx_idx as u64, receipt)?; + } + } else if !receipts.is_empty() { for (tx_idx, receipt) in receipts.into_iter().enumerate() { if let Some(receipt) = receipt { receipts_cursor.append(first_tx_index + tx_idx as u64, receipt)?; @@ -556,7 +569,7 @@ mod tests { state.merge_transitions(BundleRetention::Reverts); BundleStateWithReceipts::new(state.take_bundle(), Receipts::new(), 1) - .write_to_db(provider.tx_ref(), OriginalValuesKnown::Yes) + .write_to_storage(provider.tx_ref(), None, OriginalValuesKnown::Yes) .expect("Could not write bundle state to DB"); // Check plain storage state @@ -654,7 +667,7 @@ mod tests { state.merge_transitions(BundleRetention::Reverts); BundleStateWithReceipts::new(state.take_bundle(), Receipts::new(), 2) - .write_to_db(provider.tx_ref(), OriginalValuesKnown::Yes) + .write_to_storage(provider.tx_ref(), None, OriginalValuesKnown::Yes) .expect("Could not write bundle state to DB"); assert_eq!( @@ -718,7 +731,7 @@ mod tests { )])); init_state.merge_transitions(BundleRetention::Reverts); BundleStateWithReceipts::new(init_state.take_bundle(), Receipts::new(), 0) - .write_to_db(provider.tx_ref(), OriginalValuesKnown::Yes) + .write_to_storage(provider.tx_ref(), None, OriginalValuesKnown::Yes) .expect("Could not write init bundle state to DB"); let mut state = State::builder().with_bundle_update().build(); @@ -863,7 +876,7 @@ mod tests { let bundle = state.take_bundle(); BundleStateWithReceipts::new(bundle, Receipts::new(), 1) - .write_to_db(provider.tx_ref(), OriginalValuesKnown::Yes) + .write_to_storage(provider.tx_ref(), None, OriginalValuesKnown::Yes) .expect("Could not write bundle state to DB"); let mut storage_changeset_cursor = provider @@ -1026,7 +1039,7 @@ mod tests { )])); 
init_state.merge_transitions(BundleRetention::Reverts); BundleStateWithReceipts::new(init_state.take_bundle(), Receipts::new(), 0) - .write_to_db(provider.tx_ref(), OriginalValuesKnown::Yes) + .write_to_storage(provider.tx_ref(), None, OriginalValuesKnown::Yes) .expect("Could not write init bundle state to DB"); let mut state = State::builder().with_bundle_update().build(); @@ -1071,7 +1084,7 @@ mod tests { // Commit block #1 changes to the database. state.merge_transitions(BundleRetention::Reverts); BundleStateWithReceipts::new(state.take_bundle(), Receipts::new(), 1) - .write_to_db(provider.tx_ref(), OriginalValuesKnown::Yes) + .write_to_storage(provider.tx_ref(), None, OriginalValuesKnown::Yes) .expect("Could not write bundle state to DB"); let mut storage_changeset_cursor = provider diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 19aa70bc93b..e70ea82c06d 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -108,9 +108,9 @@ pub struct DatabaseProvider { } impl DatabaseProvider { - /// Returns snapshot provider - pub fn snapshot_provider(&self) -> Option> { - self.snapshot_provider.clone() + /// Returns a snapshot provider reference + pub fn snapshot_provider(&self) -> Option<&Arc> { + self.snapshot_provider.as_ref() } } @@ -2499,7 +2499,7 @@ impl BlockWriter for DatabaseProvider { // Write state and changesets to the database. // Must be written after blocks because of the receipt lookup. 
- state.write_to_db(self.tx_ref(), OriginalValuesKnown::No)?; + state.write_to_storage(self.tx_ref(), None, OriginalValuesKnown::No)?; durations_recorder.record_relative(metrics::Action::InsertState); // insert hashes and intermediate merkle nodes diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 088392cad78..4e2ef3cb198 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -38,7 +38,10 @@ mod bundle_state_provider; mod chain_info; mod database; mod snapshot; -pub use snapshot::{SnapshotJarProvider, SnapshotProvider, SnapshotProviderRW, SnapshotWriter}; +pub use snapshot::{ + SnapshotJarProvider, SnapshotProvider, SnapshotProviderRW, SnapshotProviderRWRefMut, + SnapshotWriter, +}; mod state; use crate::{providers::chain_info::ChainInfoTracker, traits::BlockSource}; pub use bundle_state_provider::BundleStateProvider; diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index 55163005755..c548d3dbcc1 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -1,13 +1,13 @@ -use super::{LoadedJar, SnapshotJarProvider, SnapshotProviderRW, BLOCKS_PER_SNAPSHOT}; +use super::{ + LoadedJar, SnapshotJarProvider, SnapshotProviderRW, SnapshotProviderRWRefMut, + BLOCKS_PER_SNAPSHOT, +}; use crate::{ to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider, ReceiptProvider, TransactionVariant, TransactionsProvider, TransactionsProviderExt, WithdrawalsProvider, }; -use dashmap::{ - mapref::{entry::Entry as DashMapEntry, one::RefMut}, - DashMap, -}; +use dashmap::{mapref::entry::Entry as DashMapEntry, DashMap}; use parking_lot::RwLock; use reth_db::{ codecs::CompactU256, @@ -264,6 +264,13 @@ impl SnapshotProvider { index.insert(tx_end, current_block_range.clone()); }) 
.or_insert_with(|| BTreeMap::from([(tx_end, current_block_range)])); + } else if let Some(1) = tx_index.get(&segment).map(|index| index.len()) { + // Only happens if we unwind all the txs/receipts from the first static file. + // Should only happen in test scenarios. + if matches!(segment, SnapshotSegment::Receipts | SnapshotSegment::Transactions) + { + tx_index.remove(&segment); + } } // Update the cached provider. @@ -420,13 +427,13 @@ pub trait SnapshotWriter { &self, block: BlockNumber, segment: SnapshotSegment, - ) -> ProviderResult>>; + ) -> ProviderResult>; /// Returns a mutable reference to a [`SnapshotProviderRW`] of the latest [`SnapshotSegment`]. fn latest_writer( &self, segment: SnapshotSegment, - ) -> ProviderResult>>; + ) -> ProviderResult>; /// Commits all changes of all [`SnapshotProviderRW`] of all [`SnapshotSegment`]. fn commit(&self) -> ProviderResult<()>; @@ -437,7 +444,7 @@ impl SnapshotWriter for Arc { &self, block: BlockNumber, segment: SnapshotSegment, - ) -> ProviderResult>> { + ) -> ProviderResult> { Ok(match self.writers.entry(segment) { DashMapEntry::Occupied(entry) => entry.into_ref(), DashMapEntry::Vacant(entry) => { @@ -449,7 +456,7 @@ impl SnapshotWriter for Arc { fn latest_writer( &self, segment: SnapshotSegment, - ) -> ProviderResult>> { + ) -> ProviderResult> { self.get_writer(self.get_highest_snapshot_block(segment).unwrap_or_default(), segment) } diff --git a/crates/storage/provider/src/providers/snapshot/mod.rs b/crates/storage/provider/src/providers/snapshot/mod.rs index f357e53383e..f03b25c097d 100644 --- a/crates/storage/provider/src/providers/snapshot/mod.rs +++ b/crates/storage/provider/src/providers/snapshot/mod.rs @@ -5,7 +5,7 @@ mod jar; pub use jar::SnapshotJarProvider; mod writer; -pub use writer::SnapshotProviderRW; +pub use writer::{SnapshotProviderRW, SnapshotProviderRWRefMut}; use reth_interfaces::provider::ProviderResult; use reth_nippy_jar::NippyJar; diff --git 
a/crates/storage/provider/src/providers/snapshot/writer.rs b/crates/storage/provider/src/providers/snapshot/writer.rs index c17f763e86e..066edf64e3a 100644 --- a/crates/storage/provider/src/providers/snapshot/writer.rs +++ b/crates/storage/provider/src/providers/snapshot/writer.rs @@ -1,4 +1,5 @@ use super::SnapshotProvider; +use dashmap::mapref::one::RefMut; use reth_codecs::Compact; use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter}; @@ -9,6 +10,9 @@ use reth_primitives::{ }; use std::{ops::Deref, path::PathBuf, sync::Arc}; +/// Mutable reference to a dashmap element of [`SnapshotProviderRW`]. +pub type SnapshotProviderRWRefMut<'a> = RefMut<'a, SnapshotSegment, SnapshotProviderRW<'static>>; + #[derive(Debug)] /// Extends `SnapshotProvider` with writing capabilities pub struct SnapshotProviderRW<'a> { @@ -274,6 +278,21 @@ impl<'a> SnapshotProviderRW<'a> { self.truncate(segment, number, Some(last_block)) } + /// Prunes `to_delete` number of receipts from snapshots. + /// + /// # Note + /// Commits to the configuration file at the end. + pub fn prune_receipts( + &mut self, + to_delete: u64, + last_block: BlockNumber, + ) -> ProviderResult<()> { + let segment = SnapshotSegment::Receipts; + debug_assert!(self.writer.user_header().segment() == segment); + + self.truncate(segment, to_delete, Some(last_block)) + } + #[cfg(any(test, feature = "test-utils"))] /// Helper function to override block range for testing. 
pub fn set_block_range(&mut self, block_range: std::ops::RangeInclusive) { diff --git a/crates/storage/provider/src/test_utils/mod.rs b/crates/storage/provider/src/test_utils/mod.rs index 0da47c47940..3862b678f4d 100644 --- a/crates/storage/provider/src/test_utils/mod.rs +++ b/crates/storage/provider/src/test_utils/mod.rs @@ -1,6 +1,6 @@ use crate::ProviderFactory; use reth_db::{ - test_utils::{create_test_rw_db, TempDatabase}, + test_utils::{create_test_rw_db, create_test_snapshots_dir, TempDatabase}, DatabaseEnv, }; use reth_primitives::{ChainSpec, MAINNET}; @@ -27,5 +27,5 @@ pub fn create_test_provider_factory_with_chain_spec( chain_spec: Arc, ) -> ProviderFactory>> { let db = create_test_rw_db(); - ProviderFactory::new(db, chain_spec) + ProviderFactory::new(db, chain_spec).with_snapshots(create_test_snapshots_dir()).unwrap() } diff --git a/testing/ef-tests/src/cases/blockchain_test.rs b/testing/ef-tests/src/cases/blockchain_test.rs index 3ff4ebbabda..0221e17f121 100644 --- a/testing/ef-tests/src/cases/blockchain_test.rs +++ b/testing/ef-tests/src/cases/blockchain_test.rs @@ -5,7 +5,7 @@ use crate::{ Case, Error, Suite, }; use alloy_rlp::Decodable; -use reth_db::test_utils::create_test_rw_db; +use reth_db::test_utils::{create_test_rw_db, create_test_snapshots_dir}; use reth_primitives::{BlockBody, SealedBlock}; use reth_provider::{BlockWriter, HashingWriter, ProviderFactory}; use reth_stages::{stages::ExecutionStage, ExecInput, Stage}; @@ -78,6 +78,8 @@ impl Case for BlockchainTestCase { // Create a new test database and initialize a provider for the test case. let db = create_test_rw_db(); let provider = ProviderFactory::new(db.as_ref(), Arc::new(case.network.clone().into())) + .with_snapshots(create_test_snapshots_dir()) + .map_err(|err| Error::RethError(err.into()))? .provider_rw() .unwrap();