diff --git a/Cargo.lock b/Cargo.lock index 5e22c9742ad..9e5e76b00de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -966,6 +966,7 @@ dependencies = [ "metrics", "num_cpus", "parking_lot 0.12.3", + "rayon", "serde", "slot_clock", "strum", diff --git a/beacon_node/beacon_processor/Cargo.toml b/beacon_node/beacon_processor/Cargo.toml index afd4660c9a3..262badf7f97 100644 --- a/beacon_node/beacon_processor/Cargo.toml +++ b/beacon_node/beacon_processor/Cargo.toml @@ -12,6 +12,7 @@ logging = { workspace = true } metrics = { workspace = true } num_cpus = { workspace = true } parking_lot = { workspace = true } +rayon = { workspace = true } serde = { workspace = true } slot_clock = { workspace = true } strum = { workspace = true } diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 0f324071a1e..d91543af2ef 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -261,6 +261,13 @@ impl Default for BeaconProcessorConfig { } } +impl BeaconProcessorConfig { + /// Get the number of workers, capped at max_workers + pub fn get_worker_count(&self, worker_count: usize) -> usize { + cmp::min(worker_count, self.max_workers) + } +} + // The channels necessary to instantiate a `BeaconProcessor`. pub struct BeaconProcessorChannels { pub beacon_processor_tx: BeaconProcessorSend, @@ -1280,7 +1287,7 @@ impl BeaconProcessor { if let Some(work_event) = work_event { let work_type = work_event.to_type(); - self.spawn_worker(work_event, idle_tx); + self.spawn_worker_for_work_type(work_event, idle_tx); Some(work_type) } else { None @@ -1328,7 +1335,7 @@ impl BeaconProcessor { ) } } - _ if can_spawn => self.spawn_worker(work, idle_tx), + _ if can_spawn => self.spawn_worker_for_work_type(work, idle_tx), Work::GossipAttestation { .. } => attestation_queue.push(work), // Attestation batches are formed internally within the // `BeaconProcessor`, they are not sent from external services. 
@@ -1530,6 +1537,74 @@ impl BeaconProcessor { Ok(()) } + fn spawn_worker_for_work_type(&mut self, work: Work, idle_tx: mpsc::Sender<()>) { + let work_type = work.to_type(); + let base_worker_count = match work_type { + WorkType::ColumnReconstruction => 4, + _ => 1, + }; + let required_workers = self.config.get_worker_count(base_worker_count); + + if required_workers == 1 { + self.spawn_worker(work, idle_tx); + } else { + self.spawn_multi_worker(work, required_workers, idle_tx); + } + } + + /// Spawns multiple workers for a single work item + fn spawn_multi_worker( + &mut self, + work: Work, + worker_count: usize, + idle_tx: mpsc::Sender<()>, + ) { + let work_id = work.str_id(); + let worker_timer = + metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]); + metrics::inc_counter_by( + &metrics::BEACON_PROCESSOR_WORKERS_SPAWNED_TOTAL, + worker_count as u64, + ); + metrics::inc_counter_vec( + &metrics::BEACON_PROCESSOR_WORK_EVENTS_STARTED_COUNT, + &[work.str_id()], + ); + + // Create a multi-worker idle sender that only sends when all workers are done + let multi_idle_tx = MultiWorkerIdleSender::new(idle_tx, worker_count, worker_timer); + + let worker_id = self.current_workers; + self.current_workers = self.current_workers.saturating_add(worker_count); + + let executor = self.executor.clone(); + + trace!( + work = work_id, + worker = worker_id, + worker_count = worker_count, + "Spawning multi-worker beacon processor task" + ); + + // Create a scoped rayon thread pool for this work + let task_spawner = MultiWorkerTaskSpawner { + executor, + multi_idle_tx, + worker_count, + }; + + match work { + Work::ColumnReconstruction(process_fn) => { + task_spawner.spawn_async_with_scoped_rayon(process_fn) + } + // Add other multi-worker work types here as needed + _ => { + // This should never happen due to the check above, but just in case + unreachable!("Unsupported multi-worker work type should have been handled above"); + } + } + } + /// Spawns a blocking 
worker thread to process some `Work`. /// /// Sends an message on `idle_tx` when the work is complete and the task is stopping. @@ -1655,6 +1730,76 @@ impl BeaconProcessor { } } +/// Manages idle signaling for multi-worker tasks +struct MultiWorkerIdleSender { + tx: mpsc::Sender<()>, + worker_count: usize, + _worker_timer: Option, +} + +impl MultiWorkerIdleSender { + fn new( + tx: mpsc::Sender<()>, + worker_count: usize, + timer: Option, + ) -> Self { + Self { + tx, + worker_count, + _worker_timer: timer, + } + } +} + +impl Drop for MultiWorkerIdleSender { + fn drop(&mut self) { + // Send idle signal for all workers when dropped + for _ in 0..self.worker_count { + if let Err(e) = self.tx.try_send(()) { + warn!( + msg = "did not free multi-worker, shutdown may be underway", + error = %e, + worker_count = self.worker_count, + "Unable to free multi-worker" + ); + break; + } + } + } +} + +/// Task spawner for multi-worker tasks +struct MultiWorkerTaskSpawner { + executor: TaskExecutor, + multi_idle_tx: MultiWorkerIdleSender, + worker_count: usize, +} + +impl MultiWorkerTaskSpawner { + /// Spawn an async task with a scoped rayon thread pool + fn spawn_async_with_scoped_rayon(self, task: impl Future + Send + 'static) { + let worker_count = self.worker_count; + self.executor.spawn( + async move { + // Create a scoped rayon thread pool with limited threads + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(worker_count) + .build() + .expect("Failed to create scoped thread pool"); + + // Install the pool for this task's duration + pool.install(|| { + // Run the task within the scoped thread pool + tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(task)) + }); + + drop(self.multi_idle_tx); + }, + WORKER_TASK_NAME, + ) + } +} + /// Spawns tasks that are either: /// /// - Blocking (i.e. 
// Patch context: this line of the diff starts with the tail of the preceding hunk's context
// ("... intensive methods that shouldn't run on the core `tokio` executor)") and then carries
// hunk `@@ -1738,4 +1883,47 @@ mod tests {`. The tests below are appended at the end of
// `mod tests`, after the existing test that asserts on `queue_lengths.attestation_queue` and
// `queue_lengths.unknown_block_attestation_queue`.

    /// `get_worker_count` on the default config never exceeds `max_workers`.
    ///
    /// The default `max_workers` is not fixed by this test, so expectations are stated relative
    /// to it rather than as literals.
    #[test]
    fn test_worker_config_default() {
        let config = BeaconProcessorConfig::default();
        let max_workers = config.max_workers;

        // Requests at or below the cap pass through; requests above it are clamped.
        for requested in [0, 1, 2] {
            assert_eq!(
                config.get_worker_count(requested),
                requested.min(max_workers)
            );
        }
        assert_eq!(config.get_worker_count(max_workers), max_workers);

        // Test that worker count is capped by max_workers
        let large_count = max_workers + 10;
        assert_eq!(config.get_worker_count(large_count), max_workers);
    }

    /// `get_worker_count` with an explicit `max_workers` of 3.
    #[test]
    fn test_worker_config_custom() {
        let config = BeaconProcessorConfig {
            max_workers: 3,
            ..Default::default()
        };

        assert_eq!(config.get_worker_count(6), 3); // Capped at max_workers
        assert_eq!(config.get_worker_count(3), 3); // Exactly at the cap
        assert_eq!(config.get_worker_count(2), 2); // Not capped
        assert_eq!(config.get_worker_count(1), 1); // Not capped
    }

    /// Dropping a `MultiWorkerIdleSender` sends exactly `worker_count` idle messages, and none
    /// before the drop.
    #[test]
    fn test_multi_worker_idle_sender() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(10);
        let worker_count = 3;

        {
            let _multi_idle = MultiWorkerIdleSender::new(tx, worker_count, None);
            // Nothing is sent while the sender is alive.
            assert!(rx.try_recv().is_err());
            // When dropped, it should send worker_count idle messages
        }

        // Verify that the correct number of idle messages were sent
        let mut received_count = 0;
        while rx.try_recv().is_ok() {
            received_count += 1;
        }
        assert_eq!(received_count, worker_count);
    }

    /// Dropping a `MultiWorkerIdleSender` whose receiver is already gone must not panic.
    #[test]
    fn test_multi_worker_idle_sender_closed_channel() {
        let (tx, rx) = tokio::sync::mpsc::channel::<()>(10);
        drop(rx);

        let multi_idle = MultiWorkerIdleSender::new(tx, 3, None);
        drop(multi_idle);
    }