Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/regression-test-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ jobs:
with:
fetch-depth: 0

- name: Fetch base branch
run: git fetch origin ${{ github.event.pull_request.base.ref }}
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The added fetch step may still not create the origin/<base> remote-tracking ref that the script relies on (BASE_REF="origin/..."). To make this robust regardless of actions/checkout's configured refspec, fetch with an explicit destination (e.g., refs/heads/<base>:refs/remotes/origin/<base>) and quote the ref to avoid shell word-splitting for unusual branch names.

Suggested change
run: git fetch origin ${{ github.event.pull_request.base.ref }}
run: git fetch origin "refs/heads/${{ github.event.pull_request.base.ref }}:refs/remotes/origin/${{ github.event.pull_request.base.ref }}"

Copilot uses AI. Check for mistakes.
Comment on lines +16 to +17
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description/title focus on the CI workflow change, but this PR also includes substantial runtime code changes across multiple channel implementations plus a new regression test. Please update the PR description/title to reflect the broader scope, or split the Rust changes into a separate PR to keep review and rollout risk manageable.

Copilot uses AI. Check for mistakes.

- name: Check for regression tests
env:
PR_TITLE: ${{ github.event.pull_request.title }}
Expand Down
99 changes: 83 additions & 16 deletions src/channels/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,30 +372,36 @@ async fn process_message(
None
};

// Send message to the channel
let tx_guard = state.tx.read().await;
if let Some(tx) = tx_guard.as_ref() {
if tx.send(msg).await.is_err() {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(WebhookResponse {
message_id: msg_id,
status: "error".to_string(),
response: Some("Channel closed".to_string()),
}),
);
// Send message to the channel — clone sender to avoid holding lock across await.
// Note: after cloning, shutdown() can run and clear the Option while we still hold
// a valid Sender clone. This is safe: the send will succeed (receiver still alive
// in the agent loop) or fail cleanly if the receiver was dropped.
let tx = {
let tx_guard = state.tx.read().await;
match tx_guard.as_ref() {
Some(tx) => tx.clone(),
None => {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(WebhookResponse {
message_id: msg_id,
status: "error".to_string(),
response: Some("Channel not started".to_string()),
}),
);
}
}
} else {
};
if tx.send(msg).await.is_err() {
return (
StatusCode::SERVICE_UNAVAILABLE,
StatusCode::INTERNAL_SERVER_ERROR,
Json(WebhookResponse {
message_id: msg_id,
status: "error".to_string(),
response: Some("Channel not started".to_string()),
response: Some("Channel closed".to_string()),
}),
);
Comment on lines +395 to 403
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If tx.send(msg).await fails while wait_for_response is true, the function returns here before the cleanup at the end, leaving a stale entry in pending_responses. Ensure the map entry is removed on this error path as well (e.g., remove before returning, or use a scope guard so cleanup runs even on early returns).

Copilot uses AI. Check for mistakes.
}
drop(tx_guard);

// Wait for response if requested
let response = if let Some(rx) = response_rx {
Expand Down Expand Up @@ -543,6 +549,67 @@ mod tests {
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
}

/// Regression test for issue #869: RwLock read guard was held across
/// tx.send(msg).await in `process_message()`, blocking shutdown() from
/// acquiring the write lock when the channel buffer was full.
///
/// This test exercises the actual production code path (`process_message`)
/// with a full channel buffer, then verifies shutdown() can still complete.
#[tokio::test]
async fn shutdown_completes_while_process_message_blocked() {
let channel = Arc::new(test_channel(Some("secret")));
let stream = channel.start().await.unwrap();

// Fill all 256 slots in the channel buffer
{
let tx = {
let guard = channel.state.tx.read().await;
guard.as_ref().unwrap().clone()
};
for i in 0..256 {
let msg = IncomingMessage::new("http", "user", format!("fill-{}", i));
tx.send(msg).await.unwrap();
Comment on lines +563 to +571
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This regression test hard-codes the channel buffer size (256) in the fill loop, which will silently become incorrect if the buffer capacity changes in HttpChannel::start() (and the test may stop actually blocking on the 257th send). Consider extracting the capacity into a const used by both start() and this test to keep them in sync.

Copilot uses AI. Check for mistakes.
}
}

// Signal so we know the spawned task has started and is about to
// call process_message (which will block on the full channel).
let started = Arc::new(tokio::sync::Notify::new());
let started_clone = started.clone();

// Spawn a task that calls the actual production code path.
// process_message() internally acquires the RwLock read guard and
// sends on the channel. With the fix, the guard is released before
// send().await; without the fix, shutdown() would deadlock.
let state = channel.state.clone();
let blocked_send = tokio::spawn(async move {
started_clone.notify_one();
let msg = IncomingMessage::new("http", "user", "blocked-257th");
let _ = process_message(state, msg, false).await;
});

// Wait for the spawned task to start, then give it time to reach
// the send().await and verify that it is still pending (i.e., blocked).
started.notified().await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert!(
!blocked_send.is_finished(),
"process_message task should still be pending before shutdown()"
);

// shutdown() must complete even though process_message is blocked on
// send(). Before the fix, the read guard held across send().await
// would prevent shutdown() from acquiring the write lock.
let result =
tokio::time::timeout(std::time::Duration::from_secs(2), channel.shutdown()).await;
assert!(result.is_ok(), "shutdown() must not deadlock");
assert!(result.unwrap().is_ok());

// Drop the stream (receiver) so the blocked send task can complete
drop(stream);
let _ = blocked_send.await;
}

#[tokio::test]
async fn webhook_missing_secret_returns_unauthorized() {
let channel = test_channel(Some("correct-secret"));
Expand Down
90 changes: 50 additions & 40 deletions src/channels/wasm/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1994,28 +1994,33 @@ impl WasmChannel {
return Ok(());
}

let tx_guard = self.message_tx.read().await;
let Some(tx) = tx_guard.as_ref() else {
tracing::error!(
channel = %self.name,
count = messages.len(),
"Messages emitted but no sender available - channel may not be started!"
);
return Ok(());
// Clone sender to avoid holding RwLock read guard across send().await in the loop
let tx = {
let tx_guard = self.message_tx.read().await;
let Some(tx) = tx_guard.as_ref() else {
tracing::error!(
channel = %self.name,
count = messages.len(),
"Messages emitted but no sender available - channel may not be started!"
);
return Ok(());
};
tx.clone()
};

let mut rate_limiter = self.rate_limiter.write().await;

for emitted in messages {
// Check rate limit
if !rate_limiter.check_and_record() {
tracing::warn!(
channel = %self.name,
"Message emission rate limited"
);
return Err(WasmChannelError::EmitRateLimited {
name: self.name.clone(),
});
// Check rate limit — acquire and release the write lock before send().await
{
let mut rate_limiter = self.rate_limiter.write().await;
if !rate_limiter.check_and_record() {
tracing::warn!(
channel = %self.name,
"Message emission rate limited"
);
return Err(WasmChannelError::EmitRateLimited {
name: self.name.clone(),
});
}
}

// Convert to IncomingMessage
Expand Down Expand Up @@ -2057,7 +2062,7 @@ impl WasmChannel {
self.update_broadcast_metadata(&emitted.metadata_json).await;
}

// Send to stream
// Send to stream — no locks held across this await
tracing::info!(
channel = %self.name,
user_id = %emitted.user_id,
Expand Down Expand Up @@ -2281,28 +2286,33 @@ impl WasmChannel {
"Processing emitted messages from polling callback"
);

let tx_guard = message_tx.read().await;
let Some(tx) = tx_guard.as_ref() else {
tracing::error!(
channel = %channel_name,
count = messages.len(),
"Messages emitted but no sender available - channel may not be started!"
);
return Ok(());
// Clone sender to avoid holding RwLock read guard across send().await in the loop
let tx = {
let tx_guard = message_tx.read().await;
let Some(tx) = tx_guard.as_ref() else {
tracing::error!(
channel = %channel_name,
count = messages.len(),
"Messages emitted but no sender available - channel may not be started!"
);
return Ok(());
};
tx.clone()
};

let mut limiter = rate_limiter.write().await;

for emitted in messages {
// Check rate limit
if !limiter.check_and_record() {
tracing::warn!(
channel = %channel_name,
"Message emission rate limited"
);
return Err(WasmChannelError::EmitRateLimited {
name: channel_name.to_string(),
});
// Check rate limit — acquire and release the write lock before send().await
{
let mut limiter = rate_limiter.write().await;
if !limiter.check_and_record() {
tracing::warn!(
channel = %channel_name,
"Message emission rate limited"
);
return Err(WasmChannelError::EmitRateLimited {
name: channel_name.to_string(),
});
}
}

// Convert to IncomingMessage
Expand Down Expand Up @@ -2350,7 +2360,7 @@ impl WasmChannel {
.await;
}

// Send to stream
// Send to stream — no locks held across this await
tracing::info!(
channel = %channel_name,
user_id = %emitted.user_id,
Expand Down
32 changes: 22 additions & 10 deletions src/channels/web/handlers/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,17 @@ pub async fn chat_send_handler(

let msg_id = msg.id;

let tx_guard = state.msg_tx.read().await;
let tx = tx_guard.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Channel not started".to_string(),
))?;
// Clone sender to avoid holding RwLock read guard across send().await
let tx = {
let tx_guard = state.msg_tx.read().await;
tx_guard
.as_ref()
.ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Channel not started".to_string(),
))?
.clone()
};

tx.send(msg).await.map_err(|_| {
(
Expand Down Expand Up @@ -103,11 +109,17 @@ pub async fn chat_approval_handler(

let msg_id = msg.id;

let tx_guard = state.msg_tx.read().await;
let tx = tx_guard.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Channel not started".to_string(),
))?;
// Clone sender to avoid holding RwLock read guard across send().await
let tx = {
let tx_guard = state.msg_tx.read().await;
tx_guard
.as_ref()
.ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Channel not started".to_string(),
))?
.clone()
};

tx.send(msg).await.map_err(|_| {
(
Expand Down
32 changes: 22 additions & 10 deletions src/channels/web/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,11 +705,17 @@ async fn chat_send_handler(
req.images.len()
);

let tx_guard = state.msg_tx.read().await;
let tx = tx_guard.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Channel not started".to_string(),
))?;
// Clone sender to avoid holding RwLock read guard across send().await
let tx = {
let tx_guard = state.msg_tx.read().await;
tx_guard
.as_ref()
.ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Channel not started".to_string(),
))?
.clone()
};

tracing::debug!("[chat_send_handler] Sending message through channel");
tx.send(msg).await.map_err(|_| {
Expand Down Expand Up @@ -775,11 +781,17 @@ async fn chat_approval_handler(

let msg_id = msg.id;

let tx_guard = state.msg_tx.read().await;
let tx = tx_guard.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Channel not started".to_string(),
))?;
// Clone sender to avoid holding RwLock read guard across send().await
let tx = {
let tx_guard = state.msg_tx.read().await;
tx_guard
.as_ref()
.ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Channel not started".to_string(),
))?
.clone()
};

tx.send(msg).await.map_err(|_| {
(
Expand Down
16 changes: 12 additions & 4 deletions src/channels/web/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,12 @@ async fn handle_client_message(
incoming = incoming.with_attachments(attachments);
}

let tx_guard = state.msg_tx.read().await;
if let Some(ref tx) = *tx_guard {
// Clone sender to avoid holding RwLock read guard across send().await
let tx = {
let tx_guard = state.msg_tx.read().await;
tx_guard.as_ref().cloned()
};
if let Some(tx) = tx {
if tx.send(incoming).await.is_err() {
let _ = direct_tx
.send(WsServerMessage::Error {
Expand Down Expand Up @@ -245,8 +249,12 @@ async fn handle_client_message(
if let Some(ref tid) = thread_id {
msg = msg.with_thread(tid);
}
let tx_guard = state.msg_tx.read().await;
if let Some(ref tx) = *tx_guard {
// Clone sender to avoid holding RwLock read guard across send().await
let tx = {
let tx_guard = state.msg_tx.read().await;
tx_guard.as_ref().cloned()
};
if let Some(tx) = tx {
let _ = tx.send(msg).await;
}
}
Expand Down
Loading