Skip to content
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add explicit shutdown path for `roxyd` that handles OS signals
(SIGINT/SIGTERM), cancels any in-progress connection attempt or
accept/reconnect loop cleanly, and logs shutdown lifecycle events.
- `roxyd` now handles node power-control requests from a Manager (immediate and
graceful reboot/shutdown), replacing the previous unimplemented scaffolding.
Immediate grouped `node_power` reboot and shutdown requests do not return a
protocol response; on Linux, they dispatch the OS-facing operation in the
background. Graceful reboot and shutdown attempt to spawn the
platform-specific command and return `Initiated` on success or `"fail"` on
start failure. Legacy flat `reboot` and `shutdown` requests delegate to the
same immediate power path while keeping the legacy `()` response path.
Immediate reboot and shutdown are not supported on non-Linux platforms.

### Changed

Expand Down
339 changes: 336 additions & 3 deletions src/bin/roxyd/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! Manager: connect, run loop, stream accept, reconnect, and dispatch entry.
//! All request handling is delegated to the [`handlers`] module.

use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, Result};
Expand All @@ -17,6 +18,7 @@ use review_protocol::types::node::{
};
use tokio::sync::watch;

use super::handlers::power::{PowerBackend, SystemPowerBackend};
use super::{handlers, settings::Settings};

/// The review-protocol version required by this client.
Expand Down Expand Up @@ -222,7 +224,7 @@ impl Connection {
///
/// Returns an error if request reading or response sending fails.
async fn dispatch(send: &mut quinn::SendStream, recv: &mut quinn::RecvStream) -> Result<()> {
let mut handler = RequestHandler;
let mut handler = RequestHandler::default();
review_protocol::request::handle(&mut handler, send, recv)
.await
.map_err(|e| anyhow::anyhow!("{e}"))
Expand All @@ -238,7 +240,26 @@ async fn dispatch(send: &mut quinn::SendStream, recv: &mut quinn::RecvStream) ->
/// The legacy flat methods (`reboot`, `shutdown`, `process_list`,
/// `resource_usage`) are temporary protocol-compatibility adapters that
/// route through the grouped handlers.
struct RequestHandler;
struct RequestHandler {
power_backend: Arc<dyn PowerBackend>,
}

impl Default for RequestHandler {
fn default() -> Self {
Self {
power_backend: Arc::new(SystemPowerBackend),
}
}
}

#[cfg(test)]
impl RequestHandler {
    /// Test-only constructor: builds a handler around the supplied power
    /// backend instead of the one chosen by `Default`.
    fn with_power_backend(backend: Arc<dyn PowerBackend>) -> Self {
        RequestHandler {
            power_backend: backend,
        }
    }
}

#[async_trait::async_trait]
impl review_protocol::request::Handler for RequestHandler {
Expand Down Expand Up @@ -286,7 +307,7 @@ impl review_protocol::request::Handler for RequestHandler {

async fn node_power(&mut self, req: NodePowerRequest) -> Result<NodePowerResponse, String> {
tracing::info!(handler_group = "node_power", request = %req.service_id(), "Dispatching request");
handlers::power::handle(req).await
handlers::power::handle(req, self.power_backend.clone()).await
}

async fn node_observation(
Expand Down Expand Up @@ -859,6 +880,318 @@ mod tests {
assert!(task_result.is_ok());
}

// -- Tests: RequestCode dispatch over live connection --------------

/// Spawns a dispatch loop that uses the provided mock power backend for every
/// stream accepted on this connection. Returns the task handle so the
/// caller can drive shutdown.
fn spawn_dispatch_loop_with_mock(
inner: review_protocol::client::Connection,
mock: Arc<handlers::power::MockPowerBackend>,
) -> tokio::task::JoinHandle<Result<(), anyhow::Error>> {
let backend: Arc<dyn PowerBackend> = mock;
tokio::spawn(async move {
loop {
let Ok((mut send, mut recv)) = inner.accept_bi().await else {
return Ok::<(), anyhow::Error>(());
};
let mut handler = super::RequestHandler::with_power_backend(backend.clone());
if let Err(e) =
review_protocol::request::handle(&mut handler, &mut send, &mut recv).await
{
tracing::error!("Request handling failed: {e}");
}
}
})
}

@zmrdltl zmrdltl May 29, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn spawn_dispatch_loop_with_mock(
inner: review_protocol::client::Connection,
mock: Arc<handlers::power::MockPowerBackend>,
) -> tokio::task::JoinHandle<Result<(), anyhow::Error>> {
let backend: Arc<dyn PowerBackend> = mock;
tokio::spawn(async move {
loop {
let Ok((mut send, mut recv)) = inner.accept_bi().await else {
return Ok::<(), anyhow::Error>(());
};
let mut handler = super::RequestHandler::with_power_backend(backend.clone());
if let Err(e) =
review_protocol::request::handle(&mut handler, &mut send, &mut recv).await
{
tracing::error!("Request handling failed: {e}");
}
}
})
}
fn spawn_dispatch_loop_with_handler<H, F>(
inner: review_protocol::client::Connection,
mock: Arc<handlers::power::MockPowerBackend>,
make_handler: F,
) -> tokio::task::JoinHandle<Result<(), anyhow::Error>>
where
F: Fn(Arc<dyn PowerBackend>) -> H + Send + 'static,
H: review_protocol::request::Handler + 'static,
{
let backend: Arc<dyn PowerBackend> = mock;
tokio::spawn(async move {
loop {
let Ok((mut send, mut recv)) = inner.accept_bi().await else {
return Ok::<(), anyhow::Error>(());
};
let mut handler = make_handler(backend.clone());
if let Err(e) =
review_protocol::request::handle(&mut handler, &mut send, &mut recv).await
{
tracing::error!("Request handling failed: {e}");
}
}
})
}
fn spawn_dispatch_loop_with_mock(
inner: review_protocol::client::Connection,
mock: Arc<handlers::power::MockPowerBackend>,
) -> tokio::task::JoinHandle<Result<(), anyhow::Error>> {
spawn_dispatch_loop_with_handler(inner, mock, super::RequestHandler::with_power_backend)
}
/// Test handler that forwards `node_power` requests to a wrapped
/// `RequestHandler` and signals `processed` after each one returns.
#[cfg(not(target_os = "linux"))]
struct NotifyingPowerHandler {
    // Handler that `node_power` requests are forwarded to.
    inner: super::RequestHandler,
    // Notified once per forwarded `node_power` call, after the call returns.
    processed: Arc<tokio::sync::Notify>,
}
#[cfg(not(target_os = "linux"))]
#[async_trait::async_trait]
impl review_protocol::request::Handler for NotifyingPowerHandler {
    /// Forwards the request to the wrapped handler, then signals `processed`
    /// so a waiting test knows the request has been handled, and finally
    /// returns the wrapped handler's result unchanged.
    async fn node_power(&mut self, req: NodePowerRequest) -> Result<NodePowerResponse, String> {
        let outcome = <super::RequestHandler as review_protocol::request::Handler>::node_power(
            &mut self.inner,
            req,
        )
        .await;
        self.processed.notify_one();
        outcome
    }
}
/// Starts a dispatch loop whose per-stream handler is a
/// `NotifyingPowerHandler`, so `processed` is signalled after every
/// `node_power` request handled on this connection.
#[cfg(not(target_os = "linux"))]
fn spawn_node_power_dispatch_loop_with_notify(
    inner: review_protocol::client::Connection,
    mock: Arc<handlers::power::MockPowerBackend>,
    processed: Arc<tokio::sync::Notify>,
) -> tokio::task::JoinHandle<Result<(), anyhow::Error>> {
    // Every handler shares the same notifier and wraps a handler built on `backend`.
    let make_handler = move |backend: Arc<dyn PowerBackend>| NotifyingPowerHandler {
        inner: super::RequestHandler::with_power_backend(backend),
        processed: Arc::clone(&processed),
    };
    spawn_dispatch_loop_with_handler(inner, mock, make_handler)
}


/// Legacy flat request codes from review-protocol 0.19.0 (`client::RequestCode`).
///
/// Code sent by the flat-reboot dispatch test.
#[cfg(target_os = "linux")]
const FLAT_REQUEST_REBOOT: u32 = 4;
/// Legacy flat request code sent by the flat-shutdown dispatch test.
#[cfg(target_os = "linux")]
const FLAT_REQUEST_SHUTDOWN: u32 = 21;

/// Issues a legacy flat request (review-protocol 0.19.0 wire framing) with an
/// empty `()` payload on a freshly opened bidirectional stream.
///
/// # Errors
///
/// Fails if the stream cannot be opened, if the request/response exchange
/// fails at the transport level, or if the peer answers with an error string.
#[cfg(target_os = "linux")]
async fn send_legacy_flat_request(
    conn: &review_protocol::server::Connection,
    request_code: u32,
) -> anyhow::Result<()> {
    let (mut send, mut recv) = conn.open_bi().await?;
    let exchange = review_protocol::unary_request(&mut send, &mut recv, request_code, ()).await;
    let reply: Result<(), String> = match exchange {
        Ok(reply) => reply,
        Err(e) => return Err(anyhow::anyhow!("flat request transport failed: {e}")),
    };
    reply.map_err(|e| anyhow::anyhow!(e))
}

/// A legacy flat reboot request must get the legacy `()` reply and reach the
/// backend's reboot operation exactly once.
#[cfg(target_os = "linux")]
#[tokio::test]
async fn dispatch_flat_reboot_request_code_over_live_connection() {
    let (inner, server, _endpoint) = setup_test_connection().await;
    let backend = Arc::new(handlers::power::MockPowerBackend::default());
    let dispatcher = spawn_dispatch_loop_with_mock(inner, Arc::clone(&backend));

    let reply = send_legacy_flat_request(&server, FLAT_REQUEST_REBOOT).await;
    reply.expect("flat reboot should return legacy ()");

    handlers::power::wait_for_mock_count(&backend.reboot_count, 1).await;

    // Closing the server side ends the dispatch loop.
    drop(server);
    let loop_exit = dispatcher.await.expect("dispatch task should not panic");
    assert!(loop_exit.is_ok());
}

/// A legacy flat shutdown request must get the legacy `()` reply and reach
/// the backend's power-off operation exactly once.
#[cfg(target_os = "linux")]
#[tokio::test]
async fn dispatch_flat_shutdown_request_code_over_live_connection() {
    let (inner, server, _endpoint) = setup_test_connection().await;
    let backend = Arc::new(handlers::power::MockPowerBackend::default());
    let dispatcher = spawn_dispatch_loop_with_mock(inner, Arc::clone(&backend));

    let reply = send_legacy_flat_request(&server, FLAT_REQUEST_SHUTDOWN).await;
    reply.expect("flat shutdown should return legacy ()");

    handlers::power::wait_for_mock_count(&backend.power_off_count, 1).await;

    // Closing the server side ends the dispatch loop.
    drop(server);
    let loop_exit = dispatcher.await.expect("dispatch task should not panic");
    assert!(loop_exit.is_ok());
}

/// A grouped immediate reboot request is reported as `Sent` and reaches the
/// backend's reboot operation once.
#[cfg(target_os = "linux")]
#[tokio::test]
async fn dispatch_reboot_over_live_connection() {
    use review_protocol::server::node::NodePowerOutcome;
    use review_protocol::types::node::NodePowerRequest;

    let (inner, server, _endpoint) = setup_test_connection().await;
    let backend = Arc::new(handlers::power::MockPowerBackend::default());
    let dispatcher = spawn_dispatch_loop_with_mock(inner, Arc::clone(&backend));

    let result = server.node_power(NodePowerRequest::Reboot).await;
    let was_sent = matches!(result, Ok(NodePowerOutcome::Sent));
    assert!(was_sent, "reboot should be accepted: {result:?}");

    handlers::power::wait_for_mock_count(&backend.reboot_count, 1).await;

    // Closing the server side ends the dispatch loop.
    drop(server);
    let loop_exit = dispatcher.await.expect("dispatch task should not panic");
    assert!(loop_exit.is_ok());
}

/// A grouped immediate shutdown request is reported as `Sent` and reaches the
/// backend's power-off operation once.
#[cfg(target_os = "linux")]
#[tokio::test]
async fn dispatch_shutdown_over_live_connection() {
    use review_protocol::server::node::NodePowerOutcome;
    use review_protocol::types::node::NodePowerRequest;

    let (inner, server, _endpoint) = setup_test_connection().await;
    let backend = Arc::new(handlers::power::MockPowerBackend::default());
    let dispatcher = spawn_dispatch_loop_with_mock(inner, Arc::clone(&backend));

    let result = server.node_power(NodePowerRequest::Shutdown).await;
    let was_sent = matches!(result, Ok(NodePowerOutcome::Sent));
    assert!(was_sent, "shutdown should be accepted: {result:?}");

    handlers::power::wait_for_mock_count(&backend.power_off_count, 1).await;

    // Closing the server side ends the dispatch loop.
    drop(server);
    let loop_exit = dispatcher.await.expect("dispatch task should not panic");
    assert!(loop_exit.is_ok());
}

/// On non-Linux targets an immediate reboot request is still sent, but it
/// must never reach the power backend.
///
/// The test waits on a `Notify` that the handler signals after processing the
/// request. A fixed number of `yield_now` calls would not guarantee the
/// request had been handled, so the "count is zero" assertion could pass
/// before the handler ever ran.
#[cfg(not(target_os = "linux"))]
#[tokio::test]
async fn dispatch_reboot_over_live_connection_non_linux() {
    use std::sync::atomic::Ordering;

    use review_protocol::server::node::NodePowerOutcome;
    use review_protocol::types::node::NodePowerRequest;

    let (inner, server, _endpoint) = setup_test_connection().await;
    let mock = Arc::new(handlers::power::MockPowerBackend::default());
    let processed = Arc::new(tokio::sync::Notify::new());
    let task =
        spawn_node_power_dispatch_loop_with_notify(inner, mock.clone(), processed.clone());

    let result = server.node_power(NodePowerRequest::Reboot).await;
    assert!(
        matches!(result, Ok(NodePowerOutcome::Sent)),
        "request should be sent: {result:?}"
    );

    // `notify_one` stores a permit when nobody is waiting yet, so this
    // completes even if the handler finished before we got here.
    tokio::time::timeout(std::time::Duration::from_secs(1), processed.notified())
        .await
        .expect("node_power request should be processed");

    assert_eq!(
        mock.reboot_count.load(Ordering::SeqCst),
        0,
        "immediate reboot must not invoke the backend on non-Linux"
    );

    drop(server);
    let _ = task.await.expect("dispatch task should not panic");
}

/// On non-Linux targets an immediate shutdown request is still sent, but it
/// must never reach the power backend.
///
/// The test waits on a `Notify` that the handler signals after processing the
/// request. A fixed number of `yield_now` calls would not guarantee the
/// request had been handled, so the "count is zero" assertion could pass
/// before the handler ever ran.
#[cfg(not(target_os = "linux"))]
#[tokio::test]
async fn dispatch_shutdown_over_live_connection_non_linux() {
    use std::sync::atomic::Ordering;

    use review_protocol::server::node::NodePowerOutcome;
    use review_protocol::types::node::NodePowerRequest;

    let (inner, server, _endpoint) = setup_test_connection().await;
    let mock = Arc::new(handlers::power::MockPowerBackend::default());
    let processed = Arc::new(tokio::sync::Notify::new());
    let task =
        spawn_node_power_dispatch_loop_with_notify(inner, mock.clone(), processed.clone());

    let result = server.node_power(NodePowerRequest::Shutdown).await;
    assert!(
        matches!(result, Ok(NodePowerOutcome::Sent)),
        "request should be sent: {result:?}"
    );

    // `notify_one` stores a permit when nobody is waiting yet, so this
    // completes even if the handler finished before we got here.
    tokio::time::timeout(std::time::Duration::from_secs(1), processed.notified())
        .await
        .expect("node_power request should be processed");

    assert_eq!(
        mock.power_off_count.load(Ordering::SeqCst),
        0,
        "immediate shutdown must not invoke the backend on non-Linux"
    );

    drop(server);
    let _ = task.await.expect("dispatch task should not panic");
}

/// A graceful reboot request yields an `Initiated` response, calls the
/// backend's graceful reboot once, and leaves the immediate reboot untouched.
#[tokio::test]
async fn dispatch_node_power_graceful_reboot_over_live_connection() {
    use std::sync::atomic::Ordering;

    use review_protocol::server::node::NodePowerOutcome;
    use review_protocol::types::node::{NodePowerRequest, NodePowerResponse};

    let (inner, server, _endpoint) = setup_test_connection().await;
    let backend = Arc::new(handlers::power::MockPowerBackend::default());
    let dispatcher = spawn_dispatch_loop_with_mock(inner, Arc::clone(&backend));

    let outcome = server
        .node_power(NodePowerRequest::GracefulReboot)
        .await
        .expect("graceful reboot should succeed");
    let initiated = matches!(
        outcome,
        NodePowerOutcome::Response(NodePowerResponse::Initiated)
    );
    assert!(initiated);

    let graceful_calls = backend.graceful_reboot_count.load(Ordering::SeqCst);
    assert_eq!(graceful_calls, 1);
    // The graceful path must not fall through to the immediate reboot.
    let immediate_calls = backend.reboot_count.load(Ordering::SeqCst);
    assert_eq!(immediate_calls, 0);

    drop(server);
    let _ = dispatcher.await.expect("dispatch task should not panic");
}

/// A graceful shutdown request yields an `Initiated` response, calls the
/// backend's graceful power-off once, and leaves the immediate power-off
/// untouched (the same check the graceful reboot test makes).
#[tokio::test]
async fn dispatch_node_power_graceful_shutdown_over_live_connection() {
    use std::sync::atomic::Ordering;

    use review_protocol::server::node::NodePowerOutcome;
    use review_protocol::types::node::{NodePowerRequest, NodePowerResponse};

    let (inner, server, _endpoint) = setup_test_connection().await;
    let mock = Arc::new(handlers::power::MockPowerBackend::default());
    let task = spawn_dispatch_loop_with_mock(inner, mock.clone());

    let resp = server
        .node_power(NodePowerRequest::GracefulShutdown)
        .await
        .expect("graceful shutdown should succeed");
    assert!(matches!(
        resp,
        NodePowerOutcome::Response(NodePowerResponse::Initiated)
    ));
    assert_eq!(mock.graceful_power_off_count.load(Ordering::SeqCst), 1);
    // Graceful path must not trigger immediate shutdown.
    assert_eq!(mock.power_off_count.load(Ordering::SeqCst), 0);

    drop(server);
    let _ = task.await.expect("dispatch task should not panic");
}

/// When the mock is told to fail graceful reboot, the request must come back
/// as an error whose message mentions "fail".
#[tokio::test]
async fn dispatch_node_power_graceful_reboot_fail_over_live_connection() {
    use std::sync::atomic::Ordering;

    use review_protocol::types::node::NodePowerRequest;

    let (inner, server, _endpoint) = setup_test_connection().await;
    let backend = Arc::new(handlers::power::MockPowerBackend::default());
    // Arm the failure before the dispatch loop starts serving requests.
    backend.graceful_reboot_fail.store(true, Ordering::SeqCst);
    let dispatcher = spawn_dispatch_loop_with_mock(inner, Arc::clone(&backend));

    let reply = server.node_power(NodePowerRequest::GracefulReboot).await;
    let msg = reply
        .expect_err("graceful reboot should fail")
        .to_string();
    let mentions_fail = msg.contains("fail");
    assert!(mentions_fail, "expected 'fail' in error message, got: {msg}");

    drop(server);
    let _ = dispatcher.await.expect("dispatch task should not panic");
}

#[tokio::test]
async fn dispatch_resource_usage_over_live_connection() {
let (inner, server, _endpoint) = setup_test_connection().await;
let task = tokio::spawn(async move {
loop {
let Ok((mut send, mut recv)) = inner.accept_bi().await else {
return Ok::<(), anyhow::Error>(());
};
if let Err(e) = dispatch(&mut send, &mut recv).await {
tracing::error!("Request handling failed: {e}");
}
}
});

let result = server
.node_observation(NodeObservationRequest::ResourceUsage)
.await;
assert!(result.is_err(), "should fail: handler is unimplemented");

let task_err = task.await.expect_err("task should have panicked");
assert!(task_err.is_panic());
}

#[tokio::test]
async fn dispatch_process_list_over_live_connection() {
let (inner, server, _endpoint) = setup_test_connection().await;
let task = tokio::spawn(async move {
loop {
let Ok((mut send, mut recv)) = inner.accept_bi().await else {
return Ok::<(), anyhow::Error>(());
};
if let Err(e) = dispatch(&mut send, &mut recv).await {
tracing::error!("Request handling failed: {e}");
}
}
});

let result = server
.node_observation(NodeObservationRequest::ProcessList)
.await;
assert!(result.is_err(), "should fail: handler is unimplemented");

let task_err = task.await.expect_err("task should have panicked");
assert!(task_err.is_panic());
}

#[tokio::test]
async fn run_reconnects_after_connection_close() {
use tokio::sync::oneshot;
Expand Down
Loading