fix: release lock guards before awaiting channel send (#869) #905
qbit-glitch wants to merge 5 commits into nearai:main
Conversation
Pull request overview
Fixes an async deadlock class caused by holding RwLock guards across .send().await in multiple channel implementations, and (in the same PR) introduces a shared “agentic loop” + shared tool execution pipeline used by chat, background jobs, and the container worker runtime.
Changes:
- Clone the `mpsc::Sender` out of `RwLock<Option<Sender<...>>>` before awaiting `.send()` across the HTTP/web/ws/wasm channels; add a regression test for the HTTP webhook path (a minimal sketch of this pattern follows the list).
- Introduce `src/agent/agentic_loop.rs` (shared loop engine + delegate trait) and migrate chat/job/container execution flows to use it.
- Centralize tool execution and tool-result processing in `src/tools/execute.rs` and route scheduler/chat/job/container through it; update docs/coverage references to moved worker code.
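For orientation, here is a minimal sketch of the fix pattern the PR applies at each call site, assuming a simplified `State` type and a `String` payload in place of the repo's actual channel types (`IncomingMessage`, etc.):

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};

struct State {
    tx: RwLock<Option<mpsc::Sender<String>>>,
}

async fn send_msg(state: Arc<State>, msg: String) -> Result<(), &'static str> {
    // Clone the sender inside a short-lived read-guard scope. The buggy
    // version instead awaited send() while still holding this guard, so a
    // full (capacity-256) channel blocked shutdown()'s write lock forever.
    let tx = {
        let guard = state.tx.read().await;
        match guard.as_ref() {
            Some(tx) => tx.clone(), // cheap: an Arc refcount bump
            None => return Err("channel not started"),
        }
    }; // read guard dropped here, before any await on the channel
    tx.send(msg).await.map_err(|_| "channel closed")
}
```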
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/channels/http.rs | Avoid lock-across-await in webhook processing; adds regression test for shutdown deadlock. |
| src/channels/web/server.rs | Clone sender before send().await in HTTP handlers to prevent deadlock with start/shutdown. |
| src/channels/web/ws.rs | Clone sender before send().await in WS message handlers. |
| src/channels/web/handlers/chat.rs | Applies the same sender-clone pattern in (currently) unused handlers for consistency. |
| src/channels/wasm/wrapper.rs | Clone sender before loop and scope rate-limiter write lock per iteration to avoid lock-across-await. |
| src/worker/mod.rs | Re-exports reorganized worker entry points (container, job). |
| src/worker/container.rs | Replaces deleted runtime.rs; container worker now runs via ContainerDelegate + shared loop. |
| src/worker/job.rs | Background job worker migrated to JobDelegate + shared loop; uses shared tool-result pipeline. |
| src/worker/runtime.rs | Removed (logic moved into src/worker/container.rs). |
| src/tools/mod.rs | Exposes new tools::execute module. |
| src/tools/execute.rs | Adds canonical tool execution pipeline and shared tool-result processing helpers. |
| src/agent/dispatcher.rs | Chat execution migrated onto shared agentic loop via ChatDelegate. |
| src/agent/agentic_loop.rs | New shared loop engine used by chat/job/container. |
| src/agent/thread_ops.rs | Uses shared tool-result processing helper for consistency. |
| src/agent/scheduler.rs | Scheduler tool tasks delegate execution to shared tool pipeline. |
| src/agent/mod.rs | Exposes agentic_loop, adjusts scheduler visibility, re-exports worker types from crate::worker. |
| src/util.rs | Updates completion-detection comment to reflect new worker module layout. |
| src/agent/CLAUDE.md | Documents the new shared agentic loop architecture and moved worker implementation. |
| COVERAGE_PLAN.md | Updates coverage plan references to moved worker files. |
| CLAUDE.md | Updates repository structure docs for new worker modules. |
| .claude/commands/add-sse-event.md | Updates contributor guidance to point at src/worker/job.rs instead of old worker location. |
```rust
// Brief yield to let the spawned task reach the blocked send() inside
// process_message. With multi_thread runtime, the task runs on a
// separate worker thread, so this is just a scheduling yield.
tokio::task::yield_now().await;
```
This regression test can pass even if process_message() does not actually block on the 257th send: started.notify_one() only proves the task started, and a single yield_now() doesn’t guarantee it reached the .send().await (or that it is holding the tx read lock). To make the deadlock detection deterministic, add an assertion that the spawned task is still pending before calling shutdown() (e.g., check the JoinHandle isn’t finished after a short delay/timeout), or add an additional sync point that only fires after process_message() has acquired the sender and attempted the send.
Suggested change:

```diff
 tokio::task::yield_now().await;
+// Give the spawned task a short time to reach the send().await and
+// verify that it is still pending (i.e., blocked) before shutdown().
+tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+assert!(
+    !blocked_send.is_finished(),
+    "process_message task should still be pending before shutdown()"
+);
```
```rust
//! Unified agentic loop engine.
//!
//! Provides a single implementation of the core LLM call → tool execution →
//! result processing → context update → repeat cycle. Three consumers
//! (chat dispatcher, job worker, container runtime) customize behavior
//! via the `LoopDelegate` trait.
```
The PR title/description focus on fixing lock guards around send().await, but this PR also introduces a large architectural refactor (new agentic_loop engine, new shared tool execution pipeline, and worker runtime restructuring). Please update the PR description to cover these behavioral/architectural changes (or split into a separate PR) so reviewers can properly assess risk and migration impacts.
Force-pushed from cafcef1 to 8eb7896
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
```diff
     let tx = {
         let tx_guard = state.tx.read().await;
         match tx_guard.as_ref() {
             Some(tx) => tx.clone(),
             None => {
                 return (
                     StatusCode::SERVICE_UNAVAILABLE,
                     Json(WebhookResponse {
                         message_id: msg_id,
                         status: "error".to_string(),
                         response: Some("Channel not started".to_string()),
                     }),
                 );
             }
         }
-    } else {
+    };
     if tx.send(msg).await.is_err() {
         return (
-            StatusCode::SERVICE_UNAVAILABLE,
+            StatusCode::INTERNAL_SERVER_ERROR,
             Json(WebhookResponse {
                 message_id: msg_id,
                 status: "error".to_string(),
-                response: Some("Channel not started".to_string()),
+                response: Some("Channel closed".to_string()),
             }),
         );
     }
```
process_message() may leak an entry in state.pending_responses on early returns. When wait_for_response is true, the oneshot sender is inserted before this block; if state.tx is None (or if the subsequent tx.send(..).await fails and returns early), the function returns without removing the pending response entry, which can accumulate and eventually trip MAX_PENDING_RESPONSES (effectively blocking future requests). Consider checking/obtaining tx before inserting into pending_responses, or ensuring the map entry is removed on all error/early-return paths.
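A hedged sketch of one way to implement this suggestion: resolve the sender before registering the oneshot, and remove the map entry on the remaining failure path. `AppState`, the map shape, and the `String` payload are illustrative assumptions, not the repo's actual definitions.

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, oneshot, RwLock};

struct AppState {
    tx: RwLock<Option<mpsc::Sender<String>>>,
    pending_responses: RwLock<HashMap<String, oneshot::Sender<String>>>,
}

async fn process_message(
    state: Arc<AppState>,
    id: String,
    msg: String,
) -> Result<oneshot::Receiver<String>, &'static str> {
    // 1. Resolve the channel sender first, so an unavailable channel returns
    //    before anything is inserted into pending_responses.
    let tx = {
        let guard = state.tx.read().await;
        guard.as_ref().cloned().ok_or("channel not started")?
    };
    // 2. Only now register the oneshot for the eventual response.
    let (resp_tx, resp_rx) = oneshot::channel();
    state.pending_responses.write().await.insert(id.clone(), resp_tx);
    // 3. On the remaining failure path, remove the entry before returning,
    //    so failed sends cannot accumulate toward MAX_PENDING_RESPONSES.
    if tx.send(msg).await.is_err() {
        state.pending_responses.write().await.remove(&id);
        return Err("channel closed");
    }
    Ok(resp_rx)
}
```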
```rust
#[tokio::test]
async fn shutdown_completes_while_process_message_blocked() {
    let channel = Arc::new(test_channel(Some("secret")));
```
The PR description says this regression test uses #[tokio::test(flavor = "multi_thread", worker_threads = 2)] for true parallelism, but the code uses plain #[tokio::test]. To make sure this test actually exercises the deadlock scenario (shutdown running concurrently with a blocked send) and matches the documented test plan, set the explicit runtime flavor/worker count here.
```rust
// Wait for the spawned task to start, then give it time to reach
// the send().await and verify that it is still pending (i.e., blocked).
started.notified().await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert!(
    !blocked_send.is_finished(),
    "process_message task should still be pending before shutdown()"
);
```
This test uses a fixed sleep(50ms) plus !JoinHandle::is_finished() to infer that process_message() is blocked on send(). That check can be a false positive if the spawned task hasn't reached the send yet (e.g., under scheduler/CI variance), which weakens the regression signal. Consider replacing the timing-based assertion with a more deterministic synchronization/timeout that confirms the send path is pending before calling shutdown().
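One possible tightening, still time-bounded rather than fully deterministic: poll the `JoinHandle` itself under a timeout instead of sampling `is_finished()` after an unrelated sleep. A sketch (the helper name is hypothetical):

```rust
use std::time::Duration;
use tokio::time::timeout;

/// Asserts the spawned task is still pending by polling its JoinHandle under
/// a timeout: Err(Elapsed) means the task did not finish during the window,
/// i.e. process_message() has not returned and is still parked on the send.
async fn assert_still_pending(handle: &mut tokio::task::JoinHandle<()>) {
    assert!(
        timeout(Duration::from_millis(50), handle).await.is_err(),
        "process_message task should still be pending before shutdown()"
    );
}
```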
Force-pushed from 8be3193 to 7d9f04a
Force-pushed from ebe2886 to ce8686e
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
```rust
// Signal so we know the spawned task has started and is about to
// call process_message (which will block on the full channel).
let started = Arc::new(tokio::sync::Notify::new());
let started_clone = started.clone();

// Spawn a task that calls the actual production code path.
// process_message() internally acquires the RwLock read guard and
// sends on the channel. With the fix, the guard is released before
// send().await; without the fix, shutdown() would deadlock.
let state = channel.state.clone();
let blocked_send = tokio::spawn(async move {
    started_clone.notify_one();
    let msg = IncomingMessage::new("http", "user", "blocked-257th");
    let _ = process_message(state, msg, false).await;
});
```
The started Notify is triggered before calling process_message(), so this test can still pass if shutdown() wins the race and completes before the spawned task actually reaches the blocking tx.send(...).await (which would make the regression undetected). Consider adding stronger synchronization so the test only proceeds once the spawned task is confirmed to be pending on the full-channel send (e.g., refactor the send path into a helper that can signal right before awaiting, or add a check that shutdown()/state.tx.write() is blocked in the pre-fix code path).
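A sketch of the "signal right before awaiting" refactor the comment suggests. The helper and its `reached_send` probe are hypothetical test hooks, not the repo's actual API; production callers would pass `None`:

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Notify};

// Send on the channel, optionally firing a probe only after the sender has
// been acquired and we are about to await -- so a test that awaits the probe
// knows the task reached the send point, not merely that it was spawned.
async fn send_with_probe(
    tx: mpsc::Sender<String>,
    msg: String,
    reached_send: Option<Arc<Notify>>,
) -> Result<(), mpsc::error::SendError<String>> {
    if let Some(n) = &reached_send {
        n.notify_one();
    }
    tx.send(msg).await
}
```

The test would then await `reached_send.notified()` before calling `shutdown()`, closing most of the race the comment describes.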
zmanian
left a comment
Review: fix: release lock guards before awaiting channel send (#869)
Verdict: APPROVE
This is a well-executed fix for a real deadlock that occurs when bounded mpsc channel buffers fill up. The analysis, fix pattern, and test are all solid.
Correctness
The core fix is correct: cloning the mpsc::Sender out of the RwLock<Option<Sender>> before calling .send().await eliminates the lock-across-await deadlock. tokio::sync::mpsc::Sender::clone() is cheap (Arc bump) and the cloned sender remains valid independently of the Option being cleared by shutdown().
All 10 call sites across 5 files apply the same pattern consistently:
- Acquire the read lock
- Clone the `Sender` (or return early if `None`)
- Drop the lock guard (via a scope block)
- Await `.send()` on the clone
The TOCTOU window is correctly identified and harmless -- a cloned Sender will either deliver the message (receiver alive) or return Err (receiver dropped), and all call sites handle the error.
wasm/wrapper.rs rate_limiter change
Moving the rate_limiter.write().await inside the per-iteration scope block is the right call. Holding a write lock across the entire loop with .send().await calls had the same class of bug. Per-iteration acquisition is semantically correct for rate limiting and eliminates the lock-across-await.
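A minimal sketch of that per-iteration scoping, with `RateLimiter` and its `check()` method as illustrative stand-ins for the actual wasm wrapper types:

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};

struct RateLimiter;
impl RateLimiter {
    fn check(&mut self) -> bool {
        true // stand-in: real limiter would enforce a rate here
    }
}

async fn process_emitted_messages(
    rate_limiter: Arc<RwLock<RateLimiter>>,
    tx: mpsc::Sender<String>,
    messages: Vec<String>,
) {
    for msg in messages {
        // Acquire and release the write lock inside a per-iteration scope
        // block, so it is never held across the send().await below.
        let allowed = {
            let mut limiter = rate_limiter.write().await;
            limiter.check()
        }; // write guard dropped here
        if allowed && tx.send(msg).await.is_err() {
            break; // receiver dropped; stop processing
        }
    }
}
```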
Regression test
The shutdown_completes_while_process_message_blocked test is well-designed:
- Exercises the actual `process_message()` production code path
- Fills the 256-slot buffer to trigger the blocking `.send()`
- Uses `tokio::sync::Notify` for deterministic synchronization
- Uses the `multi_thread` flavor with 2 workers for true parallelism
- Timeout-based deadlock detection (2 seconds; see the sketch below)
- Properly cleans up by dropping the stream to unblock the spawned task
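The timeout-based detection amounts to wrapping the shutdown call in `tokio::time::timeout`; a generic sketch (the helper name is hypothetical):

```rust
use std::future::Future;
use std::time::Duration;

// Fail fast if a future does not complete within the window -- a hung await
// here is the deadlock signature this test guards against.
async fn assert_completes_within<F: Future>(fut: F, secs: u64) -> F::Output {
    tokio::time::timeout(Duration::from_secs(secs), fut)
        .await
        .expect("future did not complete in time: probable lock-across-await deadlock")
}
```

In the test this would wrap `channel.shutdown()` with a 2-second window: pre-fix code hangs on the write lock and trips the timeout, while the fixed code completes immediately.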
CI workflow fix
The second commit fixing regression-test-check.yml to fetch the base branch and use pr-head ref is a legitimate CI environment fix. However, this CI check is still failing -- the workflow can't find the #[tokio::test] annotation in the three-dot diff. This appears to be because the test was added in the first commit but the HEAD_REF resolution or the diff range isn't capturing it correctly. This needs a fix before merge.
Minor observations (non-blocking)
- The `handlers/chat.rs` changes are in dead code (acknowledged in the PR description). Fine for consistency.
- No `.unwrap()` in production code paths -- good.
CI status
The "Regression test enforcement" check is failing. The PR clearly includes a regression test, so this is a false negative in the CI workflow. Either fix the workflow's test detection logic for this case, or apply the skip-regression-check label as a workaround. All other checks pass (clippy, tests, builds, E2E).
Clone `mpsc::Sender` out of `RwLock` before `.send().await` to prevent read guards from blocking write lock acquisition (shutdown/start) when the channel buffer is full.

Fixed call sites:
- src/channels/http.rs: process_message()
- src/channels/web/server.rs: chat_send_handler(), chat_approval_handler()
- src/channels/web/handlers/chat.rs: chat_send_handler(), chat_approval_handler()
- src/channels/web/ws.rs: handle_client_message() (2 sites)
- src/channels/wasm/wrapper.rs: process_emitted_messages() (2 impls, also scoped rate_limiter write lock per-iteration)

Includes regression test: shutdown_completes_while_process_message_blocked

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The regression-test-check workflow failed because origin/main wasn't available as a ref in the CI environment. actions/checkout@v4 fetches the PR merge ref history but doesn't make the base branch ref available for three-dot diff comparisons. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed from ce8686e to 1d5a7bd
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated no new comments.
zmanian
left a comment
Review: fix: release lock guards before awaiting channel send (#869)
Verdict: APPROVE
Clean and focused fix. The diff applies the same mechanical pattern consistently across all 10 call sites: acquire lock, clone sender, drop lock, then await send.
Correctness
- `tokio::sync::mpsc::Sender::clone()` is cheap (Arc bump) and the cloned sender remains valid independently of the `Option` being cleared by `shutdown()`
- The TOCTOU window is harmless -- a cloned `Sender` either delivers (receiver alive) or returns `Err` (receiver dropped), and all call sites handle the error
- The WASM `rate_limiter` fix is also correct -- moved inside the loop with explicit scope blocks, which is better for rate-limiting semantics anyway
Regression test
The test in http.rs is thorough: fills all 256 buffer slots, spawns a blocking process_message, verifies shutdown() completes within 2s. The sleep(50ms) is a minor race but acceptable for testing a blocking condition.
CI workflow fix
The pr-head ref fix in regression-test-check.yml is a legitimate fix -- HEAD points to the merge commit, not the PR head. Belongs in this PR since it was discovered while testing.
No issues found.
Closing: superseded by #1003 (merged), which fixes the same lock-across-await issue.
Summary
Fixes #869 -- [CRITICAL] Lock held across async I/O boundary blocks webhook processing.

`RwLock` read guards on `Option<mpsc::Sender<IncomingMessage>>` were held across `.send(msg).await` in 10 call sites across 5 channel files. When the bounded mpsc channel buffer (capacity 256) fills, `.send()` suspends indefinitely while still holding the read guard. This prevents `shutdown()` and `start()` from ever acquiring their write locks -- a deadlock under load.

Root cause: the lock was used to check whether the sender exists (the `Option`), but the guard was unnecessarily kept alive across the `.send().await` call that follows.

Fix: clone the `mpsc::Sender` out of the `RwLock` before awaiting send. `Sender::clone()` is cheap (internally `Arc`-based), and the lock scope is reduced to just the existence check. Additionally, two `rate_limiter` write locks in `wasm/wrapper.rs` had the same class of bug (held across the same `.send().await` loop) -- these were moved inside the loop with explicit scope blocks.

Changed files
- `src/channels/http.rs`: `process_message()`
- `src/channels/web/server.rs`: `chat_send_handler()`, `chat_approval_handler()`
- `src/channels/web/ws.rs`: `handle_client_message()` (2 sites)
- `src/channels/web/handlers/chat.rs`: `chat_send_handler()`, `chat_approval_handler()`
- `src/channels/wasm/wrapper.rs`: `process_emitted_messages` functions; `rate_limiter` write lock scoped per-iteration

Behavioral notes
- `shutdown()` can race and clear the `Option`. The cloned `Sender` remains valid -- `send()` succeeds if the receiver is still alive (held by the agent loop's `ReceiverStream`), or returns `Err` cleanly if the receiver was dropped. All call sites already handle send failure. This is a minor behavioral change from the old code (where `shutdown()` was serialized behind the read guard), documented in comments.
- The `rate_limiter` write lock in `wasm/wrapper.rs` is now acquired and released on each loop iteration instead of once for the entire batch. This is correct for rate-limiting semantics and eliminates the lock-across-await.
- `handlers/chat.rs` is dead code (marked `#[allow(dead_code)]` in `handlers/mod.rs`) -- fixed for consistency so the correct pattern is in place when these handlers are eventually wired up.

Test plan
- `cargo clippy --all --benches --tests --examples --all-features` -- zero warnings
- `cargo test channels` -- 410 passed, 0 failed
- `shutdown_completes_while_process_message_blocked`:
  - exercises the `process_message()` production code path (not a synthetic reproduction)
  - fills the 256-slot buffer, then spawns `process_message()`, which blocks on the 257th send
  - uses `tokio::sync::Notify` for deterministic task synchronization
  - uses `#[tokio::test(flavor = "multi_thread", worker_threads = 2)]` for true parallel execution
  - verifies `shutdown()` completes within 2 seconds (timeout-based deadlock detection)
- `grep` verification: no remaining `tx_guard` held across `.send().await` in `src/channels/`

🤖 Generated with Claude Code