From e33d09a2638301fbebda3b82581528156fdd0f04 Mon Sep 17 00:00:00 2001 From: "octoaide[bot]" <204759324+octoaide[bot]@users.noreply.github.com> Date: Mon, 11 May 2026 17:20:01 -0700 Subject: [PATCH 1/9] Update CHANGELOG.md and 2 other files --- CHANGELOG.md | 16 + src/bin/roxyd/control.rs | 365 ++++++++++++++++++++++- src/bin/roxyd/handlers/power.rs | 503 ++++++++++++++++++++++++++++++-- 3 files changed, 861 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 67a35c5..9dd2efe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,22 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added +- Implement the full `node_power` family directly in `roxyd`. Immediate + `NodePowerRequest::Reboot` and `NodePowerRequest::Shutdown` (and the legacy + flat `reboot`/`shutdown` compatibility requests) now prepare a pending + reboot or power-off operation and only execute the destructive system call + after `roxyd` has written `NodePowerResponse::Initiated` successfully on + the request stream. If the response write fails, the pending operation is + cancelled without rebooting or powering off. Immediate variants remain + Linux-only and return `"invalid command"` on other platforms. + `NodePowerRequest::GracefulReboot` and `NodePowerRequest::GracefulShutdown` + spawn the platform's standard reboot/poweroff command and return + `"fail"` if the process could not be started. + +## [0.6.0] - 2026-04-16 + +### Added + - Add explicit shutdown path for `roxyd` that handles OS signals (SIGINT/SIGTERM), cancels any in-progress connection attempt or accept/reconnect loop cleanly, and logs shutdown lifecycle events. diff --git a/src/bin/roxyd/control.rs b/src/bin/roxyd/control.rs index 75cfae1..08c3624 100644 --- a/src/bin/roxyd/control.rs +++ b/src/bin/roxyd/control.rs @@ -4,6 +4,7 @@ //! Manager: connect, run loop, stream accept, reconnect, and dispatch entry. //! All request handling is delegated to the [`handlers`] module. +use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; @@ -17,6 +18,7 @@ use review_protocol::types::node::{ }; use tokio::sync::watch; +use super::handlers::power::{PowerExecutor, PowerHandler, SystemPowerExecutor}; use super::{handlers, settings::Settings}; /// The review-protocol version required by this client. @@ -222,10 +224,44 @@ impl Connection { /// /// Returns an error if request reading or response sending fails. async fn dispatch(send: &mut quinn::SendStream, recv: &mut quinn::RecvStream) -> Result<()> { - let mut handler = RequestHandler; - review_protocol::request::handle(&mut handler, send, recv) + dispatch_with_executor(send, recv, Arc::new(SystemPowerExecutor)).await +} + +/// Power-aware dispatch entry that takes a caller-supplied +/// [`PowerExecutor`]. +/// +/// Immediate `NodePowerRequest::Reboot` and `NodePowerRequest::Shutdown` +/// (and the legacy flat `reboot`/`shutdown` compatibility paths) prepare a +/// [`handlers::power::PendingPowerOperation`] inside the handler but do not +/// execute the destructive system call. After +/// [`review_protocol::request::handle`] returns successfully — which means +/// the `NodePowerResponse::Initiated` response has been written to the +/// stream — this function releases the pending operation. If the response +/// write fails, the pending operation is dropped without being released and +/// the executor is never invoked. +/// +/// # Errors +/// +/// Returns an error if request reading or response sending fails. +async fn dispatch_with_executor( + send: &mut quinn::SendStream, + recv: &mut quinn::RecvStream, + executor: Arc, +) -> Result<()> { + let mut handler = RequestHandler::new(executor); + let result = review_protocol::request::handle(&mut handler, send, recv) .await - .map_err(|e| anyhow::anyhow!("{e}")) + .map_err(|e| anyhow::anyhow!("{e}")); + + if result.is_ok() { + // The response stream wrote successfully; release any prepared + // immediate power operations so they proceed to reboot/poweroff. + let _join_handles = handler.power.release_pending(); + } + // On error, `handler` is dropped here; any unreleased pending power + // operations observe the closed sender and exit without rebooting. + + result } /// Request handler that maps review-protocol requests into roxyd handlers. @@ -238,7 +274,17 @@ async fn dispatch(send: &mut quinn::SendStream, recv: &mut quinn::RecvStream) -> /// The legacy flat methods (`reboot`, `shutdown`, `process_list`, /// `resource_usage`) are temporary protocol-compatibility adapters that /// route through the grouped handlers. -struct RequestHandler; +struct RequestHandler { + power: PowerHandler, +} + +impl RequestHandler { + fn new(executor: Arc) -> Self { + Self { + power: PowerHandler::new(executor), + } + } +} #[async_trait::async_trait] impl review_protocol::request::Handler for RequestHandler { @@ -286,7 +332,7 @@ impl review_protocol::request::Handler for RequestHandler { async fn node_power(&mut self, req: NodePowerRequest) -> Result { tracing::info!(handler_group = "node_power", request = %req.service_id(), "Dispatching request"); - handlers::power::handle(req).await + self.power.handle(req).await } async fn node_observation( @@ -859,6 +905,315 @@ mod tests { assert!(task_result.is_ok()); } + // -- Tests: RequestCode dispatch over live connection -------------- + + /// Spawns a dispatch loop that uses the provided mock executor for every + /// stream accepted on this connection. Returns the task handle so the + /// caller can drive shutdown. + fn spawn_dispatch_loop_with_mock( + inner: review_protocol::client::Connection, + mock: Arc, + ) -> tokio::task::JoinHandle> { + let executor: Arc = mock; + tokio::spawn(async move { + loop { + let Ok((mut send, mut recv)) = inner.accept_bi().await else { + return Ok::<(), anyhow::Error>(()); + }; + if let Err(e) = dispatch_with_executor(&mut send, &mut recv, executor.clone()).await + { + tracing::error!("Request handling failed: {e}"); + } + } + }) + } + + #[cfg(target_os = "linux")] + #[tokio::test] + async fn dispatch_reboot_over_live_connection() { + use std::sync::atomic::Ordering; + + let (inner, server, _endpoint) = setup_test_connection().await; + let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); + + let result = server.send_reboot_cmd().await; + assert!(result.is_ok(), "reboot should be accepted: {result:?}"); + + // Give the background reboot task time to call the mock executor. + for _ in 0..50 { + if mock.reboot_count.load(Ordering::SeqCst) > 0 { + break; + } + tokio::task::yield_now().await; + } + assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 1); + + drop(server); + let task_result = task.await.expect("dispatch task should not panic"); + assert!(task_result.is_ok()); + } + + #[cfg(target_os = "linux")] + #[tokio::test] + async fn dispatch_shutdown_over_live_connection() { + use std::sync::atomic::Ordering; + + let (inner, server, _endpoint) = setup_test_connection().await; + let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); + + let result = server.send_shutdown_cmd().await; + assert!(result.is_ok(), "shutdown should be accepted: {result:?}"); + + for _ in 0..50 { + if mock.power_off_count.load(Ordering::SeqCst) > 0 { + break; + } + tokio::task::yield_now().await; + } + assert_eq!(mock.power_off_count.load(Ordering::SeqCst), 1); + + drop(server); + let task_result = task.await.expect("dispatch task should not panic"); + assert!(task_result.is_ok()); + } + + #[cfg(not(target_os = "linux"))] + #[tokio::test] + async fn dispatch_reboot_over_live_connection_non_linux() { + let (inner, server, _endpoint) = setup_test_connection().await; + let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); + + let err = server + .send_reboot_cmd() + .await + .expect_err("immediate reboot is Linux-only"); + assert!( + err.to_string().contains("invalid command"), + "expected 'invalid command', got: {err}" + ); + + drop(server); + let _ = task.await.expect("dispatch task should not panic"); + } + + #[cfg(not(target_os = "linux"))] + #[tokio::test] + async fn dispatch_shutdown_over_live_connection_non_linux() { + let (inner, server, _endpoint) = setup_test_connection().await; + let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); + + let err = server + .send_shutdown_cmd() + .await + .expect_err("immediate shutdown is Linux-only"); + assert!( + err.to_string().contains("invalid command"), + "expected 'invalid command', got: {err}" + ); + + drop(server); + let _ = task.await.expect("dispatch task should not panic"); + } + + #[cfg(target_os = "linux")] + #[tokio::test] + async fn dispatch_node_power_reboot_over_live_connection() { + use std::sync::atomic::Ordering; + + use review_protocol::types::node::{NodePowerRequest, NodePowerResponse}; + + let (inner, server, _endpoint) = setup_test_connection().await; + let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); + + let resp = server + .node_power(NodePowerRequest::Reboot) + .await + .expect("node_power reboot should succeed"); + assert_eq!(resp, NodePowerResponse::Initiated); + + for _ in 0..50 { + if mock.reboot_count.load(Ordering::SeqCst) > 0 { + break; + } + tokio::task::yield_now().await; + } + assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 1); + + drop(server); + let _ = task.await.expect("dispatch task should not panic"); + } + + #[cfg(not(target_os = "linux"))] + #[tokio::test] + async fn dispatch_node_power_reboot_over_live_connection_non_linux() { + use review_protocol::types::node::NodePowerRequest; + + let (inner, server, _endpoint) = setup_test_connection().await; + let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); + + let err = server + .node_power(NodePowerRequest::Reboot) + .await + .expect_err("immediate reboot is Linux-only"); + assert!( + err.to_string().contains("invalid command"), + "expected 'invalid command', got: {err}" + ); + + drop(server); + let _ = task.await.expect("dispatch task should not panic"); + } + + #[tokio::test] + async fn dispatch_node_power_graceful_reboot_over_live_connection() { + use std::sync::atomic::Ordering; + + use review_protocol::types::node::{NodePowerRequest, NodePowerResponse}; + + let (inner, server, _endpoint) = setup_test_connection().await; + let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); + + let resp = server + .node_power(NodePowerRequest::GracefulReboot) + .await + .expect("graceful reboot should succeed"); + assert_eq!(resp, NodePowerResponse::Initiated); + assert_eq!(mock.graceful_reboot_count.load(Ordering::SeqCst), 1); + // Graceful path must not trigger immediate reboot. + assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); + + drop(server); + let _ = task.await.expect("dispatch task should not panic"); + } + + #[tokio::test] + async fn dispatch_node_power_graceful_shutdown_over_live_connection() { + use std::sync::atomic::Ordering; + + use review_protocol::types::node::{NodePowerRequest, NodePowerResponse}; + + let (inner, server, _endpoint) = setup_test_connection().await; + let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); + + let resp = server + .node_power(NodePowerRequest::GracefulShutdown) + .await + .expect("graceful shutdown should succeed"); + assert_eq!(resp, NodePowerResponse::Initiated); + assert_eq!(mock.graceful_power_off_count.load(Ordering::SeqCst), 1); + + drop(server); + let _ = task.await.expect("dispatch task should not panic"); + } + + #[tokio::test] + async fn dispatch_node_power_graceful_reboot_fail_over_live_connection() { + use std::sync::atomic::Ordering; + + use review_protocol::types::node::NodePowerRequest; + + let (inner, server, _endpoint) = setup_test_connection().await; + let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + mock.graceful_reboot_fail.store(true, Ordering::SeqCst); + let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); + + let err = server + .node_power(NodePowerRequest::GracefulReboot) + .await + .expect_err("graceful reboot should fail"); + let msg = err.to_string(); + assert!( + msg.contains("fail"), + "expected 'fail' in error message, got: {msg}" + ); + + drop(server); + let _ = task.await.expect("dispatch task should not panic"); + } + + /// Verifies that when the response stream is dropped before the response + /// reaches the peer (simulated here by closing the stream from the + /// server side after the connection-level send), the pending immediate + /// power operation is not released and the executor is never invoked. + #[tokio::test] + async fn pending_reboot_cancelled_on_dispatch_error() { + use std::sync::atomic::Ordering; + + // Build a handler and call into it directly to simulate the dispatch + // path. This isolates the cancellation invariant from the QUIC + // transport. + let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let mut handler = super::RequestHandler::new(mock.clone() as Arc); + + // Drive the node_power handler directly. On non-Linux this returns + // an error and there is nothing pending — the assertion holds. + let _ = review_protocol::request::Handler::node_power( + &mut handler, + review_protocol::types::node::NodePowerRequest::Reboot, + ) + .await; + + // Drop without calling release_pending — simulates a response-write + // failure in dispatch_with_executor. + drop(handler); + + for _ in 0..20 { + tokio::task::yield_now().await; + } + + assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); + } + + #[tokio::test] + async fn dispatch_resource_usage_over_live_connection() { + let (inner, server, _endpoint) = setup_test_connection().await; + let task = tokio::spawn(async move { + loop { + let Ok((mut send, mut recv)) = inner.accept_bi().await else { + return Ok::<(), anyhow::Error>(()); + }; + if let Err(e) = dispatch(&mut send, &mut recv).await { + tracing::error!("Request handling failed: {e}"); + } + } + }); + + let result = server.get_resource_usage().await; + assert!(result.is_err(), "should fail: handler is unimplemented"); + + let task_err = task.await.expect_err("task should have panicked"); + assert!(task_err.is_panic()); + } + + #[tokio::test] + async fn dispatch_process_list_over_live_connection() { + let (inner, server, _endpoint) = setup_test_connection().await; + let task = tokio::spawn(async move { + loop { + let Ok((mut send, mut recv)) = inner.accept_bi().await else { + return Ok::<(), anyhow::Error>(()); + }; + if let Err(e) = dispatch(&mut send, &mut recv).await { + tracing::error!("Request handling failed: {e}"); + } + } + }); + + let result = server.get_process_list().await; + assert!(result.is_err(), "should fail: handler is unimplemented"); + + let task_err = task.await.expect_err("task should have panicked"); + assert!(task_err.is_panic()); + } + #[tokio::test] async fn run_reconnects_after_connection_close() { use tokio::sync::oneshot; diff --git a/src/bin/roxyd/handlers/power.rs b/src/bin/roxyd/handlers/power.rs index 56f2db5..1cb1e1e 100644 --- a/src/bin/roxyd/handlers/power.rs +++ b/src/bin/roxyd/handlers/power.rs @@ -1,30 +1,497 @@ -// TODO: Scaffolding only — implement actual power-control logic later. +//! Power-control request handling. +//! +//! Immediate `Reboot` and `Shutdown` requests are split into two phases: the +//! request handler *prepares* a [`PendingPowerOperation`] and returns +//! `NodePowerResponse::Initiated` without actually rebooting or powering off. +//! The dispatch layer is responsible for [releasing](PowerHandler::release_pending) +//! the pending operation only after the success response has been written +//! successfully. If the response write fails, the pending operation is dropped +//! and the destructive system call is never made. +//! +//! Graceful variants spawn the standard platform reboot/poweroff command and +//! return `Initiated` on successful spawn, `"fail"` otherwise. + +use std::process::Command; +use std::sync::Arc; use review_protocol::types::node::{NodePowerRequest, NodePowerResponse}; +use tokio::sync::oneshot; +use tokio::task::JoinHandle; -/// Handles a node power-control request. -/// -/// # Errors +const ERR_INVALID_COMMAND: &str = "invalid command"; +const ERR_FAIL: &str = "fail"; + +/// Executes the platform-specific power-control operations. /// -/// Returns an error message if the operation fails. +/// Production code uses [`SystemPowerExecutor`]; tests inject a mock so that +/// power operations can be observed without actually rebooting the host. +pub(crate) trait PowerExecutor: Send + Sync { + /// Performs an immediate reboot. On Linux this calls + /// `nix::sys::reboot::reboot`, which does not return on success. + /// + /// Only called from the immediate-reboot path, which is Linux-only. + #[cfg_attr(not(target_os = "linux"), allow(dead_code))] + fn reboot(&self); + + /// Performs an immediate power-off. + /// + /// Only called from the immediate-shutdown path, which is Linux-only. + #[cfg_attr(not(target_os = "linux"), allow(dead_code))] + fn power_off(&self); + + /// Spawns a graceful reboot process. + /// + /// # Errors + /// + /// Returns `Err(())` if the process could not be spawned. + fn graceful_reboot(&self) -> Result<(), ()>; + + /// Spawns a graceful power-off process. + /// + /// # Errors + /// + /// Returns `Err(())` if the process could not be spawned. + fn graceful_power_off(&self) -> Result<(), ()>; +} + +/// Production executor that triggers reboot/power-off via `nix::sys::reboot` +/// (immediate) and the platform's standard CLI tools (graceful). +pub(crate) struct SystemPowerExecutor; + +impl PowerExecutor for SystemPowerExecutor { + fn reboot(&self) { + #[cfg(target_os = "linux")] + { + if let Err(e) = nix::sys::reboot::reboot(nix::sys::reboot::RebootMode::RB_AUTOBOOT) { + tracing::error!("nix reboot failed: {e}"); + } + } + #[cfg(not(target_os = "linux"))] + { + tracing::error!("immediate reboot is not supported on this platform"); + } + } + + fn power_off(&self) { + #[cfg(target_os = "linux")] + { + if let Err(e) = nix::sys::reboot::reboot(nix::sys::reboot::RebootMode::RB_POWER_OFF) { + tracing::error!("nix poweroff failed: {e}"); + } + } + #[cfg(not(target_os = "linux"))] + { + tracing::error!("immediate poweroff is not supported on this platform"); + } + } + + fn graceful_reboot(&self) -> Result<(), ()> { + #[cfg(target_os = "linux")] + let result = Command::new("reboot").spawn(); + #[cfg(target_os = "macos")] + let result = Command::new("sudo").args(["reboot"]).spawn(); + #[cfg(not(any(target_os = "linux", target_os = "macos")))] + let result: std::io::Result = Err(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "graceful reboot is not supported on this platform", + )); + + match result { + Ok(_) => Ok(()), + Err(e) => { + tracing::debug!("Failed to execute graceful reboot: {e}"); + Err(()) + } + } + } + + fn graceful_power_off(&self) -> Result<(), ()> { + #[cfg(target_os = "linux")] + let result = Command::new("poweroff").spawn(); + #[cfg(target_os = "macos")] + let result = Command::new("sudo").args(["shutdown", "-h", "now"]).spawn(); + #[cfg(not(any(target_os = "linux", target_os = "macos")))] + let result: std::io::Result = Err(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "graceful poweroff is not supported on this platform", + )); + + match result { + Ok(_) => Ok(()), + Err(e) => { + tracing::debug!("Failed to execute graceful poweroff: {e}"); + Err(()) + } + } + } +} + +/// Which immediate power action a [`PendingPowerOperation`] represents. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub(crate) enum PowerAction { + Reboot, + PowerOff, +} + +/// An immediate reboot or power-off intent that has been prepared but not +/// yet executed. /// -/// # Panics +/// The pending operation owns a oneshot release channel and a background +/// task that waits on the channel before calling the destructive system +/// operation. If the operation is dropped without being released (for +/// example, because the response write failed), the sender side of the +/// channel is dropped, the background task observes the closed channel, +/// and the executor is never invoked. +pub(crate) struct PendingPowerOperation { + release_tx: oneshot::Sender<()>, + task: JoinHandle<()>, +} + +impl PendingPowerOperation { + /// Prepares a pending operation. Spawns a background task that waits + /// for the release signal before calling the executor. + #[cfg_attr(not(target_os = "linux"), allow(dead_code))] + pub(crate) fn prepare(executor: Arc, action: PowerAction) -> Self { + let (release_tx, release_rx) = oneshot::channel::<()>(); + let task = tokio::spawn(async move { + if release_rx.await.is_ok() { + match action { + PowerAction::Reboot => executor.reboot(), + PowerAction::PowerOff => executor.power_off(), + } + } + }); + Self { release_tx, task } + } + + /// Releases the pending operation. The background task receives the + /// signal and proceeds to call the executor. The returned + /// [`JoinHandle`] resolves once the executor returns; tests can await + /// it to observe the call. Production callers may ignore the handle. + pub(crate) fn release(self) -> JoinHandle<()> { + let _ = self.release_tx.send(()); + self.task + } +} + +/// Per-stream handler state for power requests. /// -/// Always panics — scaffolding only, not yet implemented. -#[allow(clippy::unused_async)] -pub async fn handle(req: NodePowerRequest) -> Result { - match req { - NodePowerRequest::Reboot => { - unimplemented!("NodePowerRequest::Reboot") +/// Owns a shared [`PowerExecutor`] and accumulates pending immediate +/// operations during a request. The dispatch layer must call +/// [`release_pending`](Self::release_pending) only after the success +/// response has been written successfully on the request stream. +pub(crate) struct PowerHandler { + executor: Arc, + pending: Vec, +} + +impl PowerHandler { + pub(crate) fn new(executor: Arc) -> Self { + Self { + executor, + pending: Vec::new(), + } + } + + /// Handles a [`NodePowerRequest`]. + /// + /// # Errors + /// + /// Returns `Err("invalid command")` for immediate requests on platforms + /// where they are not supported, and `Err("fail")` if a graceful + /// operation could not be initiated. + #[allow(clippy::unused_async)] + pub(crate) async fn handle( + &mut self, + req: NodePowerRequest, + ) -> Result { + match req { + NodePowerRequest::Reboot => self.prepare_immediate(PowerAction::Reboot), + NodePowerRequest::Shutdown => self.prepare_immediate(PowerAction::PowerOff), + NodePowerRequest::GracefulReboot => self.graceful_reboot(), + NodePowerRequest::GracefulShutdown => self.graceful_power_off(), + } + } + + #[allow(clippy::unused_self)] + fn prepare_immediate(&mut self, action: PowerAction) -> Result { + #[cfg(target_os = "linux")] + { + let op = PendingPowerOperation::prepare(self.executor.clone(), action); + self.pending.push(op); + Ok(NodePowerResponse::Initiated) + } + #[cfg(not(target_os = "linux"))] + { + let _ = action; + Err(ERR_INVALID_COMMAND.to_string()) + } + } + + fn graceful_reboot(&self) -> Result { + match self.executor.graceful_reboot() { + Ok(()) => Ok(NodePowerResponse::Initiated), + Err(()) => Err(ERR_FAIL.to_string()), + } + } + + fn graceful_power_off(&self) -> Result { + match self.executor.graceful_power_off() { + Ok(()) => Ok(NodePowerResponse::Initiated), + Err(()) => Err(ERR_FAIL.to_string()), + } + } + + /// Releases all pending immediate power operations. + /// + /// Returns the per-operation [`JoinHandle`]s so that callers (typically + /// tests) can await the executor calls. Production callers may drop the + /// returned vector. + pub(crate) fn release_pending(&mut self) -> Vec> { + self.pending + .drain(..) + .map(PendingPowerOperation::release) + .collect() + } + + #[cfg(test)] + pub(crate) fn pending_count(&self) -> usize { + self.pending.len() + } +} + +#[cfg(test)] +pub(crate) use mock::MockPowerExecutor; + +#[cfg(test)] +pub(crate) mod mock { + use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + + use super::PowerExecutor; + + /// In-memory mock executor used by tests. Records call counts and can + /// be configured to fail graceful operations. + #[derive(Default)] + pub(crate) struct MockPowerExecutor { + pub reboot_count: AtomicUsize, + pub power_off_count: AtomicUsize, + pub graceful_reboot_count: AtomicUsize, + pub graceful_power_off_count: AtomicUsize, + pub graceful_reboot_fail: AtomicBool, + pub graceful_power_off_fail: AtomicBool, + } + + impl PowerExecutor for MockPowerExecutor { + fn reboot(&self) { + self.reboot_count.fetch_add(1, Ordering::SeqCst); } - NodePowerRequest::Shutdown => { - unimplemented!("NodePowerRequest::Shutdown") + + fn power_off(&self) { + self.power_off_count.fetch_add(1, Ordering::SeqCst); } - NodePowerRequest::GracefulReboot => { - unimplemented!("NodePowerRequest::GracefulReboot") + + fn graceful_reboot(&self) -> Result<(), ()> { + self.graceful_reboot_count.fetch_add(1, Ordering::SeqCst); + if self.graceful_reboot_fail.load(Ordering::SeqCst) { + Err(()) + } else { + Ok(()) + } } - NodePowerRequest::GracefulShutdown => { - unimplemented!("NodePowerRequest::GracefulShutdown") + + fn graceful_power_off(&self) -> Result<(), ()> { + self.graceful_power_off_count.fetch_add(1, Ordering::SeqCst); + if self.graceful_power_off_fail.load(Ordering::SeqCst) { + Err(()) + } else { + Ok(()) + } } } } + +#[cfg(test)] +mod tests { + use std::sync::atomic::Ordering; + + use super::mock::MockPowerExecutor; + use super::*; + + #[tokio::test] + async fn pending_operation_executes_only_after_release() { + let mock = Arc::new(MockPowerExecutor::default()); + let executor: Arc = mock.clone(); + let pending = PendingPowerOperation::prepare(executor, PowerAction::Reboot); + + // Give the background task a chance to run; it should still be + // waiting on the release channel. + tokio::task::yield_now().await; + assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); + + let task = pending.release(); + task.await.expect("background task should complete"); + assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn pending_operation_dropped_without_release_does_not_execute() { + let mock = Arc::new(MockPowerExecutor::default()); + let executor: Arc = mock.clone(); + let pending = PendingPowerOperation::prepare(executor, PowerAction::PowerOff); + + // Drop without releasing. The background task should observe the + // closed sender and exit without calling the executor. + drop(pending); + + // Yield repeatedly to allow the background task to finish. + for _ in 0..10 { + tokio::task::yield_now().await; + } + + assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); + assert_eq!(mock.power_off_count.load(Ordering::SeqCst), 0); + } + + #[cfg(target_os = "linux")] + #[tokio::test] + async fn handle_reboot_on_linux_prepares_pending_op() { + let mock = Arc::new(MockPowerExecutor::default()); + let mut handler = PowerHandler::new(mock.clone() as Arc); + + let resp = handler + .handle(NodePowerRequest::Reboot) + .await + .expect("reboot should return Initiated"); + assert_eq!(resp, NodePowerResponse::Initiated); + assert_eq!(handler.pending_count(), 1); + assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); + + let handles = handler.release_pending(); + for handle in handles { + handle.await.expect("background task should complete"); + } + assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 1); + } + + #[cfg(target_os = "linux")] + #[tokio::test] + async fn handle_shutdown_on_linux_prepares_pending_op() { + let mock = Arc::new(MockPowerExecutor::default()); + let mut handler = PowerHandler::new(mock.clone() as Arc); + + let resp = handler + .handle(NodePowerRequest::Shutdown) + .await + .expect("shutdown should return Initiated"); + assert_eq!(resp, NodePowerResponse::Initiated); + assert_eq!(handler.pending_count(), 1); + + let handles = handler.release_pending(); + for handle in handles { + handle.await.expect("background task should complete"); + } + assert_eq!(mock.power_off_count.load(Ordering::SeqCst), 1); + } + + #[cfg(target_os = "linux")] + #[tokio::test] + async fn handle_reboot_dropped_without_release_does_not_reboot() { + let mock = Arc::new(MockPowerExecutor::default()); + let mut handler = PowerHandler::new(mock.clone() as Arc); + + let _ = handler + .handle(NodePowerRequest::Reboot) + .await + .expect("reboot should return Initiated"); + + // Simulate a response-write failure by dropping the handler without + // calling release_pending. + drop(handler); + + for _ in 0..10 { + tokio::task::yield_now().await; + } + + assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); + } + + #[cfg(not(target_os = "linux"))] + #[tokio::test] + async fn handle_reboot_on_non_linux_returns_invalid_command() { + let mock = Arc::new(MockPowerExecutor::default()); + let mut handler = PowerHandler::new(mock.clone() as Arc); + + let err = handler + .handle(NodePowerRequest::Reboot) + .await + .expect_err("reboot should be unsupported on non-Linux"); + assert_eq!(err, ERR_INVALID_COMMAND); + assert_eq!(handler.pending_count(), 0); + } + + #[cfg(not(target_os = "linux"))] + #[tokio::test] + async fn handle_shutdown_on_non_linux_returns_invalid_command() { + let mock = Arc::new(MockPowerExecutor::default()); + let mut handler = PowerHandler::new(mock.clone() as Arc); + + let err = handler + .handle(NodePowerRequest::Shutdown) + .await + .expect_err("shutdown should be unsupported on non-Linux"); + assert_eq!(err, ERR_INVALID_COMMAND); + assert_eq!(handler.pending_count(), 0); + } + + #[tokio::test] + async fn handle_graceful_reboot_returns_initiated_on_success() { + let mock = Arc::new(MockPowerExecutor::default()); + let mut handler = PowerHandler::new(mock.clone() as Arc); + + let resp = handler + .handle(NodePowerRequest::GracefulReboot) + .await + .expect("graceful reboot should succeed"); + assert_eq!(resp, NodePowerResponse::Initiated); + assert_eq!(mock.graceful_reboot_count.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn handle_graceful_reboot_returns_fail_on_spawn_error() { + let mock = Arc::new(MockPowerExecutor::default()); + mock.graceful_reboot_fail.store(true, Ordering::SeqCst); + let mut handler = PowerHandler::new(mock.clone() as Arc); + + let err = handler + .handle(NodePowerRequest::GracefulReboot) + .await + .expect_err("graceful reboot should fail"); + assert_eq!(err, ERR_FAIL); + } + + #[tokio::test] + async fn handle_graceful_shutdown_returns_initiated_on_success() { + let mock = Arc::new(MockPowerExecutor::default()); + let mut handler = PowerHandler::new(mock.clone() as Arc); + + let resp = handler + .handle(NodePowerRequest::GracefulShutdown) + .await + .expect("graceful shutdown should succeed"); + assert_eq!(resp, NodePowerResponse::Initiated); + assert_eq!(mock.graceful_power_off_count.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn handle_graceful_shutdown_returns_fail_on_spawn_error() { + let mock = Arc::new(MockPowerExecutor::default()); + mock.graceful_power_off_fail.store(true, Ordering::SeqCst); + let mut handler = PowerHandler::new(mock.clone() as Arc); + + let err = handler + .handle(NodePowerRequest::GracefulShutdown) + .await + .expect_err("graceful shutdown should fail"); + assert_eq!(err, ERR_FAIL); + } +} From 77c9d84fc3178a3a693e3f22fbfb897e5bedc3c1 Mon Sep 17 00:00:00 2001 From: "octoaide[bot]" <204759324+octoaide[bot]@users.noreply.github.com> Date: Thu, 21 May 2026 14:05:33 +0000 Subject: [PATCH 2/9] Resolve merge conflict in roxyd power tests Rebase conflict in src/bin/roxyd/control.rs was resolved by keeping the incoming commit's full power-handler test suite (mock executor), and removing the obsolete panic-expecting reboot/shutdown tests from HEAD. All conflict markers were removed and CHANGELOG.md updated. cargo check -p roxy --bin roxyd succeeds. --- CHANGELOG.md | 23 ++++----- src/bin/roxyd/control.rs | 108 +++++++++++++++++++++++++++++---------- 2 files changed, 89 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9dd2efe..299b714 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,20 +27,6 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add explicit shutdown path for `roxyd` that handles OS signals (SIGINT/SIGTERM), cancels any in-progress connection attempt or accept/reconnect loop cleanly, and logs shutdown lifecycle events. - -### Changed - -- Simplified `list_files` to return only file names instead of unused size and - modified-time data. - -### Removed - -- Removed the direct `chrono` dependency. - -## [0.6.0] - 2026-04-16 - -### Added - - Add tracing in `roxyd` for incoming `review-protocol` request dispatch, logging the selected handler group and request identifier. - Add `roxyd` binary entrypoint as a new implementation path for QUIC/mTLS @@ -59,6 +45,15 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm `shutdown`, `process_list`, `resource_usage`) now delegate through the grouped handlers as compatibility adapters. +### Changed + +- Simplified `list_files` to return only file names instead of unused size and + modified-time data. + +### Removed + +- Removed the direct `chrono` dependency. + ### Fixed - Fix `NetplanYaml::merge` bridge handling so `bridges` is no longer dropped diff --git a/src/bin/roxyd/control.rs b/src/bin/roxyd/control.rs index 08c3624..5184739 100644 --- a/src/bin/roxyd/control.rs +++ b/src/bin/roxyd/control.rs @@ -933,12 +933,18 @@ mod tests { async fn dispatch_reboot_over_live_connection() { use std::sync::atomic::Ordering; + use review_protocol::server::node::NodePowerOutcome; + use review_protocol::types::node::NodePowerRequest; + let (inner, server, _endpoint) = setup_test_connection().await; let mock = Arc::new(handlers::power::MockPowerExecutor::default()); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); - let result = server.send_reboot_cmd().await; - assert!(result.is_ok(), "reboot should be accepted: {result:?}"); + let result = server.node_power(NodePowerRequest::Reboot).await; + assert!( + matches!(result, Ok(NodePowerOutcome::Sent)), + "reboot should be accepted: {result:?}" + ); // Give the background reboot task time to call the mock executor. for _ in 0..50 { @@ -959,12 +965,18 @@ mod tests { async fn dispatch_shutdown_over_live_connection() { use std::sync::atomic::Ordering; + use review_protocol::server::node::NodePowerOutcome; + use review_protocol::types::node::NodePowerRequest; + let (inner, server, _endpoint) = setup_test_connection().await; let mock = Arc::new(handlers::power::MockPowerExecutor::default()); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); - let result = server.send_shutdown_cmd().await; - assert!(result.is_ok(), "shutdown should be accepted: {result:?}"); + let result = server.node_power(NodePowerRequest::Shutdown).await; + assert!( + matches!(result, Ok(NodePowerOutcome::Sent)), + "shutdown should be accepted: {result:?}" + ); for _ in 0..50 { if mock.power_off_count.load(Ordering::SeqCst) > 0 { @@ -982,17 +994,28 @@ mod tests { #[cfg(not(target_os = "linux"))] #[tokio::test] async fn dispatch_reboot_over_live_connection_non_linux() { + use std::sync::atomic::Ordering; + + use review_protocol::server::node::NodePowerOutcome; + use review_protocol::types::node::NodePowerRequest; + let (inner, server, _endpoint) = setup_test_connection().await; let mock = Arc::new(handlers::power::MockPowerExecutor::default()); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); - let err = server - .send_reboot_cmd() - .await - .expect_err("immediate reboot is Linux-only"); + let result = server.node_power(NodePowerRequest::Reboot).await; assert!( - err.to_string().contains("invalid command"), - "expected 'invalid command', got: {err}" + matches!(result, Ok(NodePowerOutcome::Sent)), + "request should be sent: {result:?}" + ); + + for _ in 0..50 { + tokio::task::yield_now().await; + } + assert_eq!( + mock.reboot_count.load(Ordering::SeqCst), + 0, + "immediate reboot must not invoke the executor on non-Linux" ); drop(server); @@ -1002,17 +1025,28 @@ mod tests { #[cfg(not(target_os = "linux"))] #[tokio::test] async fn dispatch_shutdown_over_live_connection_non_linux() { + use std::sync::atomic::Ordering; + + use review_protocol::server::node::NodePowerOutcome; + use review_protocol::types::node::NodePowerRequest; + let (inner, server, _endpoint) = setup_test_connection().await; let mock = Arc::new(handlers::power::MockPowerExecutor::default()); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); - let err = server - .send_shutdown_cmd() - .await - .expect_err("immediate shutdown is Linux-only"); + let result = server.node_power(NodePowerRequest::Shutdown).await; assert!( - err.to_string().contains("invalid command"), - "expected 'invalid command', got: {err}" + matches!(result, Ok(NodePowerOutcome::Sent)), + "request should be sent: {result:?}" + ); + + for _ in 0..50 { + tokio::task::yield_now().await; + } + assert_eq!( + mock.power_off_count.load(Ordering::SeqCst), + 0, + "immediate shutdown must not invoke the executor on non-Linux" ); drop(server); @@ -1024,7 +1058,8 @@ mod tests { async fn dispatch_node_power_reboot_over_live_connection() { use std::sync::atomic::Ordering; - use review_protocol::types::node::{NodePowerRequest, NodePowerResponse}; + use review_protocol::server::node::NodePowerOutcome; + use review_protocol::types::node::NodePowerRequest; let (inner, server, _endpoint) = setup_test_connection().await; let mock = Arc::new(handlers::power::MockPowerExecutor::default()); @@ -1034,7 +1069,7 @@ mod tests { .node_power(NodePowerRequest::Reboot) .await .expect("node_power reboot should succeed"); - assert_eq!(resp, NodePowerResponse::Initiated); + assert!(matches!(resp, NodePowerOutcome::Sent)); for _ in 0..50 { if mock.reboot_count.load(Ordering::SeqCst) > 0 { @@ -1051,21 +1086,26 @@ mod tests { #[cfg(not(target_os = "linux"))] #[tokio::test] async fn dispatch_node_power_reboot_over_live_connection_non_linux() { + use std::sync::atomic::Ordering; + + use review_protocol::server::node::NodePowerOutcome; use review_protocol::types::node::NodePowerRequest; let (inner, server, _endpoint) = setup_test_connection().await; let mock = Arc::new(handlers::power::MockPowerExecutor::default()); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); - let err = server - .node_power(NodePowerRequest::Reboot) - .await - .expect_err("immediate reboot is Linux-only"); + let result = server.node_power(NodePowerRequest::Reboot).await; assert!( - err.to_string().contains("invalid command"), - "expected 'invalid command', got: {err}" + matches!(result, Ok(NodePowerOutcome::Sent)), + "node_power reboot should be sent: {result:?}" ); + for _ in 0..50 { + tokio::task::yield_now().await; + } + assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); + drop(server); let _ = task.await.expect("dispatch task should not panic"); } @@ -1074,6 +1114,7 @@ mod tests { async fn dispatch_node_power_graceful_reboot_over_live_connection() { use std::sync::atomic::Ordering; + use review_protocol::server::node::NodePowerOutcome; use review_protocol::types::node::{NodePowerRequest, NodePowerResponse}; let (inner, server, _endpoint) = setup_test_connection().await; @@ -1084,7 +1125,10 @@ mod tests { .node_power(NodePowerRequest::GracefulReboot) .await .expect("graceful reboot should succeed"); - assert_eq!(resp, NodePowerResponse::Initiated); + assert!(matches!( + resp, + NodePowerOutcome::Response(NodePowerResponse::Initiated) + )); assert_eq!(mock.graceful_reboot_count.load(Ordering::SeqCst), 1); // Graceful path must not trigger immediate reboot. assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); @@ -1097,6 +1141,7 @@ mod tests { async fn dispatch_node_power_graceful_shutdown_over_live_connection() { use std::sync::atomic::Ordering; + use review_protocol::server::node::NodePowerOutcome; use review_protocol::types::node::{NodePowerRequest, NodePowerResponse}; let (inner, server, _endpoint) = setup_test_connection().await; @@ -1107,7 +1152,10 @@ mod tests { .node_power(NodePowerRequest::GracefulShutdown) .await .expect("graceful shutdown should succeed"); - assert_eq!(resp, NodePowerResponse::Initiated); + assert!(matches!( + resp, + NodePowerOutcome::Response(NodePowerResponse::Initiated) + )); assert_eq!(mock.graceful_power_off_count.load(Ordering::SeqCst), 1); drop(server); @@ -1186,7 +1234,9 @@ mod tests { } }); - let result = server.get_resource_usage().await; + let result = server + .node_observation(NodeObservationRequest::ResourceUsage) + .await; assert!(result.is_err(), "should fail: handler is unimplemented"); let task_err = task.await.expect_err("task should have panicked"); @@ -1207,7 +1257,9 @@ mod tests { } }); - let result = server.get_process_list().await; + let result = server + .node_observation(NodeObservationRequest::ProcessList) + .await; assert!(result.is_err(), "should fail: handler is unimplemented"); let task_err = task.await.expect_err("task should have panicked"); From 107882419f8c4dac5dd8fffb5c5d1af5b3c4704f Mon Sep 17 00:00:00 2001 From: "octoaide[bot]" <204759324+octoaide[bot]@users.noreply.github.com> Date: Thu, 21 May 2026 14:21:26 +0000 Subject: [PATCH 3/9] Update power handling to review-protocol 0.19.0 Align roxyd's node power handling with review-protocol 0.19.0 semantics: immediate Reboot/Shutdown are fire-and-forget on the wire and are spawned as background tasks (Linux only). Graceful operations still follow request/response and return NodePowerResponse::Initiated. Remove the PendingPowerOperation/PowerAction types and the old release-ordering machinery; simplify dispatch to call review_protocol::request::handle directly. Retain PowerExecutor and MockPowerExecutor for testability. Update CHANGELOG; roxyd tests pass and clippy is clean. --- CHANGELOG.md | 19 ++- src/bin/roxyd/control.rs | 56 +-------- src/bin/roxyd/handlers/power.rs | 217 ++++++++------------------------ 3 files changed, 64 insertions(+), 228 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 299b714..c756622 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,17 +8,14 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added -- Implement the full `node_power` family directly in `roxyd`. Immediate - `NodePowerRequest::Reboot` and `NodePowerRequest::Shutdown` (and the legacy - flat `reboot`/`shutdown` compatibility requests) now prepare a pending - reboot or power-off operation and only execute the destructive system call - after `roxyd` has written `NodePowerResponse::Initiated` successfully on - the request stream. If the response write fails, the pending operation is - cancelled without rebooting or powering off. Immediate variants remain - Linux-only and return `"invalid command"` on other platforms. - `NodePowerRequest::GracefulReboot` and `NodePowerRequest::GracefulShutdown` - spawn the platform's standard reboot/poweroff command and return - `"fail"` if the process could not be started. +- `roxyd` now handles node power-control requests from a Manager (immediate + and graceful reboot/shutdown), replacing the previous unimplemented + scaffolding. On Linux, immediate reboot and shutdown run in the background; + grouped `node_power` requests do not return a protocol response for these + operations. Graceful reboot and shutdown spawn the platform's standard + reboot or poweroff command and report success or `"fail"` to the Manager. + Legacy flat `reboot` and `shutdown` requests use the same behavior. + Immediate reboot and shutdown are not supported on non-Linux platforms. ## [0.6.0] - 2026-04-16 diff --git a/src/bin/roxyd/control.rs b/src/bin/roxyd/control.rs index 5184739..0ce47fa 100644 --- a/src/bin/roxyd/control.rs +++ b/src/bin/roxyd/control.rs @@ -227,18 +227,7 @@ async fn dispatch(send: &mut quinn::SendStream, recv: &mut quinn::RecvStream) -> dispatch_with_executor(send, recv, Arc::new(SystemPowerExecutor)).await } -/// Power-aware dispatch entry that takes a caller-supplied -/// [`PowerExecutor`]. -/// -/// Immediate `NodePowerRequest::Reboot` and `NodePowerRequest::Shutdown` -/// (and the legacy flat `reboot`/`shutdown` compatibility paths) prepare a -/// [`handlers::power::PendingPowerOperation`] inside the handler but do not -/// execute the destructive system call. After -/// [`review_protocol::request::handle`] returns successfully — which means -/// the `NodePowerResponse::Initiated` response has been written to the -/// stream — this function releases the pending operation. If the response -/// write fails, the pending operation is dropped without being released and -/// the executor is never invoked. +/// Power-aware dispatch entry that takes a caller-supplied [`PowerExecutor`]. /// /// # Errors /// @@ -249,19 +238,9 @@ async fn dispatch_with_executor( executor: Arc, ) -> Result<()> { let mut handler = RequestHandler::new(executor); - let result = review_protocol::request::handle(&mut handler, send, recv) + review_protocol::request::handle(&mut handler, send, recv) .await - .map_err(|e| anyhow::anyhow!("{e}")); - - if result.is_ok() { - // The response stream wrote successfully; release any prepared - // immediate power operations so they proceed to reboot/poweroff. - let _join_handles = handler.power.release_pending(); - } - // On error, `handler` is dropped here; any unreleased pending power - // operations observe the closed sender and exit without rebooting. - - result + .map_err(|e| anyhow::anyhow!("{e}")) } /// Request handler that maps review-protocol requests into roxyd handlers. @@ -1191,35 +1170,6 @@ mod tests { /// reaches the peer (simulated here by closing the stream from the /// server side after the connection-level send), the pending immediate /// power operation is not released and the executor is never invoked. - #[tokio::test] - async fn pending_reboot_cancelled_on_dispatch_error() { - use std::sync::atomic::Ordering; - - // Build a handler and call into it directly to simulate the dispatch - // path. This isolates the cancellation invariant from the QUIC - // transport. - let mock = Arc::new(handlers::power::MockPowerExecutor::default()); - let mut handler = super::RequestHandler::new(mock.clone() as Arc); - - // Drive the node_power handler directly. On non-Linux this returns - // an error and there is nothing pending — the assertion holds. - let _ = review_protocol::request::Handler::node_power( - &mut handler, - review_protocol::types::node::NodePowerRequest::Reboot, - ) - .await; - - // Drop without calling release_pending — simulates a response-write - // failure in dispatch_with_executor. - drop(handler); - - for _ in 0..20 { - tokio::task::yield_now().await; - } - - assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); - } - #[tokio::test] async fn dispatch_resource_usage_over_live_connection() { let (inner, server, _endpoint) = setup_test_connection().await; diff --git a/src/bin/roxyd/handlers/power.rs b/src/bin/roxyd/handlers/power.rs index 1cb1e1e..bc09d20 100644 --- a/src/bin/roxyd/handlers/power.rs +++ b/src/bin/roxyd/handlers/power.rs @@ -1,22 +1,18 @@ //! Power-control request handling. //! -//! Immediate `Reboot` and `Shutdown` requests are split into two phases: the -//! request handler *prepares* a [`PendingPowerOperation`] and returns -//! `NodePowerResponse::Initiated` without actually rebooting or powering off. -//! The dispatch layer is responsible for [releasing](PowerHandler::release_pending) -//! the pending operation only after the success response has been written -//! successfully. If the response write fails, the pending operation is dropped -//! and the destructive system call is never made. +//! Immediate [`NodePowerRequest::Reboot`] and [`NodePowerRequest::Shutdown`] +//! are fire-and-forget under review-protocol 0.19.0: the dispatch layer does +//! not send a wire response. The handler spawns the destructive system call in +//! the background so legacy flat `reboot`/`shutdown` compatibility paths can +//! still return before the operation runs. //! -//! Graceful variants spawn the standard platform reboot/poweroff command and -//! return `Initiated` on successful spawn, `"fail"` otherwise. +//! Graceful variants spawn the platform reboot/poweroff command and return +//! [`NodePowerResponse::Initiated`] on successful spawn, `"fail"` otherwise. use std::process::Command; use std::sync::Arc; use review_protocol::types::node::{NodePowerRequest, NodePowerResponse}; -use tokio::sync::oneshot; -use tokio::task::JoinHandle; const ERR_INVALID_COMMAND: &str = "invalid command"; const ERR_FAIL: &str = "fail"; @@ -126,75 +122,23 @@ impl PowerExecutor for SystemPowerExecutor { } } -/// Which immediate power action a [`PendingPowerOperation`] represents. -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub(crate) enum PowerAction { - Reboot, - PowerOff, -} - -/// An immediate reboot or power-off intent that has been prepared but not -/// yet executed. -/// -/// The pending operation owns a oneshot release channel and a background -/// task that waits on the channel before calling the destructive system -/// operation. If the operation is dropped without being released (for -/// example, because the response write failed), the sender side of the -/// channel is dropped, the background task observes the closed channel, -/// and the executor is never invoked. -pub(crate) struct PendingPowerOperation { - release_tx: oneshot::Sender<()>, - task: JoinHandle<()>, -} - -impl PendingPowerOperation { - /// Prepares a pending operation. Spawns a background task that waits - /// for the release signal before calling the executor. - #[cfg_attr(not(target_os = "linux"), allow(dead_code))] - pub(crate) fn prepare(executor: Arc, action: PowerAction) -> Self { - let (release_tx, release_rx) = oneshot::channel::<()>(); - let task = tokio::spawn(async move { - if release_rx.await.is_ok() { - match action { - PowerAction::Reboot => executor.reboot(), - PowerAction::PowerOff => executor.power_off(), - } - } - }); - Self { release_tx, task } - } - - /// Releases the pending operation. The background task receives the - /// signal and proceeds to call the executor. The returned - /// [`JoinHandle`] resolves once the executor returns; tests can await - /// it to observe the call. Production callers may ignore the handle. - pub(crate) fn release(self) -> JoinHandle<()> { - let _ = self.release_tx.send(()); - self.task - } -} - /// Per-stream handler state for power requests. -/// -/// Owns a shared [`PowerExecutor`] and accumulates pending immediate -/// operations during a request. The dispatch layer must call -/// [`release_pending`](Self::release_pending) only after the success -/// response has been written successfully on the request stream. pub(crate) struct PowerHandler { executor: Arc, - pending: Vec, } impl PowerHandler { pub(crate) fn new(executor: Arc) -> Self { - Self { - executor, - pending: Vec::new(), - } + Self { executor } } /// Handles a [`NodePowerRequest`]. /// + /// Immediate variants spawn the system call in the background and return + /// without waiting for it to complete. The return value is not sent on the + /// wire for grouped `NodePower` requests (review-protocol 0.19.0), but is + /// still used by legacy flat `reboot`/`shutdown` compatibility paths. + /// /// # Errors /// /// Returns `Err("invalid command")` for immediate requests on platforms @@ -206,24 +150,41 @@ impl PowerHandler { req: NodePowerRequest, ) -> Result { match req { - NodePowerRequest::Reboot => self.prepare_immediate(PowerAction::Reboot), - NodePowerRequest::Shutdown => self.prepare_immediate(PowerAction::PowerOff), + NodePowerRequest::Reboot => self.immediate_reboot(), + NodePowerRequest::Shutdown => self.immediate_shutdown(), NodePowerRequest::GracefulReboot => self.graceful_reboot(), NodePowerRequest::GracefulShutdown => self.graceful_power_off(), } } - #[allow(clippy::unused_self)] - fn prepare_immediate(&mut self, action: PowerAction) -> Result { + #[cfg_attr(not(target_os = "linux"), allow(clippy::unused_self))] + fn immediate_reboot(&self) -> Result { #[cfg(target_os = "linux")] { - let op = PendingPowerOperation::prepare(self.executor.clone(), action); - self.pending.push(op); + let executor = self.executor.clone(); + tokio::spawn(async move { + executor.reboot(); + }); + Ok(NodePowerResponse::Initiated) + } + #[cfg(not(target_os = "linux"))] + { + Err(ERR_INVALID_COMMAND.to_string()) + } + } + + #[cfg_attr(not(target_os = "linux"), allow(clippy::unused_self))] + fn immediate_shutdown(&self) -> Result { + #[cfg(target_os = "linux")] + { + let executor = self.executor.clone(); + tokio::spawn(async move { + executor.power_off(); + }); Ok(NodePowerResponse::Initiated) } #[cfg(not(target_os = "linux"))] { - let _ = action; Err(ERR_INVALID_COMMAND.to_string()) } } @@ -241,23 +202,6 @@ impl PowerHandler { Err(()) => Err(ERR_FAIL.to_string()), } } - - /// Releases all pending immediate power operations. - /// - /// Returns the per-operation [`JoinHandle`]s so that callers (typically - /// tests) can await the executor calls. Production callers may drop the - /// returned vector. - pub(crate) fn release_pending(&mut self) -> Vec> { - self.pending - .drain(..) - .map(PendingPowerOperation::release) - .collect() - } - - #[cfg(test)] - pub(crate) fn pending_count(&self) -> usize { - self.pending.len() - } } #[cfg(test)] @@ -317,102 +261,47 @@ mod tests { use super::mock::MockPowerExecutor; use super::*; - #[tokio::test] - async fn pending_operation_executes_only_after_release() { - let mock = Arc::new(MockPowerExecutor::default()); - let executor: Arc = mock.clone(); - let pending = PendingPowerOperation::prepare(executor, PowerAction::Reboot); - - // Give the background task a chance to run; it should still be - // waiting on the release channel. - tokio::task::yield_now().await; - assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); - - let task = pending.release(); - task.await.expect("background task should complete"); - assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 1); - } - - #[tokio::test] - async fn pending_operation_dropped_without_release_does_not_execute() { - let mock = Arc::new(MockPowerExecutor::default()); - let executor: Arc = mock.clone(); - let pending = PendingPowerOperation::prepare(executor, PowerAction::PowerOff); - - // Drop without releasing. The background task should observe the - // closed sender and exit without calling the executor. - drop(pending); - - // Yield repeatedly to allow the background task to finish. - for _ in 0..10 { - tokio::task::yield_now().await; - } - - assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); - assert_eq!(mock.power_off_count.load(Ordering::SeqCst), 0); - } - #[cfg(target_os = "linux")] #[tokio::test] - async fn handle_reboot_on_linux_prepares_pending_op() { + async fn handle_reboot_on_linux_spawns_immediate_action() { let mock = Arc::new(MockPowerExecutor::default()); let mut handler = PowerHandler::new(mock.clone() as Arc); let resp = handler .handle(NodePowerRequest::Reboot) .await - .expect("reboot should return Initiated"); + .expect("reboot should succeed"); assert_eq!(resp, NodePowerResponse::Initiated); - assert_eq!(handler.pending_count(), 1); assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); - let handles = handler.release_pending(); - for handle in handles { - handle.await.expect("background task should complete"); + for _ in 0..50 { + if mock.reboot_count.load(Ordering::SeqCst) > 0 { + break; + } + tokio::task::yield_now().await; } assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 1); } #[cfg(target_os = "linux")] #[tokio::test] - async fn handle_shutdown_on_linux_prepares_pending_op() { + async fn handle_shutdown_on_linux_spawns_immediate_action() { let mock = Arc::new(MockPowerExecutor::default()); let mut handler = PowerHandler::new(mock.clone() as Arc); let resp = handler .handle(NodePowerRequest::Shutdown) .await - .expect("shutdown should return Initiated"); + .expect("shutdown should succeed"); assert_eq!(resp, NodePowerResponse::Initiated); - assert_eq!(handler.pending_count(), 1); - let handles = handler.release_pending(); - for handle in handles { - handle.await.expect("background task should complete"); - } - assert_eq!(mock.power_off_count.load(Ordering::SeqCst), 1); - } - - #[cfg(target_os = "linux")] - #[tokio::test] - async fn handle_reboot_dropped_without_release_does_not_reboot() { - let mock = Arc::new(MockPowerExecutor::default()); - let mut handler = PowerHandler::new(mock.clone() as Arc); - - let _ = handler - .handle(NodePowerRequest::Reboot) - .await - .expect("reboot should return Initiated"); - - // Simulate a response-write failure by dropping the handler without - // calling release_pending. - drop(handler); - - for _ in 0..10 { + for _ in 0..50 { + if mock.power_off_count.load(Ordering::SeqCst) > 0 { + break; + } tokio::task::yield_now().await; } - - assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); + assert_eq!(mock.power_off_count.load(Ordering::SeqCst), 1); } #[cfg(not(target_os = "linux"))] @@ -426,7 +315,7 @@ mod tests { .await .expect_err("reboot should be unsupported on non-Linux"); assert_eq!(err, ERR_INVALID_COMMAND); - assert_eq!(handler.pending_count(), 0); + assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); } #[cfg(not(target_os = "linux"))] @@ -440,7 +329,7 @@ mod tests { .await .expect_err("shutdown should be unsupported on non-Linux"); assert_eq!(err, ERR_INVALID_COMMAND); - assert_eq!(handler.pending_count(), 0); + assert_eq!(mock.power_off_count.load(Ordering::SeqCst), 0); } #[tokio::test] From e25e3ba26272bb60db5f9edc5efdae56db04764f Mon Sep 17 00:00:00 2001 From: "octoaide[bot]" <204759324+octoaide[bot]@users.noreply.github.com> Date: Fri, 22 May 2026 23:55:52 +0000 Subject: [PATCH 4/9] Rename executor to backend and simplify dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename power-related executor types to "backend" (PowerExecutor → PowerBackend, SystemPowerExecutor → SystemPowerBackend, MockPowerExecutor → MockPowerBackend) and rename the PowerHandler field from `executor` to `backend`. Simplify dispatch by removing dispatch_with_executor; production dispatch() now uses RequestHandler::default() and calls review_protocol::request::handle directly. Add a #[cfg(test)] RequestHandler::with_power_backend() seam for tests and have spawn_dispatch_loop_with_mock build a handler with the mock backend and call review_protocol::request::handle in the test loop. Remove the stale comment above dispatch_resource_usage_over_live_connection and restore the existing [Unreleased] CHANGELOG entries. Verified with cargo test --bin roxyd (32 passed) and cargo clippy clean. --- CHANGELOG.md | 15 +++++-- src/bin/roxyd/control.rs | 70 +++++++++++++++---------------- src/bin/roxyd/handlers/power.rs | 74 ++++++++++++++++----------------- 3 files changed, 82 insertions(+), 77 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c756622..3b2540a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added +- Add explicit shutdown path for `roxyd` that handles OS signals + (SIGINT/SIGTERM), cancels any in-progress connection attempt or + accept/reconnect loop cleanly, and logs shutdown lifecycle events. - `roxyd` now handles node power-control requests from a Manager (immediate and graceful reboot/shutdown), replacing the previous unimplemented scaffolding. On Linux, immediate reboot and shutdown run in the background; @@ -17,13 +20,19 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm Legacy flat `reboot` and `shutdown` requests use the same behavior. Immediate reboot and shutdown are not supported on non-Linux platforms. +### Changed + +- Simplified `list_files` to return only file names instead of unused size and + modified-time data. + +### Removed + +- Removed the direct `chrono` dependency. + ## [0.6.0] - 2026-04-16 ### Added -- Add explicit shutdown path for `roxyd` that handles OS signals - (SIGINT/SIGTERM), cancels any in-progress connection attempt or - accept/reconnect loop cleanly, and logs shutdown lifecycle events. - Add tracing in `roxyd` for incoming `review-protocol` request dispatch, logging the selected handler group and request identifier. - Add `roxyd` binary entrypoint as a new implementation path for QUIC/mTLS diff --git a/src/bin/roxyd/control.rs b/src/bin/roxyd/control.rs index 0ce47fa..2349b5a 100644 --- a/src/bin/roxyd/control.rs +++ b/src/bin/roxyd/control.rs @@ -18,7 +18,9 @@ use review_protocol::types::node::{ }; use tokio::sync::watch; -use super::handlers::power::{PowerExecutor, PowerHandler, SystemPowerExecutor}; +#[cfg(test)] +use super::handlers::power::PowerBackend; +use super::handlers::power::{PowerHandler, SystemPowerBackend}; use super::{handlers, settings::Settings}; /// The review-protocol version required by this client. @@ -224,20 +226,7 @@ impl Connection { /// /// Returns an error if request reading or response sending fails. async fn dispatch(send: &mut quinn::SendStream, recv: &mut quinn::RecvStream) -> Result<()> { - dispatch_with_executor(send, recv, Arc::new(SystemPowerExecutor)).await -} - -/// Power-aware dispatch entry that takes a caller-supplied [`PowerExecutor`]. -/// -/// # Errors -/// -/// Returns an error if request reading or response sending fails. -async fn dispatch_with_executor( - send: &mut quinn::SendStream, - recv: &mut quinn::RecvStream, - executor: Arc, -) -> Result<()> { - let mut handler = RequestHandler::new(executor); + let mut handler = RequestHandler::default(); review_protocol::request::handle(&mut handler, send, recv) .await .map_err(|e| anyhow::anyhow!("{e}")) @@ -257,10 +246,19 @@ struct RequestHandler { power: PowerHandler, } +impl Default for RequestHandler { + fn default() -> Self { + Self { + power: PowerHandler::new(Arc::new(SystemPowerBackend)), + } + } +} + +#[cfg(test)] impl RequestHandler { - fn new(executor: Arc) -> Self { + fn with_power_backend(backend: Arc) -> Self { Self { - power: PowerHandler::new(executor), + power: PowerHandler::new(backend), } } } @@ -886,20 +884,22 @@ mod tests { // -- Tests: RequestCode dispatch over live connection -------------- - /// Spawns a dispatch loop that uses the provided mock executor for every + /// Spawns a dispatch loop that uses the provided mock power backend for every /// stream accepted on this connection. Returns the task handle so the /// caller can drive shutdown. fn spawn_dispatch_loop_with_mock( inner: review_protocol::client::Connection, - mock: Arc, + mock: Arc, ) -> tokio::task::JoinHandle> { - let executor: Arc = mock; + let backend: Arc = mock; tokio::spawn(async move { loop { let Ok((mut send, mut recv)) = inner.accept_bi().await else { return Ok::<(), anyhow::Error>(()); }; - if let Err(e) = dispatch_with_executor(&mut send, &mut recv, executor.clone()).await + let mut handler = super::RequestHandler::with_power_backend(backend.clone()); + if let Err(e) = + review_protocol::request::handle(&mut handler, &mut send, &mut recv).await { tracing::error!("Request handling failed: {e}"); } @@ -916,7 +916,7 @@ mod tests { use review_protocol::types::node::NodePowerRequest; let (inner, server, _endpoint) = setup_test_connection().await; - let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let mock = Arc::new(handlers::power::MockPowerBackend::default()); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); let result = server.node_power(NodePowerRequest::Reboot).await; @@ -925,7 +925,7 @@ mod tests { "reboot should be accepted: {result:?}" ); - // Give the background reboot task time to call the mock executor. + // Give the background reboot task time to call the mock backend. for _ in 0..50 { if mock.reboot_count.load(Ordering::SeqCst) > 0 { break; @@ -948,7 +948,7 @@ mod tests { use review_protocol::types::node::NodePowerRequest; let (inner, server, _endpoint) = setup_test_connection().await; - let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let mock = Arc::new(handlers::power::MockPowerBackend::default()); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); let result = server.node_power(NodePowerRequest::Shutdown).await; @@ -979,7 +979,7 @@ mod tests { use review_protocol::types::node::NodePowerRequest; let (inner, server, _endpoint) = setup_test_connection().await; - let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let mock = Arc::new(handlers::power::MockPowerBackend::default()); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); let result = server.node_power(NodePowerRequest::Reboot).await; @@ -994,7 +994,7 @@ mod tests { assert_eq!( mock.reboot_count.load(Ordering::SeqCst), 0, - "immediate reboot must not invoke the executor on non-Linux" + "immediate reboot must not invoke the backend on non-Linux" ); drop(server); @@ -1010,7 +1010,7 @@ mod tests { use review_protocol::types::node::NodePowerRequest; let (inner, server, _endpoint) = setup_test_connection().await; - let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let mock = Arc::new(handlers::power::MockPowerBackend::default()); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); let result = server.node_power(NodePowerRequest::Shutdown).await; @@ -1025,7 +1025,7 @@ mod tests { assert_eq!( mock.power_off_count.load(Ordering::SeqCst), 0, - "immediate shutdown must not invoke the executor on non-Linux" + "immediate shutdown must not invoke the backend on non-Linux" ); drop(server); @@ -1041,7 +1041,7 @@ mod tests { use review_protocol::types::node::NodePowerRequest; let (inner, server, _endpoint) = setup_test_connection().await; - let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let mock = Arc::new(handlers::power::MockPowerBackend::default()); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); let resp = server @@ -1071,7 +1071,7 @@ mod tests { use review_protocol::types::node::NodePowerRequest; let (inner, server, _endpoint) = setup_test_connection().await; - let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let mock = Arc::new(handlers::power::MockPowerBackend::default()); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); let result = server.node_power(NodePowerRequest::Reboot).await; @@ -1097,7 +1097,7 @@ mod tests { use review_protocol::types::node::{NodePowerRequest, NodePowerResponse}; let (inner, server, _endpoint) = setup_test_connection().await; - let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let mock = Arc::new(handlers::power::MockPowerBackend::default()); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); let resp = server @@ -1124,7 +1124,7 @@ mod tests { use review_protocol::types::node::{NodePowerRequest, NodePowerResponse}; let (inner, server, _endpoint) = setup_test_connection().await; - let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let mock = Arc::new(handlers::power::MockPowerBackend::default()); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); let resp = server @@ -1148,7 +1148,7 @@ mod tests { use review_protocol::types::node::NodePowerRequest; let (inner, server, _endpoint) = setup_test_connection().await; - let mock = Arc::new(handlers::power::MockPowerExecutor::default()); + let mock = Arc::new(handlers::power::MockPowerBackend::default()); mock.graceful_reboot_fail.store(true, Ordering::SeqCst); let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); @@ -1166,10 +1166,6 @@ mod tests { let _ = task.await.expect("dispatch task should not panic"); } - /// Verifies that when the response stream is dropped before the response - /// reaches the peer (simulated here by closing the stream from the - /// server side after the connection-level send), the pending immediate - /// power operation is not released and the executor is never invoked. #[tokio::test] async fn dispatch_resource_usage_over_live_connection() { let (inner, server, _endpoint) = setup_test_connection().await; diff --git a/src/bin/roxyd/handlers/power.rs b/src/bin/roxyd/handlers/power.rs index bc09d20..7aec95d 100644 --- a/src/bin/roxyd/handlers/power.rs +++ b/src/bin/roxyd/handlers/power.rs @@ -17,11 +17,11 @@ use review_protocol::types::node::{NodePowerRequest, NodePowerResponse}; const ERR_INVALID_COMMAND: &str = "invalid command"; const ERR_FAIL: &str = "fail"; -/// Executes the platform-specific power-control operations. +/// Performs the platform-specific power-control operations. /// -/// Production code uses [`SystemPowerExecutor`]; tests inject a mock so that +/// Production code uses [`SystemPowerBackend`]; tests inject a mock so that /// power operations can be observed without actually rebooting the host. -pub(crate) trait PowerExecutor: Send + Sync { +pub(crate) trait PowerBackend: Send + Sync { /// Performs an immediate reboot. On Linux this calls /// `nix::sys::reboot::reboot`, which does not return on success. /// @@ -50,11 +50,11 @@ pub(crate) trait PowerExecutor: Send + Sync { fn graceful_power_off(&self) -> Result<(), ()>; } -/// Production executor that triggers reboot/power-off via `nix::sys::reboot` +/// Production backend that triggers reboot/power-off via `nix::sys::reboot` /// (immediate) and the platform's standard CLI tools (graceful). -pub(crate) struct SystemPowerExecutor; +pub(crate) struct SystemPowerBackend; -impl PowerExecutor for SystemPowerExecutor { +impl PowerBackend for SystemPowerBackend { fn reboot(&self) { #[cfg(target_os = "linux")] { @@ -124,12 +124,12 @@ impl PowerExecutor for SystemPowerExecutor { /// Per-stream handler state for power requests. pub(crate) struct PowerHandler { - executor: Arc, + backend: Arc, } impl PowerHandler { - pub(crate) fn new(executor: Arc) -> Self { - Self { executor } + pub(crate) fn new(backend: Arc) -> Self { + Self { backend } } /// Handles a [`NodePowerRequest`]. @@ -161,9 +161,9 @@ impl PowerHandler { fn immediate_reboot(&self) -> Result { #[cfg(target_os = "linux")] { - let executor = self.executor.clone(); + let backend = self.backend.clone(); tokio::spawn(async move { - executor.reboot(); + backend.reboot(); }); Ok(NodePowerResponse::Initiated) } @@ -177,9 +177,9 @@ impl PowerHandler { fn immediate_shutdown(&self) -> Result { #[cfg(target_os = "linux")] { - let executor = self.executor.clone(); + let backend = self.backend.clone(); tokio::spawn(async move { - executor.power_off(); + backend.power_off(); }); Ok(NodePowerResponse::Initiated) } @@ -190,14 +190,14 @@ impl PowerHandler { } fn graceful_reboot(&self) -> Result { - match self.executor.graceful_reboot() { + match self.backend.graceful_reboot() { Ok(()) => Ok(NodePowerResponse::Initiated), Err(()) => Err(ERR_FAIL.to_string()), } } fn graceful_power_off(&self) -> Result { - match self.executor.graceful_power_off() { + match self.backend.graceful_power_off() { Ok(()) => Ok(NodePowerResponse::Initiated), Err(()) => Err(ERR_FAIL.to_string()), } @@ -205,18 +205,18 @@ impl PowerHandler { } #[cfg(test)] -pub(crate) use mock::MockPowerExecutor; +pub(crate) use mock::MockPowerBackend; #[cfg(test)] pub(crate) mod mock { use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; - use super::PowerExecutor; + use super::PowerBackend; - /// In-memory mock executor used by tests. Records call counts and can + /// In-memory mock backend used by tests. Records call counts and can /// be configured to fail graceful operations. #[derive(Default)] - pub(crate) struct MockPowerExecutor { + pub(crate) struct MockPowerBackend { pub reboot_count: AtomicUsize, pub power_off_count: AtomicUsize, pub graceful_reboot_count: AtomicUsize, @@ -225,7 +225,7 @@ pub(crate) mod mock { pub graceful_power_off_fail: AtomicBool, } - impl PowerExecutor for MockPowerExecutor { + impl PowerBackend for MockPowerBackend { fn reboot(&self) { self.reboot_count.fetch_add(1, Ordering::SeqCst); } @@ -258,14 +258,14 @@ pub(crate) mod mock { mod tests { use std::sync::atomic::Ordering; - use super::mock::MockPowerExecutor; + use super::mock::MockPowerBackend; use super::*; #[cfg(target_os = "linux")] #[tokio::test] async fn handle_reboot_on_linux_spawns_immediate_action() { - let mock = Arc::new(MockPowerExecutor::default()); - let mut handler = PowerHandler::new(mock.clone() as Arc); + let mock = Arc::new(MockPowerBackend::default()); + let mut handler = PowerHandler::new(mock.clone() as Arc); let resp = handler .handle(NodePowerRequest::Reboot) @@ -286,8 +286,8 @@ mod tests { #[cfg(target_os = "linux")] #[tokio::test] async fn handle_shutdown_on_linux_spawns_immediate_action() { - let mock = Arc::new(MockPowerExecutor::default()); - let mut handler = PowerHandler::new(mock.clone() as Arc); + let mock = Arc::new(MockPowerBackend::default()); + let mut handler = PowerHandler::new(mock.clone() as Arc); let resp = handler .handle(NodePowerRequest::Shutdown) @@ -307,8 +307,8 @@ mod tests { #[cfg(not(target_os = "linux"))] #[tokio::test] async fn handle_reboot_on_non_linux_returns_invalid_command() { - let mock = Arc::new(MockPowerExecutor::default()); - let mut handler = PowerHandler::new(mock.clone() as Arc); + let mock = Arc::new(MockPowerBackend::default()); + let mut handler = PowerHandler::new(mock.clone() as Arc); let err = handler .handle(NodePowerRequest::Reboot) @@ -321,8 +321,8 @@ mod tests { #[cfg(not(target_os = "linux"))] #[tokio::test] async fn handle_shutdown_on_non_linux_returns_invalid_command() { - let mock = Arc::new(MockPowerExecutor::default()); - let mut handler = PowerHandler::new(mock.clone() as Arc); + let mock = Arc::new(MockPowerBackend::default()); + let mut handler = PowerHandler::new(mock.clone() as Arc); let err = handler .handle(NodePowerRequest::Shutdown) @@ -334,8 +334,8 @@ mod tests { #[tokio::test] async fn handle_graceful_reboot_returns_initiated_on_success() { - let mock = Arc::new(MockPowerExecutor::default()); - let mut handler = PowerHandler::new(mock.clone() as Arc); + let mock = Arc::new(MockPowerBackend::default()); + let mut handler = PowerHandler::new(mock.clone() as Arc); let resp = handler .handle(NodePowerRequest::GracefulReboot) @@ -347,9 +347,9 @@ mod tests { #[tokio::test] async fn handle_graceful_reboot_returns_fail_on_spawn_error() { - let mock = Arc::new(MockPowerExecutor::default()); + let mock = Arc::new(MockPowerBackend::default()); mock.graceful_reboot_fail.store(true, Ordering::SeqCst); - let mut handler = PowerHandler::new(mock.clone() as Arc); + let mut handler = PowerHandler::new(mock.clone() as Arc); let err = handler .handle(NodePowerRequest::GracefulReboot) @@ -360,8 +360,8 @@ mod tests { #[tokio::test] async fn handle_graceful_shutdown_returns_initiated_on_success() { - let mock = Arc::new(MockPowerExecutor::default()); - let mut handler = PowerHandler::new(mock.clone() as Arc); + let mock = Arc::new(MockPowerBackend::default()); + let mut handler = PowerHandler::new(mock.clone() as Arc); let resp = handler .handle(NodePowerRequest::GracefulShutdown) @@ -373,9 +373,9 @@ mod tests { #[tokio::test] async fn handle_graceful_shutdown_returns_fail_on_spawn_error() { - let mock = Arc::new(MockPowerExecutor::default()); + let mock = Arc::new(MockPowerBackend::default()); mock.graceful_power_off_fail.store(true, Ordering::SeqCst); - let mut handler = PowerHandler::new(mock.clone() as Arc); + let mut handler = PowerHandler::new(mock.clone() as Arc); let err = handler .handle(NodePowerRequest::GracefulShutdown) From 33bfccd1de9d20a123a0a9eb0c2f8c5f06036a1a Mon Sep 17 00:00:00 2001 From: "octoaide[bot]" <204759324+octoaide[bot]@users.noreply.github.com> Date: Wed, 27 May 2026 01:46:40 +0000 Subject: [PATCH 5/9] Migrate power handling to module handler Replace the PowerHandler struct with a module-level handlers::power::handle function and have RequestHandler carry Arc and delegate to handlers::power::handle(req, backend). This aligns power routing with other roxyd handlers and keeps PowerBackend/SystemPowerBackend intact for testing. Offload OS-level work to blocking threads: immediate Reboot/Shutdown use tokio::task::spawn_blocking as fire-and-forget, while graceful variants await spawn_blocking(...) and map the result to Initiated/fail. Immediate helper functions were made synchronous where they don't await. Also fix PowerBackend import visibility, resolve clippy/linux dead-code issues, and clean up CHANGELOG (remove duplicate 0.6.0 section; keep other unreleased notes). Local clippy and tests pass. --- CHANGELOG.md | 9 -- src/bin/roxyd/control.rs | 12 +-- src/bin/roxyd/handlers/power.rs | 160 +++++++++++++------------------- 3 files changed, 69 insertions(+), 112 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b2540a..7fb0c86 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,15 +51,6 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm `shutdown`, `process_list`, `resource_usage`) now delegate through the grouped handlers as compatibility adapters. -### Changed - -- Simplified `list_files` to return only file names instead of unused size and - modified-time data. - -### Removed - -- Removed the direct `chrono` dependency. - ### Fixed - Fix `NetplanYaml::merge` bridge handling so `bridges` is no longer dropped diff --git a/src/bin/roxyd/control.rs b/src/bin/roxyd/control.rs index 2349b5a..d370e26 100644 --- a/src/bin/roxyd/control.rs +++ b/src/bin/roxyd/control.rs @@ -18,9 +18,7 @@ use review_protocol::types::node::{ }; use tokio::sync::watch; -#[cfg(test)] -use super::handlers::power::PowerBackend; -use super::handlers::power::{PowerHandler, SystemPowerBackend}; +use super::handlers::power::{PowerBackend, SystemPowerBackend}; use super::{handlers, settings::Settings}; /// The review-protocol version required by this client. @@ -243,13 +241,13 @@ async fn dispatch(send: &mut quinn::SendStream, recv: &mut quinn::RecvStream) -> /// `resource_usage`) are temporary protocol-compatibility adapters that /// route through the grouped handlers. struct RequestHandler { - power: PowerHandler, + power_backend: Arc, } impl Default for RequestHandler { fn default() -> Self { Self { - power: PowerHandler::new(Arc::new(SystemPowerBackend)), + power_backend: Arc::new(SystemPowerBackend), } } } @@ -258,7 +256,7 @@ impl Default for RequestHandler { impl RequestHandler { fn with_power_backend(backend: Arc) -> Self { Self { - power: PowerHandler::new(backend), + power_backend: backend, } } } @@ -309,7 +307,7 @@ impl review_protocol::request::Handler for RequestHandler { async fn node_power(&mut self, req: NodePowerRequest) -> Result { tracing::info!(handler_group = "node_power", request = %req.service_id(), "Dispatching request"); - self.power.handle(req).await + handlers::power::handle(req, self.power_backend.clone()).await } async fn node_observation( diff --git a/src/bin/roxyd/handlers/power.rs b/src/bin/roxyd/handlers/power.rs index 7aec95d..c741b89 100644 --- a/src/bin/roxyd/handlers/power.rs +++ b/src/bin/roxyd/handlers/power.rs @@ -2,18 +2,20 @@ //! //! Immediate [`NodePowerRequest::Reboot`] and [`NodePowerRequest::Shutdown`] //! are fire-and-forget under review-protocol 0.19.0: the dispatch layer does -//! not send a wire response. The handler spawns the destructive system call in -//! the background so legacy flat `reboot`/`shutdown` compatibility paths can +//! not send a wire response. The handler runs the destructive system call on a +//! blocking thread so legacy flat `reboot`/`shutdown` compatibility paths can //! still return before the operation runs. //! -//! Graceful variants spawn the platform reboot/poweroff command and return -//! [`NodePowerResponse::Initiated`] on successful spawn, `"fail"` otherwise. +//! Graceful variants spawn the platform reboot/poweroff command on a blocking +//! thread and return [`NodePowerResponse::Initiated`] on successful spawn, +//! `"fail"` otherwise. use std::process::Command; use std::sync::Arc; use review_protocol::types::node::{NodePowerRequest, NodePowerResponse}; +#[cfg(not(target_os = "linux"))] const ERR_INVALID_COMMAND: &str = "invalid command"; const ERR_FAIL: &str = "fail"; @@ -122,85 +124,67 @@ impl PowerBackend for SystemPowerBackend { } } -/// Per-stream handler state for power requests. -pub(crate) struct PowerHandler { +/// Handles a [`NodePowerRequest`]. +/// +/// Immediate variants run the system call on a blocking thread and return +/// without waiting for it to complete. The return value is not sent on the wire +/// for grouped `NodePower` requests (review-protocol 0.19.0), but is still used +/// by legacy flat `reboot`/`shutdown` compatibility paths. +/// +/// # Errors +/// +/// Returns `Err("invalid command")` for immediate requests on platforms where +/// they are not supported, and `Err("fail")` if a graceful operation could not +/// be initiated. +pub(crate) async fn handle( + req: NodePowerRequest, backend: Arc, +) -> Result { + match req { + NodePowerRequest::Reboot => immediate_reboot(backend), + NodePowerRequest::Shutdown => immediate_shutdown(backend), + NodePowerRequest::GracefulReboot => graceful_reboot(backend).await, + NodePowerRequest::GracefulShutdown => graceful_power_off(backend).await, + } } -impl PowerHandler { - pub(crate) fn new(backend: Arc) -> Self { - Self { backend } +fn immediate_reboot(backend: Arc) -> Result { + #[cfg(target_os = "linux")] + { + tokio::task::spawn_blocking(move || backend.reboot()); + Ok(NodePowerResponse::Initiated) } - - /// Handles a [`NodePowerRequest`]. - /// - /// Immediate variants spawn the system call in the background and return - /// without waiting for it to complete. The return value is not sent on the - /// wire for grouped `NodePower` requests (review-protocol 0.19.0), but is - /// still used by legacy flat `reboot`/`shutdown` compatibility paths. - /// - /// # Errors - /// - /// Returns `Err("invalid command")` for immediate requests on platforms - /// where they are not supported, and `Err("fail")` if a graceful - /// operation could not be initiated. - #[allow(clippy::unused_async)] - pub(crate) async fn handle( - &mut self, - req: NodePowerRequest, - ) -> Result { - match req { - NodePowerRequest::Reboot => self.immediate_reboot(), - NodePowerRequest::Shutdown => self.immediate_shutdown(), - NodePowerRequest::GracefulReboot => self.graceful_reboot(), - NodePowerRequest::GracefulShutdown => self.graceful_power_off(), - } + #[cfg(not(target_os = "linux"))] + { + drop(backend); + Err(ERR_INVALID_COMMAND.to_string()) } +} - #[cfg_attr(not(target_os = "linux"), allow(clippy::unused_self))] - fn immediate_reboot(&self) -> Result { - #[cfg(target_os = "linux")] - { - let backend = self.backend.clone(); - tokio::spawn(async move { - backend.reboot(); - }); - Ok(NodePowerResponse::Initiated) - } - #[cfg(not(target_os = "linux"))] - { - Err(ERR_INVALID_COMMAND.to_string()) - } +fn immediate_shutdown(backend: Arc) -> Result { + #[cfg(target_os = "linux")] + { + tokio::task::spawn_blocking(move || backend.power_off()); + Ok(NodePowerResponse::Initiated) } - - #[cfg_attr(not(target_os = "linux"), allow(clippy::unused_self))] - fn immediate_shutdown(&self) -> Result { - #[cfg(target_os = "linux")] - { - let backend = self.backend.clone(); - tokio::spawn(async move { - backend.power_off(); - }); - Ok(NodePowerResponse::Initiated) - } - #[cfg(not(target_os = "linux"))] - { - Err(ERR_INVALID_COMMAND.to_string()) - } + #[cfg(not(target_os = "linux"))] + { + drop(backend); + Err(ERR_INVALID_COMMAND.to_string()) } +} - fn graceful_reboot(&self) -> Result { - match self.backend.graceful_reboot() { - Ok(()) => Ok(NodePowerResponse::Initiated), - Err(()) => Err(ERR_FAIL.to_string()), - } +async fn graceful_reboot(backend: Arc) -> Result { + match tokio::task::spawn_blocking(move || backend.graceful_reboot()).await { + Ok(Ok(())) => Ok(NodePowerResponse::Initiated), + Ok(Err(())) | Err(_) => Err(ERR_FAIL.to_string()), } +} - fn graceful_power_off(&self) -> Result { - match self.backend.graceful_power_off() { - Ok(()) => Ok(NodePowerResponse::Initiated), - Err(()) => Err(ERR_FAIL.to_string()), - } +async fn graceful_power_off(backend: Arc) -> Result { + match tokio::task::spawn_blocking(move || backend.graceful_power_off()).await { + Ok(Ok(())) => Ok(NodePowerResponse::Initiated), + Ok(Err(())) | Err(_) => Err(ERR_FAIL.to_string()), } } @@ -265,10 +249,8 @@ mod tests { #[tokio::test] async fn handle_reboot_on_linux_spawns_immediate_action() { let mock = Arc::new(MockPowerBackend::default()); - let mut handler = PowerHandler::new(mock.clone() as Arc); - let resp = handler - .handle(NodePowerRequest::Reboot) + let resp = handle(NodePowerRequest::Reboot, mock.clone()) .await .expect("reboot should succeed"); assert_eq!(resp, NodePowerResponse::Initiated); @@ -287,10 +269,8 @@ mod tests { #[tokio::test] async fn handle_shutdown_on_linux_spawns_immediate_action() { let mock = Arc::new(MockPowerBackend::default()); - let mut handler = PowerHandler::new(mock.clone() as Arc); - let resp = handler - .handle(NodePowerRequest::Shutdown) + let resp = handle(NodePowerRequest::Shutdown, mock.clone()) .await .expect("shutdown should succeed"); assert_eq!(resp, NodePowerResponse::Initiated); @@ -308,10 +288,8 @@ mod tests { #[tokio::test] async fn handle_reboot_on_non_linux_returns_invalid_command() { let mock = Arc::new(MockPowerBackend::default()); - let mut handler = PowerHandler::new(mock.clone() as Arc); - let err = handler - .handle(NodePowerRequest::Reboot) + let err = handle(NodePowerRequest::Reboot, mock.clone()) .await .expect_err("reboot should be unsupported on non-Linux"); assert_eq!(err, ERR_INVALID_COMMAND); @@ -322,10 +300,8 @@ mod tests { #[tokio::test] async fn handle_shutdown_on_non_linux_returns_invalid_command() { let mock = Arc::new(MockPowerBackend::default()); - let mut handler = PowerHandler::new(mock.clone() as Arc); - let err = handler - .handle(NodePowerRequest::Shutdown) + let err = handle(NodePowerRequest::Shutdown, mock.clone()) .await .expect_err("shutdown should be unsupported on non-Linux"); assert_eq!(err, ERR_INVALID_COMMAND); @@ -335,10 +311,8 @@ mod tests { #[tokio::test] async fn handle_graceful_reboot_returns_initiated_on_success() { let mock = Arc::new(MockPowerBackend::default()); - let mut handler = PowerHandler::new(mock.clone() as Arc); - let resp = handler - .handle(NodePowerRequest::GracefulReboot) + let resp = handle(NodePowerRequest::GracefulReboot, mock.clone()) .await .expect("graceful reboot should succeed"); assert_eq!(resp, NodePowerResponse::Initiated); @@ -349,10 +323,8 @@ mod tests { async fn handle_graceful_reboot_returns_fail_on_spawn_error() { let mock = Arc::new(MockPowerBackend::default()); mock.graceful_reboot_fail.store(true, Ordering::SeqCst); - let mut handler = PowerHandler::new(mock.clone() as Arc); - let err = handler - .handle(NodePowerRequest::GracefulReboot) + let err = handle(NodePowerRequest::GracefulReboot, mock.clone()) .await .expect_err("graceful reboot should fail"); assert_eq!(err, ERR_FAIL); @@ -361,10 +333,8 @@ mod tests { #[tokio::test] async fn handle_graceful_shutdown_returns_initiated_on_success() { let mock = Arc::new(MockPowerBackend::default()); - let mut handler = PowerHandler::new(mock.clone() as Arc); - let resp = handler - .handle(NodePowerRequest::GracefulShutdown) + let resp = handle(NodePowerRequest::GracefulShutdown, mock.clone()) .await .expect("graceful shutdown should succeed"); assert_eq!(resp, NodePowerResponse::Initiated); @@ -375,10 +345,8 @@ mod tests { async fn handle_graceful_shutdown_returns_fail_on_spawn_error() { let mock = Arc::new(MockPowerBackend::default()); mock.graceful_power_off_fail.store(true, Ordering::SeqCst); - let mut handler = PowerHandler::new(mock.clone() as Arc); - let err = handler - .handle(NodePowerRequest::GracefulShutdown) + let err = handle(NodePowerRequest::GracefulShutdown, mock.clone()) .await .expect_err("graceful shutdown should fail"); assert_eq!(err, ERR_FAIL); From 94a79a6332bc71632f397760a92fd24d09a1cc7d Mon Sep 17 00:00:00 2001 From: "octoaide[bot]" <204759324+octoaide[bot]@users.noreply.github.com> Date: Wed, 27 May 2026 17:59:25 +0000 Subject: [PATCH 6/9] Improve power handler and control test stability Refactor Linux/non-Linux paths in the power handler to satisfy clippy (lint: unnecessary_wraps) and to ensure non-Linux handle() arms do not incorrectly wrap results. Split immediate_reboot/immediate_shutdown by platform so Linux can return NodePowerResponse directly. Revise module docs to describe the wire contract and handler responsibilities without referencing legacy response ordering. Remove duplicate control tests and replace the fixed-yield mock wait with a Linux-only wait_for_mock_count (1s timeout) to eliminate a coverage-driven race where the mock count could be observed as zero. These changes resolve clippy warnings and intermittent CI/coverage flakes. --- src/bin/roxyd/control.rs | 90 +++++---------------------------- src/bin/roxyd/handlers/power.rs | 74 ++++++++++++++------------- 2 files changed, 52 insertions(+), 112 deletions(-) diff --git a/src/bin/roxyd/control.rs b/src/bin/roxyd/control.rs index d370e26..fc57465 100644 --- a/src/bin/roxyd/control.rs +++ b/src/bin/roxyd/control.rs @@ -905,11 +905,21 @@ mod tests { }) } + /// Waits until the mock backend call count reaches `expected`. + #[cfg(target_os = "linux")] + async fn wait_for_mock_count(count: &std::sync::atomic::AtomicUsize, expected: usize) { + tokio::time::timeout(std::time::Duration::from_secs(1), async { + while count.load(std::sync::atomic::Ordering::SeqCst) < expected { + tokio::task::yield_now().await; + } + }) + .await + .expect("timed out waiting for mock backend call"); + } + #[cfg(target_os = "linux")] #[tokio::test] async fn dispatch_reboot_over_live_connection() { - use std::sync::atomic::Ordering; - use review_protocol::server::node::NodePowerOutcome; use review_protocol::types::node::NodePowerRequest; @@ -923,14 +933,7 @@ mod tests { "reboot should be accepted: {result:?}" ); - // Give the background reboot task time to call the mock backend. - for _ in 0..50 { - if mock.reboot_count.load(Ordering::SeqCst) > 0 { - break; - } - tokio::task::yield_now().await; - } - assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 1); + wait_for_mock_count(&mock.reboot_count, 1).await; drop(server); let task_result = task.await.expect("dispatch task should not panic"); @@ -940,8 +943,6 @@ mod tests { #[cfg(target_os = "linux")] #[tokio::test] async fn dispatch_shutdown_over_live_connection() { - use std::sync::atomic::Ordering; - use review_protocol::server::node::NodePowerOutcome; use review_protocol::types::node::NodePowerRequest; @@ -955,13 +956,7 @@ mod tests { "shutdown should be accepted: {result:?}" ); - for _ in 0..50 { - if mock.power_off_count.load(Ordering::SeqCst) > 0 { - break; - } - tokio::task::yield_now().await; - } - assert_eq!(mock.power_off_count.load(Ordering::SeqCst), 1); + wait_for_mock_count(&mock.power_off_count, 1).await; drop(server); let task_result = task.await.expect("dispatch task should not panic"); @@ -1030,63 +1025,6 @@ mod tests { let _ = task.await.expect("dispatch task should not panic"); } - #[cfg(target_os = "linux")] - #[tokio::test] - async fn dispatch_node_power_reboot_over_live_connection() { - use std::sync::atomic::Ordering; - - use review_protocol::server::node::NodePowerOutcome; - use review_protocol::types::node::NodePowerRequest; - - let (inner, server, _endpoint) = setup_test_connection().await; - let mock = Arc::new(handlers::power::MockPowerBackend::default()); - let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); - - let resp = server - .node_power(NodePowerRequest::Reboot) - .await - .expect("node_power reboot should succeed"); - assert!(matches!(resp, NodePowerOutcome::Sent)); - - for _ in 0..50 { - if mock.reboot_count.load(Ordering::SeqCst) > 0 { - break; - } - tokio::task::yield_now().await; - } - assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 1); - - drop(server); - let _ = task.await.expect("dispatch task should not panic"); - } - - #[cfg(not(target_os = "linux"))] - #[tokio::test] - async fn dispatch_node_power_reboot_over_live_connection_non_linux() { - use std::sync::atomic::Ordering; - - use review_protocol::server::node::NodePowerOutcome; - use review_protocol::types::node::NodePowerRequest; - - let (inner, server, _endpoint) = setup_test_connection().await; - let mock = Arc::new(handlers::power::MockPowerBackend::default()); - let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); - - let result = server.node_power(NodePowerRequest::Reboot).await; - assert!( - matches!(result, Ok(NodePowerOutcome::Sent)), - "node_power reboot should be sent: {result:?}" - ); - - for _ in 0..50 { - tokio::task::yield_now().await; - } - assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); - - drop(server); - let _ = task.await.expect("dispatch task should not panic"); - } - #[tokio::test] async fn dispatch_node_power_graceful_reboot_over_live_connection() { use std::sync::atomic::Ordering; diff --git a/src/bin/roxyd/handlers/power.rs b/src/bin/roxyd/handlers/power.rs index c741b89..069f44c 100644 --- a/src/bin/roxyd/handlers/power.rs +++ b/src/bin/roxyd/handlers/power.rs @@ -1,14 +1,12 @@ //! Power-control request handling. //! -//! Immediate [`NodePowerRequest::Reboot`] and [`NodePowerRequest::Shutdown`] -//! are fire-and-forget under review-protocol 0.19.0: the dispatch layer does -//! not send a wire response. The handler runs the destructive system call on a -//! blocking thread so legacy flat `reboot`/`shutdown` compatibility paths can -//! still return before the operation runs. +//! Immediate reboot and shutdown requests are fire-and-forget: the request +//! handler accepts the command and dispatches the OS-facing operation without +//! waiting for it to complete. The operation runs on Tokio's blocking pool +//! because it may call synchronous OS APIs that do not return on success. //! -//! Graceful variants spawn the platform reboot/poweroff command on a blocking -//! thread and return [`NodePowerResponse::Initiated`] on successful spawn, -//! `"fail"` otherwise. +//! Graceful reboot and shutdown requests return an acknowledgement after +//! successfully starting the platform reboot or power-off command. use std::process::Command; use std::sync::Arc; @@ -60,8 +58,8 @@ impl PowerBackend for SystemPowerBackend { fn reboot(&self) { #[cfg(target_os = "linux")] { - if let Err(e) = nix::sys::reboot::reboot(nix::sys::reboot::RebootMode::RB_AUTOBOOT) { - tracing::error!("nix reboot failed: {e}"); + match nix::sys::reboot::reboot(nix::sys::reboot::RebootMode::RB_AUTOBOOT) { + Err(e) => tracing::error!("nix reboot failed: {e}"), } } #[cfg(not(target_os = "linux"))] @@ -73,8 +71,8 @@ impl PowerBackend for SystemPowerBackend { fn power_off(&self) { #[cfg(target_os = "linux")] { - if let Err(e) = nix::sys::reboot::reboot(nix::sys::reboot::RebootMode::RB_POWER_OFF) { - tracing::error!("nix poweroff failed: {e}"); + match nix::sys::reboot::reboot(nix::sys::reboot::RebootMode::RB_POWER_OFF) { + Err(e) => tracing::error!("nix poweroff failed: {e}"), } } #[cfg(not(target_os = "linux"))] @@ -126,10 +124,10 @@ impl PowerBackend for SystemPowerBackend { /// Handles a [`NodePowerRequest`]. /// -/// Immediate variants run the system call on a blocking thread and return -/// without waiting for it to complete. The return value is not sent on the wire -/// for grouped `NodePower` requests (review-protocol 0.19.0), but is still used -/// by legacy flat `reboot`/`shutdown` compatibility paths. +/// Immediate reboot and shutdown are accepted and dispatched to the backend on +/// a blocking thread without awaiting completion. Graceful variants run the +/// platform command on a blocking thread and return [`NodePowerResponse::Initiated`] +/// after the command is successfully started. /// /// # Errors /// @@ -141,37 +139,41 @@ pub(crate) async fn handle( backend: Arc, ) -> Result { match req { + #[cfg(target_os = "linux")] + NodePowerRequest::Reboot => Ok(immediate_reboot(backend)), + #[cfg(not(target_os = "linux"))] NodePowerRequest::Reboot => immediate_reboot(backend), + #[cfg(target_os = "linux")] + NodePowerRequest::Shutdown => Ok(immediate_shutdown(backend)), + #[cfg(not(target_os = "linux"))] NodePowerRequest::Shutdown => immediate_shutdown(backend), NodePowerRequest::GracefulReboot => graceful_reboot(backend).await, NodePowerRequest::GracefulShutdown => graceful_power_off(backend).await, } } +#[cfg(target_os = "linux")] +fn immediate_reboot(backend: Arc) -> NodePowerResponse { + tokio::task::spawn_blocking(move || backend.reboot()); + NodePowerResponse::Initiated +} + +#[cfg(not(target_os = "linux"))] fn immediate_reboot(backend: Arc) -> Result { - #[cfg(target_os = "linux")] - { - tokio::task::spawn_blocking(move || backend.reboot()); - Ok(NodePowerResponse::Initiated) - } - #[cfg(not(target_os = "linux"))] - { - drop(backend); - Err(ERR_INVALID_COMMAND.to_string()) - } + drop(backend); + Err(ERR_INVALID_COMMAND.to_string()) } +#[cfg(target_os = "linux")] +fn immediate_shutdown(backend: Arc) -> NodePowerResponse { + tokio::task::spawn_blocking(move || backend.power_off()); + NodePowerResponse::Initiated +} + +#[cfg(not(target_os = "linux"))] fn immediate_shutdown(backend: Arc) -> Result { - #[cfg(target_os = "linux")] - { - tokio::task::spawn_blocking(move || backend.power_off()); - Ok(NodePowerResponse::Initiated) - } - #[cfg(not(target_os = "linux"))] - { - drop(backend); - Err(ERR_INVALID_COMMAND.to_string()) - } + drop(backend); + Err(ERR_INVALID_COMMAND.to_string()) } async fn graceful_reboot(backend: Arc) -> Result { From bd82a67e90a9a95841c695661cb826230f2e9b02 Mon Sep 17 00:00:00 2001 From: "octoaide[bot]" <204759324+octoaide[bot]@users.noreply.github.com> Date: Thu, 28 May 2026 02:22:12 +0000 Subject: [PATCH 7/9] Unify wait_for_mock_count for power tests Move the wait_for_mock_count helper into the power handler's mock test module and use it from both power and control tests instead of a fragile 50-yield polling loop. Replace the repeated local helper in control.rs and the polling in power.rs tests with the shared helper. Gate the helper to Linux (it only applies to immediate power paths that spawn on Linux) and use a 1s timeout. This centralizes the wait logic, removes duplication, and stabilizes the power-related tests. All roxyd tests pass and clippy is clean. --- src/bin/roxyd/control.rs | 16 ++-------------- src/bin/roxyd/handlers/power.rs | 32 ++++++++++++++++++-------------- 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/src/bin/roxyd/control.rs b/src/bin/roxyd/control.rs index fc57465..daab38a 100644 --- a/src/bin/roxyd/control.rs +++ b/src/bin/roxyd/control.rs @@ -905,18 +905,6 @@ mod tests { }) } - /// Waits until the mock backend call count reaches `expected`. - #[cfg(target_os = "linux")] - async fn wait_for_mock_count(count: &std::sync::atomic::AtomicUsize, expected: usize) { - tokio::time::timeout(std::time::Duration::from_secs(1), async { - while count.load(std::sync::atomic::Ordering::SeqCst) < expected { - tokio::task::yield_now().await; - } - }) - .await - .expect("timed out waiting for mock backend call"); - } - #[cfg(target_os = "linux")] #[tokio::test] async fn dispatch_reboot_over_live_connection() { @@ -933,7 +921,7 @@ mod tests { "reboot should be accepted: {result:?}" ); - wait_for_mock_count(&mock.reboot_count, 1).await; + handlers::power::wait_for_mock_count(&mock.reboot_count, 1).await; drop(server); let task_result = task.await.expect("dispatch task should not panic"); @@ -956,7 +944,7 @@ mod tests { "shutdown should be accepted: {result:?}" ); - wait_for_mock_count(&mock.power_off_count, 1).await; + handlers::power::wait_for_mock_count(&mock.power_off_count, 1).await; drop(server); let task_result = task.await.expect("dispatch task should not panic"); diff --git a/src/bin/roxyd/handlers/power.rs b/src/bin/roxyd/handlers/power.rs index 069f44c..6de44a4 100644 --- a/src/bin/roxyd/handlers/power.rs +++ b/src/bin/roxyd/handlers/power.rs @@ -192,6 +192,8 @@ async fn graceful_power_off(backend: Arc) -> Result 0 { - break; - } - tokio::task::yield_now().await; - } - assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 1); + wait_for_mock_count(&mock.reboot_count, 1).await; } #[cfg(target_os = "linux")] @@ -277,13 +287,7 @@ mod tests { .expect("shutdown should succeed"); assert_eq!(resp, NodePowerResponse::Initiated); - for _ in 0..50 { - if mock.power_off_count.load(Ordering::SeqCst) > 0 { - break; - } - tokio::task::yield_now().await; - } - assert_eq!(mock.power_off_count.load(Ordering::SeqCst), 1); + wait_for_mock_count(&mock.power_off_count, 1).await; } #[cfg(not(target_os = "linux"))] From f51f7d17968e63f26c2db95102d837853ff441eb Mon Sep 17 00:00:00 2001 From: "octoaide[bot]" <204759324+octoaide[bot]@users.noreply.github.com> Date: Thu, 28 May 2026 17:09:08 +0000 Subject: [PATCH 8/9] Add flat reboot/shutdown tests; clarify CHANGELOG Add live-connection tests that dispatch legacy flat wire codes 4 and 21 and assert legacy `()` responses. These complement existing `node_power` tests and ensure the mock power backend is invoked on Linux. Remove a timing-sensitive assertion in the reboot test to avoid flakes; tests now wait for the mock reboot count via the existing helper. Switch tests to use review_protocol::unary_request and add bincode 2 as a dev-dependency to match the wire format and avoid a version mismatch. Update the Unreleased CHANGELOG entry to separate grouped `node_power` behavior (no protocol response) from the legacy flat reboot/shutdown path (immediate backend action, `()` on the wire). Gate Linux-only helpers/constants to silence dead-code warnings on non-Linux platforms. --- CHANGELOG.md | 15 ++++----- src/bin/roxyd/control.rs | 56 +++++++++++++++++++++++++++++++++ src/bin/roxyd/handlers/power.rs | 1 - 3 files changed, 64 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fb0c86..dad128e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,13 +11,14 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add explicit shutdown path for `roxyd` that handles OS signals (SIGINT/SIGTERM), cancels any in-progress connection attempt or accept/reconnect loop cleanly, and logs shutdown lifecycle events. -- `roxyd` now handles node power-control requests from a Manager (immediate - and graceful reboot/shutdown), replacing the previous unimplemented - scaffolding. On Linux, immediate reboot and shutdown run in the background; - grouped `node_power` requests do not return a protocol response for these - operations. Graceful reboot and shutdown spawn the platform's standard - reboot or poweroff command and report success or `"fail"` to the Manager. - Legacy flat `reboot` and `shutdown` requests use the same behavior. +- `roxyd` now handles node power-control requests from a Manager (immediate and + graceful reboot/shutdown), replacing the previous unimplemented scaffolding. + Immediate grouped `node_power` reboot and shutdown requests do not return a + protocol response; on Linux, they dispatch the OS-facing operation in the + background. Graceful reboot and shutdown attempt to spawn the + platform-specific command and return `Initiated` on success or `"fail"` on + start failure. Legacy flat `reboot` and `shutdown` requests delegate to the + same immediate power path while keeping the legacy `()` response path. Immediate reboot and shutdown are not supported on non-Linux platforms. ### Changed diff --git a/src/bin/roxyd/control.rs b/src/bin/roxyd/control.rs index daab38a..5c51928 100644 --- a/src/bin/roxyd/control.rs +++ b/src/bin/roxyd/control.rs @@ -905,6 +905,62 @@ mod tests { }) } + /// Legacy flat request codes from review-protocol 0.19.0 (`client::RequestCode`). + #[cfg(target_os = "linux")] + const FLAT_REQUEST_REBOOT: u32 = 4; + #[cfg(target_os = "linux")] + const FLAT_REQUEST_SHUTDOWN: u32 = 21; + + /// Sends a legacy flat request (review-protocol 0.19.0 wire framing). + #[cfg(target_os = "linux")] + async fn send_legacy_flat_request( + conn: &review_protocol::server::Connection, + request_code: u32, + ) -> anyhow::Result<()> { + let (mut send, mut recv) = conn.open_bi().await?; + let resp: Result<(), String> = + review_protocol::unary_request(&mut send, &mut recv, request_code, ()) + .await + .map_err(|e| anyhow::anyhow!("flat request transport failed: {e}"))?; + resp.map_err(|e| anyhow::anyhow!(e)) + } + + #[cfg(target_os = "linux")] + #[tokio::test] + async fn dispatch_flat_reboot_request_code_over_live_connection() { + let (inner, server, _endpoint) = setup_test_connection().await; + let mock = Arc::new(handlers::power::MockPowerBackend::default()); + let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); + + send_legacy_flat_request(&server, FLAT_REQUEST_REBOOT) + .await + .expect("flat reboot should return legacy ()"); + + handlers::power::wait_for_mock_count(&mock.reboot_count, 1).await; + + drop(server); + let task_result = task.await.expect("dispatch task should not panic"); + assert!(task_result.is_ok()); + } + + #[cfg(target_os = "linux")] + #[tokio::test] + async fn dispatch_flat_shutdown_request_code_over_live_connection() { + let (inner, server, _endpoint) = setup_test_connection().await; + let mock = Arc::new(handlers::power::MockPowerBackend::default()); + let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); + + send_legacy_flat_request(&server, FLAT_REQUEST_SHUTDOWN) + .await + .expect("flat shutdown should return legacy ()"); + + handlers::power::wait_for_mock_count(&mock.power_off_count, 1).await; + + drop(server); + let task_result = task.await.expect("dispatch task should not panic"); + assert!(task_result.is_ok()); + } + #[cfg(target_os = "linux")] #[tokio::test] async fn dispatch_reboot_over_live_connection() { diff --git a/src/bin/roxyd/handlers/power.rs b/src/bin/roxyd/handlers/power.rs index 6de44a4..683695e 100644 --- a/src/bin/roxyd/handlers/power.rs +++ b/src/bin/roxyd/handlers/power.rs @@ -272,7 +272,6 @@ mod tests { .await .expect("reboot should succeed"); assert_eq!(resp, NodePowerResponse::Initiated); - assert_eq!(mock.reboot_count.load(Ordering::SeqCst), 0); wait_for_mock_count(&mock.reboot_count, 1).await; } From 8ca2ad61c70456b091f1260fa9f0f90d9541ca7d Mon Sep 17 00:00:00 2001 From: "octoaide[bot]" <204759324+octoaide[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 18:06:09 +0000 Subject: [PATCH 9/9] Use Notify for node_power test synchronization Generalize dispatch-loop test helpers and replace flaky yield_now() loops in non-Linux node_power tests with Notify-based synchronization. Add NotifyingPowerHandler and spawn_dispatch_loop_with_handler so tests wait on a Notify after node_power completes, providing a real happens-before relationship and eliminating timing-based flakes. Updated non-Linux reboot/shutdown tests accordingly; roxyd tests and clippy pass locally. --- src/bin/roxyd/control.rs | 72 ++++++++++++++++++++++++++++++++-------- 1 file changed, 58 insertions(+), 14 deletions(-) diff --git a/src/bin/roxyd/control.rs b/src/bin/roxyd/control.rs index 5c51928..42732a1 100644 --- a/src/bin/roxyd/control.rs +++ b/src/bin/roxyd/control.rs @@ -882,20 +882,22 @@ mod tests { // -- Tests: RequestCode dispatch over live connection -------------- - /// Spawns a dispatch loop that uses the provided mock power backend for every - /// stream accepted on this connection. Returns the task handle so the - /// caller can drive shutdown. - fn spawn_dispatch_loop_with_mock( + fn spawn_dispatch_loop_with_handler( inner: review_protocol::client::Connection, mock: Arc, - ) -> tokio::task::JoinHandle> { + make_handler: F, + ) -> tokio::task::JoinHandle> + where + F: Fn(Arc) -> H + Send + 'static, + H: review_protocol::request::Handler + 'static, + { let backend: Arc = mock; tokio::spawn(async move { loop { let Ok((mut send, mut recv)) = inner.accept_bi().await else { return Ok::<(), anyhow::Error>(()); }; - let mut handler = super::RequestHandler::with_power_backend(backend.clone()); + let mut handler = make_handler(backend.clone()); if let Err(e) = review_protocol::request::handle(&mut handler, &mut send, &mut recv).await { @@ -905,6 +907,44 @@ mod tests { }) } + /// Spawns a dispatch loop that uses the provided mock power backend for every + /// stream accepted on this connection. Returns the task handle so the + /// caller can drive shutdown. + fn spawn_dispatch_loop_with_mock( + inner: review_protocol::client::Connection, + mock: Arc, + ) -> tokio::task::JoinHandle> { + spawn_dispatch_loop_with_handler(inner, mock, super::RequestHandler::with_power_backend) + } + + #[cfg(not(target_os = "linux"))] + struct NotifyingPowerHandler { + inner: super::RequestHandler, + processed: Arc, + } + + #[cfg(not(target_os = "linux"))] + #[async_trait::async_trait] + impl review_protocol::request::Handler for NotifyingPowerHandler { + async fn node_power(&mut self, req: NodePowerRequest) -> Result { + let result = review_protocol::request::Handler::node_power(&mut self.inner, req).await; + self.processed.notify_one(); + result + } + } + + #[cfg(not(target_os = "linux"))] + fn spawn_node_power_dispatch_loop_with_notify( + inner: review_protocol::client::Connection, + mock: Arc, + processed: Arc, + ) -> tokio::task::JoinHandle> { + spawn_dispatch_loop_with_handler(inner, mock, move |backend| NotifyingPowerHandler { + inner: super::RequestHandler::with_power_backend(backend), + processed: processed.clone(), + }) + } + /// Legacy flat request codes from review-protocol 0.19.0 (`client::RequestCode`). #[cfg(target_os = "linux")] const FLAT_REQUEST_REBOOT: u32 = 4; @@ -1017,7 +1057,9 @@ mod tests { let (inner, server, _endpoint) = setup_test_connection().await; let mock = Arc::new(handlers::power::MockPowerBackend::default()); - let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); + let processed = Arc::new(tokio::sync::Notify::new()); + let task = + spawn_node_power_dispatch_loop_with_notify(inner, mock.clone(), processed.clone()); let result = server.node_power(NodePowerRequest::Reboot).await; assert!( @@ -1025,9 +1067,9 @@ mod tests { "request should be sent: {result:?}" ); - for _ in 0..50 { - tokio::task::yield_now().await; - } + tokio::time::timeout(Duration::from_secs(1), processed.notified()) + .await + .expect("node_power request should be processed"); assert_eq!( mock.reboot_count.load(Ordering::SeqCst), 0, @@ -1048,7 +1090,9 @@ mod tests { let (inner, server, _endpoint) = setup_test_connection().await; let mock = Arc::new(handlers::power::MockPowerBackend::default()); - let task = spawn_dispatch_loop_with_mock(inner, mock.clone()); + let processed = Arc::new(tokio::sync::Notify::new()); + let task = + spawn_node_power_dispatch_loop_with_notify(inner, mock.clone(), processed.clone()); let result = server.node_power(NodePowerRequest::Shutdown).await; assert!( @@ -1056,9 +1100,9 @@ mod tests { "request should be sent: {result:?}" ); - for _ in 0..50 { - tokio::task::yield_now().await; - } + tokio::time::timeout(Duration::from_secs(1), processed.notified()) + .await + .expect("node_power request should be processed"); assert_eq!( mock.power_off_count.load(Ordering::SeqCst), 0,