Skip to content

Commit 81c10b4

Browse files
authored
feat(stages): remove static files stage, run before loop (#6763)
1 parent 5ebff60 commit 81c10b4

File tree

13 files changed

+134
-105
lines changed

13 files changed

+134
-105
lines changed

bin/reth/src/commands/debug_cmd/execution.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ impl Command {
127127
header_downloader,
128128
body_downloader,
129129
factory.clone(),
130-
static_file_producer,
131130
)?
132131
.set(SenderRecoveryStage {
133132
commit_threshold: stage_conf.sender_recovery.commit_threshold,
@@ -148,7 +147,7 @@ impl Command {
148147
config.prune.clone().map(|prune| prune.segments).unwrap_or_default(),
149148
)),
150149
)
151-
.build(provider_factory);
150+
.build(provider_factory, static_file_producer);
152151

153152
Ok(pipeline)
154153
}

bin/reth/src/commands/debug_cmd/replay_engine.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@ use reth_node_ethereum::{EthEngineTypes, EthEvmConfig};
2525
#[cfg(feature = "optimism")]
2626
use reth_node_optimism::{OptimismEngineTypes, OptimismEvmConfig};
2727
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
28-
use reth_primitives::{fs, ChainSpec};
28+
use reth_primitives::{fs, ChainSpec, PruneModes};
2929
use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions, ProviderFactory};
3030
use reth_revm::EvmProcessorFactory;
3131
use reth_stages::Pipeline;
32+
use reth_static_file::StaticFileProducer;
3233
use reth_tasks::TaskExecutor;
3334
use reth_transaction_pool::noop::NoopTransactionPool;
3435
use std::{
@@ -196,7 +197,14 @@ impl Command {
196197
let (consensus_engine_tx, consensus_engine_rx) = mpsc::unbounded_channel();
197198
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
198199
network_client,
199-
Pipeline::builder().build(provider_factory),
200+
Pipeline::builder().build(
201+
provider_factory.clone(),
202+
StaticFileProducer::new(
203+
provider_factory.clone(),
204+
provider_factory.static_file_provider(),
205+
PruneModes::default(),
206+
),
207+
),
200208
blockchain_db.clone(),
201209
Box::new(ctx.task_executor.clone()),
202210
Box::new(network),

bin/reth/src/commands/import.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,6 @@ impl ImportCommand {
191191
header_downloader,
192192
body_downloader,
193193
factory.clone(),
194-
static_file_producer,
195194
)?
196195
.set(SenderRecoveryStage {
197196
commit_threshold: config.stages.sender_recovery.commit_threshold,
@@ -213,7 +212,7 @@ impl ImportCommand {
213212
config.prune.map(|prune| prune.segments).unwrap_or_default(),
214213
)),
215214
)
216-
.build(provider_factory);
215+
.build(provider_factory, static_file_producer);
217216

218217
let events = pipeline.events().map(Into::into);
219218

crates/consensus/beacon/src/engine/sync.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -398,13 +398,14 @@ mod tests {
398398
use reth_interfaces::{p2p::either::EitherDownloader, test_utils::TestFullBlockClient};
399399
use reth_primitives::{
400400
constants::ETHEREUM_BLOCK_GAS_LIMIT, stage::StageCheckpoint, BlockBody, ChainSpecBuilder,
401-
Header, SealedHeader, MAINNET,
401+
Header, PruneModes, SealedHeader, MAINNET,
402402
};
403403
use reth_provider::{
404404
test_utils::{create_test_provider_factory_with_chain_spec, TestExecutorFactory},
405405
BundleStateWithReceipts,
406406
};
407407
use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
408+
use reth_static_file::StaticFileProducer;
408409
use reth_tasks::TokioTaskExecutor;
409410
use std::{collections::VecDeque, future::poll_fn, ops::Range};
410411
use tokio::sync::watch;
@@ -465,7 +466,15 @@ mod tests {
465466
pipeline = pipeline.with_max_block(max_block);
466467
}
467468

468-
pipeline.build(create_test_provider_factory_with_chain_spec(chain_spec))
469+
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec);
470+
471+
let static_file_producer = StaticFileProducer::new(
472+
provider_factory.clone(),
473+
provider_factory.static_file_provider(),
474+
PruneModes::default(),
475+
);
476+
477+
pipeline.build(provider_factory, static_file_producer)
469478
}
470479
}
471480

crates/consensus/beacon/src/engine/test_utils.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,6 @@ where
407407
header_downloader,
408408
body_downloader,
409409
executor_factory.clone(),
410-
static_file_producer,
411410
)
412411
.expect("should build"),
413412
)
@@ -418,7 +417,7 @@ where
418417
pipeline = pipeline.with_max_block(max_block);
419418
}
420419

421-
let pipeline = pipeline.build(provider_factory.clone());
420+
let pipeline = pipeline.build(provider_factory.clone(), static_file_producer);
422421

423422
// Setup blockchain tree
424423
let externals = TreeExternals::new(provider_factory.clone(), consensus, executor_factory);

crates/node-core/src/node_config.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -847,7 +847,6 @@ impl NodeConfig {
847847
header_downloader,
848848
body_downloader,
849849
factory.clone(),
850-
static_file_producer,
851850
)?
852851
.set(SenderRecoveryStage {
853852
commit_threshold: stage_config.sender_recovery.commit_threshold,
@@ -892,7 +891,7 @@ impl NodeConfig {
892891
prune_modes.storage_history,
893892
)),
894893
)
895-
.build(provider_factory);
894+
.build(provider_factory, static_file_producer);
896895

897896
Ok(pipeline)
898897
}

crates/stages/src/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ pub enum PipelineError {
163163
/// The pipeline encountered an error while trying to send an event.
164164
#[error("pipeline encountered an error while trying to send an event")]
165165
Channel(#[from] Box<SendError<PipelineEvent>>),
166-
/// The stage encountered an internal error.
166+
/// Internal error
167167
#[error(transparent)]
168-
Internal(Box<dyn std::error::Error + Send + Sync>),
168+
Internal(#[from] RethError),
169169
}

crates/stages/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,10 @@
5959
//! headers_downloader,
6060
//! bodies_downloader,
6161
//! executor_factory,
62-
//! static_file_producer,
6362
//! )
6463
//! .unwrap(),
6564
//! )
66-
//! .build(provider_factory);
65+
//! .build(provider_factory, static_file_producer);
6766
//! ```
6867
//!
6968
//! ## Feature Flags

crates/stages/src/pipeline/builder.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageSet}
22
use reth_db::database::Database;
33
use reth_primitives::{stage::StageId, BlockNumber, B256};
44
use reth_provider::ProviderFactory;
5+
use reth_static_file::StaticFileProducer;
56
use tokio::sync::watch;
67

78
/// Builds a [`Pipeline`].
@@ -67,12 +68,17 @@ where
6768
}
6869

6970
/// Builds the final [`Pipeline`] using the given database.
70-
pub fn build(self, provider_factory: ProviderFactory<DB>) -> Pipeline<DB> {
71+
pub fn build(
72+
self,
73+
provider_factory: ProviderFactory<DB>,
74+
static_file_producer: StaticFileProducer<DB>,
75+
) -> Pipeline<DB> {
7176
let Self { stages, max_block, tip_tx, metrics_tx } = self;
7277
Pipeline {
7378
provider_factory,
7479
stages,
7580
max_block,
81+
static_file_producer,
7682
tip_tx,
7783
listeners: Default::default(),
7884
progress: Default::default(),

crates/stages/src/pipeline/mod.rs

Lines changed: 82 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@ use crate::{
33
};
44
use futures_util::Future;
55
use reth_db::database::Database;
6+
use reth_interfaces::RethResult;
67
use reth_primitives::{
78
constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH,
89
stage::{StageCheckpoint, StageId},
10+
static_file::HighestStaticFiles,
911
BlockNumber, B256,
1012
};
1113
use reth_provider::{
1214
providers::StaticFileWriter, ProviderFactory, StageCheckpointReader, StageCheckpointWriter,
1315
};
16+
use reth_static_file::StaticFileProducer;
1417
use reth_tokio_util::EventListeners;
1518
use std::pin::Pin;
1619
use tokio::sync::watch;
@@ -68,6 +71,7 @@ pub struct Pipeline<DB: Database> {
6871
stages: Vec<BoxedStage<DB>>,
6972
/// The maximum block number to sync to.
7073
max_block: Option<BlockNumber>,
74+
static_file_producer: StaticFileProducer<DB>,
7175
/// All listeners for events the pipeline emits.
7276
listeners: EventListeners<PipelineEvent>,
7377
/// Keeps track of the progress of the pipeline.
@@ -179,6 +183,8 @@ where
179183
/// pipeline (for example the `Finish` stage). Or [ControlFlow::Unwind] of the stage that caused
180184
/// the unwind.
181185
pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
186+
self.produce_static_files()?;
187+
182188
let mut previous_stage = None;
183189
for stage_index in 0..self.stages.len() {
184190
let stage = &self.stages[stage_index];
@@ -214,6 +220,33 @@ where
214220
Ok(self.progress.next_ctrl())
215221
}
216222

223+
/// Run [static file producer](StaticFileProducer) and move all data from the database to static
224+
/// files for corresponding [segments](reth_primitives::static_file::StaticFileSegment),
225+
/// according to their [stage checkpoints](StageCheckpoint):
226+
/// - [StaticFileSegment::Headers](reth_primitives::static_file::StaticFileSegment::Headers) ->
227+
/// [StageId::Headers]
228+
/// - [StaticFileSegment::Receipts](reth_primitives::static_file::StaticFileSegment::Receipts)
229+
/// -> [StageId::Execution]
230+
/// - [StaticFileSegment::Transactions](reth_primitives::static_file::StaticFileSegment::Transactions)
231+
/// -> [StageId::Bodies]
232+
fn produce_static_files(&mut self) -> RethResult<()> {
233+
let provider = self.provider_factory.provider()?;
234+
let targets = self.static_file_producer.get_static_file_targets(HighestStaticFiles {
235+
headers: provider
236+
.get_stage_checkpoint(StageId::Headers)?
237+
.map(|checkpoint| checkpoint.block_number),
238+
receipts: provider
239+
.get_stage_checkpoint(StageId::Execution)?
240+
.map(|checkpoint| checkpoint.block_number),
241+
transactions: provider
242+
.get_stage_checkpoint(StageId::Bodies)?
243+
.map(|checkpoint| checkpoint.block_number),
244+
})?;
245+
self.static_file_producer.run(targets)?;
246+
247+
Ok(())
248+
}
249+
217250
/// Unwind the stages to the target block.
218251
///
219252
/// If the unwind is due to a bad block the number of that block should be specified.
@@ -508,6 +541,7 @@ mod tests {
508541
provider::ProviderError,
509542
test_utils::{generators, generators::random_header},
510543
};
544+
use reth_primitives::PruneModes;
511545
use reth_provider::test_utils::create_test_provider_factory;
512546
use tokio_stream::StreamExt;
513547

@@ -553,7 +587,14 @@ mod tests {
553587
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
554588
)
555589
.with_max_block(10)
556-
.build(provider_factory);
590+
.build(
591+
provider_factory.clone(),
592+
StaticFileProducer::new(
593+
provider_factory.clone(),
594+
provider_factory.static_file_provider(),
595+
PruneModes::default(),
596+
),
597+
);
557598
let events = pipeline.events();
558599

559600
// Run pipeline
@@ -613,7 +654,14 @@ mod tests {
613654
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
614655
)
615656
.with_max_block(10)
616-
.build(provider_factory);
657+
.build(
658+
provider_factory.clone(),
659+
StaticFileProducer::new(
660+
provider_factory.clone(),
661+
provider_factory.static_file_provider(),
662+
PruneModes::default(),
663+
),
664+
);
617665
let events = pipeline.events();
618666

619667
// Run pipeline
@@ -720,7 +768,14 @@ mod tests {
720768
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
721769
)
722770
.with_max_block(10)
723-
.build(provider_factory);
771+
.build(
772+
provider_factory.clone(),
773+
StaticFileProducer::new(
774+
provider_factory.clone(),
775+
provider_factory.static_file_provider(),
776+
PruneModes::default(),
777+
),
778+
);
724779
let events = pipeline.events();
725780

726781
// Run pipeline
@@ -817,7 +872,14 @@ mod tests {
817872
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
818873
)
819874
.with_max_block(10)
820-
.build(provider_factory);
875+
.build(
876+
provider_factory.clone(),
877+
StaticFileProducer::new(
878+
provider_factory.clone(),
879+
provider_factory.static_file_provider(),
880+
PruneModes::default(),
881+
),
882+
);
821883
let events = pipeline.events();
822884

823885
// Run pipeline
@@ -897,7 +959,14 @@ mod tests {
897959
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
898960
)
899961
.with_max_block(10)
900-
.build(provider_factory);
962+
.build(
963+
provider_factory.clone(),
964+
StaticFileProducer::new(
965+
provider_factory.clone(),
966+
provider_factory.static_file_provider(),
967+
PruneModes::default(),
968+
),
969+
);
901970
let result = pipeline.run().await;
902971
assert_matches!(result, Ok(()));
903972

@@ -907,7 +976,14 @@ mod tests {
907976
.add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
908977
StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
909978
)))
910-
.build(provider_factory);
979+
.build(
980+
provider_factory.clone(),
981+
StaticFileProducer::new(
982+
provider_factory.clone(),
983+
provider_factory.static_file_provider(),
984+
PruneModes::default(),
985+
),
986+
);
911987
let result = pipeline.run().await;
912988
assert_matches!(
913989
result,

0 commit comments

Comments
 (0)