Skip to content

Add multi-worker work support to BeaconProcessor #7720

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions beacon_node/beacon_processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ logging = { workspace = true }
metrics = { workspace = true }
num_cpus = { workspace = true }
parking_lot = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
slot_clock = { workspace = true }
strum = { workspace = true }
Expand Down
192 changes: 190 additions & 2 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,13 @@ impl Default for BeaconProcessorConfig {
}
}

impl BeaconProcessorConfig {
/// Get the number of workers, capped at max_workers
pub fn get_worker_count(&self, worker_count: usize) -> usize {
cmp::min(worker_count, self.max_workers)
}
}

// The channels necessary to instantiate a `BeaconProcessor`.
pub struct BeaconProcessorChannels<E: EthSpec> {
pub beacon_processor_tx: BeaconProcessorSend<E>,
Expand Down Expand Up @@ -1280,7 +1287,7 @@ impl<E: EthSpec> BeaconProcessor<E> {

if let Some(work_event) = work_event {
let work_type = work_event.to_type();
self.spawn_worker(work_event, idle_tx);
self.spawn_worker_for_work_type(work_event, idle_tx);
Some(work_type)
} else {
None
Expand Down Expand Up @@ -1328,7 +1335,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
)
}
}
_ if can_spawn => self.spawn_worker(work, idle_tx),
_ if can_spawn => self.spawn_worker_for_work_type(work, idle_tx),
Work::GossipAttestation { .. } => attestation_queue.push(work),
// Attestation batches are formed internally within the
// `BeaconProcessor`, they are not sent from external services.
Expand Down Expand Up @@ -1530,6 +1537,74 @@ impl<E: EthSpec> BeaconProcessor<E> {
Ok(())
}

fn spawn_worker_for_work_type(&mut self, work: Work<E>, idle_tx: mpsc::Sender<()>) {
let work_type = work.to_type();
let base_worker_count = match work_type {
WorkType::ColumnReconstruction => 4,
_ => 1,
};
let required_workers = self.config.get_worker_count(base_worker_count);

if required_workers == 1 {
self.spawn_worker(work, idle_tx);
} else {
self.spawn_multi_worker(work, required_workers, idle_tx);
}
}

/// Spawns multiple workers for a single work item
fn spawn_multi_worker(
&mut self,
work: Work<E>,
worker_count: usize,
idle_tx: mpsc::Sender<()>,
) {
let work_id = work.str_id();
let worker_timer =
metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]);
metrics::inc_counter_by(
&metrics::BEACON_PROCESSOR_WORKERS_SPAWNED_TOTAL,
worker_count as u64,
);
metrics::inc_counter_vec(
&metrics::BEACON_PROCESSOR_WORK_EVENTS_STARTED_COUNT,
&[work.str_id()],
);

// Create a multi-worker idle sender that only sends when all workers are done
let multi_idle_tx = MultiWorkerIdleSender::new(idle_tx, worker_count, worker_timer);

let worker_id = self.current_workers;
self.current_workers = self.current_workers.saturating_add(worker_count);

let executor = self.executor.clone();

trace!(
work = work_id,
worker = worker_id,
worker_count = worker_count,
"Spawning multi-worker beacon processor task"
);

// Create a scoped rayon thread pool for this work
let task_spawner = MultiWorkerTaskSpawner {
executor,
multi_idle_tx,
worker_count,
};

match work {
Work::ColumnReconstruction(process_fn) => {
task_spawner.spawn_async_with_scoped_rayon(process_fn)
}
// Add other multi-worker work types here as needed
_ => {
// This should never happen due to the check above, but just in case
unreachable!("Unsupported multi-worker work type should have been handled above");
}
}
}

/// Spawns a blocking worker thread to process some `Work`.
///
/// Sends an message on `idle_tx` when the work is complete and the task is stopping.
Expand Down Expand Up @@ -1655,6 +1730,76 @@ impl<E: EthSpec> BeaconProcessor<E> {
}
}

/// Manages idle signaling for multi-worker tasks
struct MultiWorkerIdleSender {
tx: mpsc::Sender<()>,
worker_count: usize,
_worker_timer: Option<metrics::HistogramTimer>,
}

impl MultiWorkerIdleSender {
fn new(
tx: mpsc::Sender<()>,
worker_count: usize,
timer: Option<metrics::HistogramTimer>,
) -> Self {
Self {
tx,
worker_count,
_worker_timer: timer,
}
}
}

impl Drop for MultiWorkerIdleSender {
fn drop(&mut self) {
// Send idle signal for all workers when dropped
for _ in 0..self.worker_count {
if let Err(e) = self.tx.try_send(()) {
warn!(
msg = "did not free multi-worker, shutdown may be underway",
error = %e,
worker_count = self.worker_count,
"Unable to free multi-worker"
);
break;
}
}
}
}

/// Task spawner for multi-worker tasks
struct MultiWorkerTaskSpawner {
executor: TaskExecutor,
multi_idle_tx: MultiWorkerIdleSender,
worker_count: usize,
}

impl MultiWorkerTaskSpawner {
/// Spawn an async task with a scoped rayon thread pool
fn spawn_async_with_scoped_rayon(self, task: impl Future<Output = ()> + Send + 'static) {
let worker_count = self.worker_count;
self.executor.spawn(
async move {
// Create a scoped rayon thread pool with limited threads
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(worker_count)
.build()
.expect("Failed to create scoped thread pool");

// Install the pool for this task's duration
pool.install(|| {
// Run the task within the scoped thread pool
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(task))
});

drop(self.multi_idle_tx);
},
WORKER_TASK_NAME,
)
}
}

/// Spawns tasks that are either:
///
/// - Blocking (i.e. intensive methods that shouldn't run on the core `tokio` executor)
Expand Down Expand Up @@ -1738,4 +1883,47 @@ mod tests {
assert_eq!(queue_lengths.attestation_queue, MIN_QUEUE_LEN);
assert_eq!(queue_lengths.unknown_block_attestation_queue, MIN_QUEUE_LEN);
}

#[test]
fn test_worker_config_default() {
let config = BeaconProcessorConfig::default();

// Test that get_worker_count caps at max_workers
assert_eq!(config.get_worker_count(1), 1);
assert_eq!(config.get_worker_count(2), 2);

// Test that worker count is capped by max_workers
let large_count = config.max_workers + 10;
assert_eq!(config.get_worker_count(large_count), config.max_workers);
}

#[test]
fn test_worker_config_custom() {
let mut config = BeaconProcessorConfig::default();

// Test that worker count is capped by max_workers
config.max_workers = 3;
assert_eq!(config.get_worker_count(6), 3); // Capped at max_workers
assert_eq!(config.get_worker_count(2), 2); // Not capped
assert_eq!(config.get_worker_count(1), 1); // Not capped
}

#[test]
fn test_multi_worker_idle_sender() {
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let worker_count = 3;

// Create and drop the MultiWorkerIdleSender
{
let _multi_idle = MultiWorkerIdleSender::new(tx, worker_count, None);
// When dropped, it should send worker_count idle messages
}

// Verify that the correct number of idle messages were sent
let mut received_count = 0;
while rx.try_recv().is_ok() {
received_count += 1;
}
assert_eq!(received_count, worker_count);
}
}
Loading