diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index a0e8278fc7..54575eccb4 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -1153,8 +1153,92 @@ impl Agent { // Process based on submission type let result = match submission { Submission::UserInput { content } => { - self.process_user_input(message, session, thread_id, &content) - .await + let mut result = self + .process_user_input(message, session.clone(), thread_id, &content) + .await; + + // Drain any messages queued during processing. + // Messages are merged (newline-separated) so the LLM receives + // full context from rapid consecutive inputs instead of + // processing each as a separate turn with partial context (#259). + // + // Only `Response` continues the drain — the user got a normal + // reply and there may be more queued messages to process. + // + // Everything else stops the loop: + // - `NeedApproval`: thread is blocked on user approval + // - `Interrupted`: turn was cancelled + // - `Ok`: control-command acknowledgment (including the "queued" + // ack returned when a message arrives during Processing) + // - `Error`: soft error — draining more messages after an error + // would produce confusing interleaved output + // - `Err(_)`: hard error + while let Ok(SubmissionResult::Response { content: outgoing }) = &result { + let merged = { + let mut sess = session.lock().await; + sess.threads + .get_mut(&thread_id) + .and_then(|t| t.drain_pending_messages()) + }; + let Some(next_content) = merged else { + break; + }; + + tracing::debug!( + thread_id = %thread_id, + merged_len = next_content.len(), + "Drain loop: processing merged queued messages" + ); + + // Send the completed turn's response before starting the next. + // + // Known limitations: + // - One-shot channels (HttpChannel) consume the response + // sender on the first respond() call keyed by msg.id. + // Subsequent calls (including the outer handler's final + // respond) are silently dropped. For one-shot channels + // only this intermediate response is delivered. + // - All drain-loop responses are routed via the original + // `message`, so channels that key routing on message + // identity will attribute every response to the first + // message. This is acceptable for the current + // single-user-per-thread model. + if let Err(e) = self + .channels + .respond(message, OutgoingResponse::text(outgoing.clone())) + .await + { + tracing::warn!( + thread_id = %thread_id, + "Failed to send intermediate drain-loop response: {e}" + ); + } + + // Process merged queued messages as a single turn. + // Use a message clone with cleared attachments so + // augment_with_attachments doesn't re-apply the original + // message's attachments to unrelated queued text. + let mut queued_msg = message.clone(); + queued_msg.attachments.clear(); + result = self + .process_user_input(&queued_msg, session.clone(), thread_id, &next_content) + .await; + + // If processing failed, re-queue the drained content so it + // isn't lost. It will be picked up on the next successful turn. + if !matches!(&result, Ok(SubmissionResult::Response { .. })) { + let mut sess = session.lock().await; + if let Some(thread) = sess.threads.get_mut(&thread_id) { + thread.requeue_drained(next_content); + tracing::debug!( + thread_id = %thread_id, + "Re-queued drained content after non-Response result" + ); + } + } + } + + result } Submission::SystemCommand { command, args } => { tracing::debug!( diff --git a/src/agent/session.rs b/src/agent/session.rs index 3e84afc0b6..745b26be10 100644 --- a/src/agent/session.rs +++ b/src/agent/session.rs @@ -10,7 +10,7 @@ //! - Compaction: Summarize old turns to save context //! - Resume: Continue from a saved checkpoint -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use chrono::{DateTime, TimeDelta, Utc}; use serde::{Deserialize, Serialize}; @@ -222,8 +222,17 @@ pub struct Thread { /// Pending auth token request (thread is in auth mode). #[serde(default)] pub pending_auth: Option, + /// Messages queued while the thread was processing a turn. + #[serde(default, skip_serializing_if = "VecDeque::is_empty")] + pub pending_messages: VecDeque, } +/// Maximum number of messages that can be queued while a thread is processing. +/// 10 merged messages can produce a large combined input for the LLM, but this +/// is acceptable for the personal assistant use case where a single user sends +/// rapid follow-ups. The drain loop processes them as one newline-delimited turn. +pub const MAX_PENDING_MESSAGES: usize = 10; + impl Thread { /// Create a new thread. pub fn new(session_id: Uuid) -> Self { @@ -238,6 +247,7 @@ impl Thread { metadata: serde_json::Value::Null, pending_approval: None, pending_auth: None, + pending_messages: VecDeque::new(), } } @@ -254,6 +264,7 @@ impl Thread { metadata: serde_json::Value::Null, pending_approval: None, pending_auth: None, + pending_messages: VecDeque::new(), } } @@ -272,6 +283,47 @@ impl Thread { self.turns.last_mut() } + /// Queue a message for processing after the current turn completes. + /// Returns `false` if the queue is at capacity ([`MAX_PENDING_MESSAGES`]). + pub fn queue_message(&mut self, content: String) -> bool { + if self.pending_messages.len() >= MAX_PENDING_MESSAGES { + return false; + } + self.pending_messages.push_back(content); + self.updated_at = Utc::now(); + true + } + + /// Take the next pending message from the queue. + pub fn take_pending_message(&mut self) -> Option { + self.pending_messages.pop_front() + } + + /// Drain all pending messages from the queue. + /// Multiple messages are joined with newlines so the LLM receives + /// full context from rapid consecutive inputs (#259). + pub fn drain_pending_messages(&mut self) -> Option { + if self.pending_messages.is_empty() { + return None; + } + let parts: Vec = self.pending_messages.drain(..).collect(); + self.updated_at = Utc::now(); + Some(parts.join("\n")) + } + + /// Re-queue previously drained content at the front of the queue. + /// Used to preserve user input when the drain loop fails to process + /// merged messages (soft error, hard error, interrupt). + /// + /// This intentionally bypasses [`MAX_PENDING_MESSAGES`] — the content + /// was already counted against the cap before draining. The overshoot + /// is bounded to 1 entry (the re-queued merged string) plus any new + /// messages that arrived during the failed attempt. + pub fn requeue_drained(&mut self, content: String) { + self.pending_messages.push_front(content); + self.updated_at = Utc::now(); + } + /// Start a new turn with user input. pub fn start_turn(&mut self, user_input: impl Into) -> &mut Turn { let turn_number = self.turns.len(); @@ -335,11 +387,12 @@ impl Thread { self.pending_auth.take() } - /// Interrupt the current turn. + /// Interrupt the current turn and discard any queued messages. pub fn interrupt(&mut self) { if let Some(turn) = self.turns.last_mut() { turn.interrupt(); } + self.pending_messages.clear(); self.state = ThreadState::Interrupted; self.updated_at = Utc::now(); } @@ -1392,4 +1445,165 @@ mod tests { ); assert!(tool_result_content.ends_with("...")); } + + #[test] + fn test_thread_message_queue() { + let mut thread = Thread::new(Uuid::new_v4()); + + // Queue is initially empty + assert!(thread.pending_messages.is_empty()); + assert!(thread.take_pending_message().is_none()); + + // Queue messages and verify FIFO ordering + assert!(thread.queue_message("first".to_string())); + assert!(thread.queue_message("second".to_string())); + assert!(thread.queue_message("third".to_string())); + assert_eq!(thread.pending_messages.len(), 3); + + assert_eq!(thread.take_pending_message(), Some("first".to_string())); + assert_eq!(thread.take_pending_message(), Some("second".to_string())); + assert_eq!(thread.take_pending_message(), Some("third".to_string())); + assert!(thread.take_pending_message().is_none()); + + // Fill to capacity — all 10 should succeed + for i in 0..MAX_PENDING_MESSAGES { + assert!(thread.queue_message(format!("msg-{}", i))); + } + assert_eq!(thread.pending_messages.len(), MAX_PENDING_MESSAGES); + + // 11th message rejected by queue_message itself + assert!(!thread.queue_message("overflow".to_string())); + assert_eq!(thread.pending_messages.len(), MAX_PENDING_MESSAGES); + + // Drain and verify order + for i in 0..MAX_PENDING_MESSAGES { + assert_eq!(thread.take_pending_message(), Some(format!("msg-{}", i))); + } + assert!(thread.take_pending_message().is_none()); + } + + #[test] + fn test_thread_message_queue_serialization() { + let mut thread = Thread::new(Uuid::new_v4()); + + // Empty queue should not appear in serialization (skip_serializing_if) + let json = serde_json::to_string(&thread).unwrap(); + assert!(!json.contains("pending_messages")); + + // Non-empty queue should serialize and deserialize + thread.queue_message("queued msg".to_string()); + let json = serde_json::to_string(&thread).unwrap(); + assert!(json.contains("pending_messages")); + assert!(json.contains("queued msg")); + + let restored: Thread = serde_json::from_str(&json).unwrap(); + assert_eq!(restored.pending_messages.len(), 1); + assert_eq!(restored.pending_messages[0], "queued msg"); + } + + #[test] + fn test_thread_message_queue_default_on_old_data() { + // Deserialization of old data without pending_messages should default to empty + let thread = Thread::new(Uuid::new_v4()); + let json = serde_json::to_string(&thread).unwrap(); + + // The field is absent (skip_serializing_if), simulating old data + assert!(!json.contains("pending_messages")); + let restored: Thread = serde_json::from_str(&json).unwrap(); + assert!(restored.pending_messages.is_empty()); + } + + #[test] + fn test_interrupt_clears_pending_messages() { + let mut thread = Thread::new(Uuid::new_v4()); + + // Start a turn so there's something to interrupt + thread.start_turn("initial input"); + + // Queue several messages while "processing" + thread.queue_message("queued-1".to_string()); + thread.queue_message("queued-2".to_string()); + thread.queue_message("queued-3".to_string()); + assert_eq!(thread.pending_messages.len(), 3); + + // Interrupt should clear the queue + thread.interrupt(); + assert!(thread.pending_messages.is_empty()); + assert_eq!(thread.state, ThreadState::Interrupted); + } + + #[test] + fn test_thread_state_idle_after_full_drain() { + let mut thread = Thread::new(Uuid::new_v4()); + + // Simulate a full drain cycle: start turn, queue messages, complete turn, + // then drain all queued messages as a single merged turn (#259). + thread.start_turn("turn 1"); + assert_eq!(thread.state, ThreadState::Processing); + + thread.queue_message("queued-a".to_string()); + thread.queue_message("queued-b".to_string()); + + // Complete the turn (simulates process_user_input finishing) + thread.complete_turn("response 1"); + assert_eq!(thread.state, ThreadState::Idle); + + // Drain: merge all queued messages and process as a single turn + let merged = thread.drain_pending_messages().unwrap(); + assert_eq!(merged, "queued-a\nqueued-b"); + thread.start_turn(&merged); + thread.complete_turn("response for merged"); + + // Queue is fully drained, thread is idle + assert!(thread.drain_pending_messages().is_none()); + assert!(thread.pending_messages.is_empty()); + assert_eq!(thread.state, ThreadState::Idle); + } + + #[test] + fn test_drain_pending_messages_merges_with_newlines() { + let mut thread = Thread::new(Uuid::new_v4()); + + // Empty queue returns None + assert!(thread.drain_pending_messages().is_none()); + + // Single message returned as-is (no trailing newline) + thread.queue_message("only one".to_string()); + assert_eq!( + thread.drain_pending_messages(), + Some("only one".to_string()), + ); + assert!(thread.pending_messages.is_empty()); + + // Multiple messages joined with newlines + thread.queue_message("hey".to_string()); + thread.queue_message("can you check the server".to_string()); + thread.queue_message("it started 10 min ago".to_string()); + assert_eq!( + thread.drain_pending_messages(), + Some("hey\ncan you check the server\nit started 10 min ago".to_string()), + ); + assert!(thread.pending_messages.is_empty()); + + // Queue is empty after drain + assert!(thread.drain_pending_messages().is_none()); + } + + #[test] + fn test_requeue_drained_preserves_content_at_front() { + let mut thread = Thread::new(Uuid::new_v4()); + + // Re-queue into empty queue + thread.requeue_drained("failed batch".to_string()); + assert_eq!(thread.pending_messages.len(), 1); + assert_eq!(thread.pending_messages[0], "failed batch"); + + // New messages go behind the re-queued content + thread.queue_message("new msg".to_string()); + assert_eq!(thread.pending_messages.len(), 2); + + // Drain should return re-queued content first (front of queue) + let merged = thread.drain_pending_messages().unwrap(); + assert_eq!(merged, "failed batch\nnew msg"); + } } diff --git a/src/agent/thread_ops.rs b/src/agent/thread_ops.rs index 0fb968f160..5b81dfa9f3 100644 --- a/src/agent/thread_ops.rs +++ b/src/agent/thread_ops.rs @@ -14,7 +14,7 @@ use crate::agent::compaction::ContextCompactor; use crate::agent::dispatcher::{ AgenticLoopResult, check_auth_required, execute_chat_tool_standalone, parse_auth_result, }; -use crate::agent::session::{PendingApproval, Session, ThreadState}; +use crate::agent::session::{MAX_PENDING_MESSAGES, PendingApproval, Session, ThreadState}; use crate::agent::submission::SubmissionResult; use crate::channels::web::util::truncate_preview; use crate::channels::{IncomingMessage, StatusUpdate}; @@ -211,14 +211,72 @@ impl Agent { // Check thread state match thread_state { ThreadState::Processing => { - tracing::warn!( - message_id = %message.id, - thread_id = %thread_id, - "Thread is processing, rejecting new input" - ); - return Ok(SubmissionResult::error( - "Turn in progress. Use /interrupt to cancel.", - )); + let mut sess = session.lock().await; + if let Some(thread) = sess.threads.get_mut(&thread_id) { + // Re-check state under lock — the turn may have completed + // between the snapshot read and this mutable lock acquisition. + if thread.state == ThreadState::Processing { + // Reject messages with attachments — the queue stores + // text only, so attachments would be silently dropped. + if !message.attachments.is_empty() { + return Ok(SubmissionResult::error( + "Cannot queue messages with attachments while a turn is processing. \ + Please resend after the current turn completes.", + )); + } + + // Run the same safety checks that the normal path applies + // (validation, policy, secret scan) so that blocked content + // is never stored in pending_messages or serialized. + let validation = self.safety().validate_input(content); + if !validation.is_valid { + let details = validation + .errors + .iter() + .map(|e| format!("{}: {}", e.field, e.message)) + .collect::>() + .join("; "); + return Ok(SubmissionResult::error(format!( + "Input rejected by safety validation: {details}", + ))); + } + let violations = self.safety().check_policy(content); + if violations + .iter() + .any(|rule| rule.action == crate::safety::PolicyAction::Block) + { + return Ok(SubmissionResult::error("Input rejected by safety policy.")); + } + if let Some(warning) = self.safety().scan_inbound_for_secrets(content) { + tracing::warn!( + user = %message.user_id, + channel = %message.channel, + "Queued message blocked: contains leaked secret" + ); + return Ok(SubmissionResult::error(warning)); + } + + if !thread.queue_message(content.to_string()) { + return Ok(SubmissionResult::error(format!( + "Message queue full ({MAX_PENDING_MESSAGES}). Wait for the current turn to complete.", + ))); + } + // Return `Ok` (not `Response`) so the drain loop in + // agent_loop.rs breaks — `Ok` signals a control + // acknowledgment, not a completed LLM turn. + return Ok(SubmissionResult::Ok { + message: Some( + "Message queued — will be processed after the current turn.".into(), + ), + }); + } + // State changed (turn completed) — fall through to process normally. + // NOTE: `sess` (the Mutex guard) is dropped at the end of + // this `Processing` match arm, releasing the session lock + // before the rest of process_user_input runs. No deadlock. + } else { + return Ok(SubmissionResult::error("Thread no longer exists.")); + } } ThreadState::AwaitingApproval => { tracing::warn!( @@ -849,6 +907,7 @@ impl Agent { .get_mut(&thread_id) .ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?; thread.turns.clear(); + thread.pending_messages.clear(); thread.state = ThreadState::Idle; // Clear undo history too @@ -2012,6 +2071,112 @@ mod tests { } } + #[test] + fn test_queue_cap_rejects_at_capacity() { + use crate::agent::session::{MAX_PENDING_MESSAGES, Thread, ThreadState}; + use uuid::Uuid; + + let mut thread = Thread::new(Uuid::new_v4()); + thread.start_turn("processing something"); + assert_eq!(thread.state, ThreadState::Processing); + + // Fill the queue to the cap + for i in 0..MAX_PENDING_MESSAGES { + assert!(thread.queue_message(format!("msg-{}", i))); + } + assert_eq!(thread.pending_messages.len(), MAX_PENDING_MESSAGES); + + // The next message should be rejected by queue_message + assert!(!thread.queue_message("overflow".to_string())); + assert_eq!(thread.pending_messages.len(), MAX_PENDING_MESSAGES); + + // Verify all drain in FIFO order + for i in 0..MAX_PENDING_MESSAGES { + assert_eq!(thread.take_pending_message(), Some(format!("msg-{}", i))); + } + assert!(thread.take_pending_message().is_none()); + } + + #[test] + fn test_clear_clears_pending_messages() { + use crate::agent::session::{Thread, ThreadState}; + use uuid::Uuid; + + let mut thread = Thread::new(Uuid::new_v4()); + thread.start_turn("processing"); + + thread.queue_message("pending-1".to_string()); + thread.queue_message("pending-2".to_string()); + assert_eq!(thread.pending_messages.len(), 2); + + // Simulate what process_clear does: clear turns and pending_messages + thread.turns.clear(); + thread.pending_messages.clear(); + thread.state = ThreadState::Idle; + + assert!(thread.pending_messages.is_empty()); + assert!(thread.turns.is_empty()); + assert_eq!(thread.state, ThreadState::Idle); + } + + #[test] + fn test_processing_arm_thread_gone_returns_error() { + // Regression: if the thread disappears between the state snapshot and the + // mutable lock, the Processing arm must return an error — not a false + // "queued" acknowledgment. + // + // Exercises the exact branch at the `else` of + // `if let Some(thread) = sess.threads.get_mut(&thread_id)`. + use crate::agent::session::{Session, Thread, ThreadState}; + use uuid::Uuid; + + let thread_id = Uuid::new_v4(); + let session_id = Uuid::new_v4(); + let mut thread = Thread::with_id(thread_id, session_id); + thread.start_turn("working"); + assert_eq!(thread.state, ThreadState::Processing); + + let mut session = Session::new("test-user"); + session.threads.insert(thread_id, thread); + + // Simulate the thread disappearing (e.g., /clear racing with queue) + session.threads.remove(&thread_id); + + // The Processing arm re-locks and calls get_mut — must get None. + assert!(session.threads.get_mut(&thread_id).is_none()); + // Nothing was queued anywhere — the removed thread's queue is gone. + } + + #[test] + fn test_processing_arm_state_changed_does_not_queue() { + // Regression: if the thread transitions from Processing to Idle between + // the state snapshot and the mutable lock, the message must NOT be queued. + // Instead the Processing arm falls through to normal processing. + // + // Exercises the `if thread.state == ThreadState::Processing` re-check. + use crate::agent::session::{Session, Thread, ThreadState}; + use uuid::Uuid; + + let thread_id = Uuid::new_v4(); + let session_id = Uuid::new_v4(); + let mut thread = Thread::with_id(thread_id, session_id); + thread.start_turn("working"); + assert_eq!(thread.state, ThreadState::Processing); + + // Simulate the turn completing between snapshot and re-lock + thread.complete_turn("done"); + assert_eq!(thread.state, ThreadState::Idle); + + let mut session = Session::new("test-user"); + session.threads.insert(thread_id, thread); + + // Re-check under lock: state is Idle, so queue_message must NOT be called. + let t = session.threads.get_mut(&thread_id).unwrap(); + assert_ne!(t.state, ThreadState::Processing); + // Verify nothing was queued — the fall-through path doesn't touch the queue. + assert!(t.pending_messages.is_empty()); + } + // Helper function to extract the approval message without needing a full Agent instance fn extract_approval_message( session: &crate::agent::session::Session, diff --git a/tests/e2e_advanced_traces.rs b/tests/e2e_advanced_traces.rs index 9ae9c09b86..2b9fac2990 100644 --- a/tests/e2e_advanced_traces.rs +++ b/tests/e2e_advanced_traces.rs @@ -707,7 +707,115 @@ mod advanced { } // ----------------------------------------------------------------------- - // 9. Bootstrap greeting fires on fresh workspace + // 9. Message queue during tool execution + // + // Verifies that messages queued on a thread's pending_messages are + // auto-processed by the drain loop after the current turn completes. + // ----------------------------------------------------------------------- + + #[tokio::test] + async fn message_queue_drains_after_tool_turn() { + let trace = + LlmTrace::from_file(format!("{FIXTURES}/message_queue_during_tools.json")).unwrap(); + let rig = TestRigBuilder::new() + .with_trace(trace.clone()) + .build() + .await; + + // Turn 1: Send initial message to establish the session and thread. + rig.send_message("Echo hello for me").await; + let r1 = rig.wait_for_responses(1, TIMEOUT).await; + assert!(!r1.is_empty(), "Turn 1: no response"); + assert!( + r1[0].content.to_lowercase().contains("hello"), + "Turn 1: missing 'hello' in: {}", + r1[0].content, + ); + + // Verify the echo tool was used in turn 1. + let started = rig.tool_calls_started(); + assert!( + started.iter().any(|s| s == "echo"), + "Turn 1: echo tool not called: {started:?}", + ); + + // Pre-populate the thread's pending_messages queue. + // This simulates what happens when a concurrent request (e.g. gateway + // POST) arrives while the thread is in Processing state. + { + let session = rig + .session_manager() + .get_or_create_session("test-user") + .await; + let mut sess = session.lock().await; + // Find the active thread and queue a message. + let thread = sess + .active_thread + .and_then(|tid| sess.threads.get_mut(&tid)) + .expect("active thread should exist after turn 1"); + thread.queue_message("What is 2+2?".to_string()); + assert_eq!(thread.pending_messages.len(), 1); + } + + // Turn 2: Send a message that triggers tool calls. + // After this turn completes, the drain loop should find "What is 2+2?" + // in pending_messages and process it automatically. + rig.send_message("Now echo world and check the time").await; + + // Wait for 3 total responses: + // r1 = turn 1 response ("hello") + // r2 = turn 2 response ("echo world + time") — sent inline by drain loop + // r3 = queued message response ("2+2 = 4") — processed by drain loop + let all = rig.wait_for_responses(3, TIMEOUT).await; + assert!( + all.len() >= 3, + "Expected 3 responses (turn1 + turn2 + queued), got {}:\n{:?}", + all.len(), + all.iter().map(|r| &r.content).collect::>(), + ); + + // The third response should be from the queued message ("What is 2+2?") + let queued_response = &all[2].content; + assert!( + queued_response.contains("4"), + "Queued message response should contain '4', got: {queued_response}", + ); + + // Verify the pending queue was fully drained. + { + let session = rig + .session_manager() + .get_or_create_session("test-user") + .await; + let sess = session.lock().await; + let thread = sess + .active_thread + .and_then(|tid| sess.threads.get(&tid)) + .expect("active thread should still exist"); + assert!( + thread.pending_messages.is_empty(), + "Pending queue should be empty after drain, got: {:?}", + thread.pending_messages, + ); + } + + // Verify tool usage across all turns. + let all_started = rig.tool_calls_started(); + let echo_count = all_started.iter().filter(|s| *s == "echo").count(); + assert_eq!( + echo_count, 2, + "Expected 2 echo calls (turn 1 + turn 2), got {echo_count}", + ); + assert!( + all_started.iter().any(|s| s == "time"), + "time tool should have been called in turn 2: {all_started:?}", + ); + + rig.shutdown(); + } + + // ----------------------------------------------------------------------- + // 10. Bootstrap greeting fires on fresh workspace // ----------------------------------------------------------------------- /// Verifies that a fresh workspace triggers a static bootstrap greeting @@ -740,7 +848,7 @@ mod advanced { } // ----------------------------------------------------------------------- - // 10. Bootstrap onboarding completes and clears BOOTSTRAP.md + // 11. Bootstrap onboarding completes and clears BOOTSTRAP.md // ----------------------------------------------------------------------- /// Exercises the full onboarding flow: bootstrap greeting fires, user diff --git a/tests/fixtures/llm_traces/advanced/message_queue_during_tools.json b/tests/fixtures/llm_traces/advanced/message_queue_during_tools.json new file mode 100644 index 0000000000..915825ad8e --- /dev/null +++ b/tests/fixtures/llm_traces/advanced/message_queue_during_tools.json @@ -0,0 +1,104 @@ +{ + "model_name": "advanced-message-queue-during-tools", + "turns": [ + { + "user_input": "Echo hello for me", + "steps": [ + { + "request_hint": { "last_user_message_contains": "Echo hello" }, + "response": { + "type": "tool_calls", + "tool_calls": [ + { + "id": "call_echo_setup", + "name": "echo", + "arguments": { "message": "hello" } + } + ], + "input_tokens": 80, + "output_tokens": 20 + } + }, + { + "response": { + "type": "text", + "content": "I echoed hello for you. The tool returned: hello", + "input_tokens": 120, + "output_tokens": 25 + } + } + ], + "expects": { + "tools_used": ["echo"], + "all_tools_succeeded": true, + "response_contains": ["hello"] + } + }, + { + "user_input": "Now echo world and check the time", + "steps": [ + { + "request_hint": { "last_user_message_contains": "echo world" }, + "response": { + "type": "tool_calls", + "tool_calls": [ + { + "id": "call_echo_main", + "name": "echo", + "arguments": { "message": "world" } + } + ], + "input_tokens": 160, + "output_tokens": 20 + } + }, + { + "response": { + "type": "tool_calls", + "tool_calls": [ + { + "id": "call_time_main", + "name": "time", + "arguments": {} + } + ], + "input_tokens": 200, + "output_tokens": 15 + } + }, + { + "response": { + "type": "text", + "content": "Done! I echoed world and checked the time for you.", + "input_tokens": 250, + "output_tokens": 20 + } + } + ], + "expects": { + "tools_used": ["echo", "time"], + "all_tools_succeeded": true + } + }, + { + "user_input": "What is 2+2?", + "steps": [ + { + "response": { + "type": "text", + "content": "2+2 equals 4.", + "input_tokens": 80, + "output_tokens": 10 + } + } + ], + "expects": { + "response_contains": ["4"] + } + } + ], + "expects": { + "tools_used": ["echo", "time"], + "min_responses": 3 + } +} diff --git a/tests/support/test_rig.rs b/tests/support/test_rig.rs index 55cba5d067..b3bd06be65 100644 --- a/tests/support/test_rig.rs +++ b/tests/support/test_rig.rs @@ -53,6 +53,9 @@ pub struct TestRig { /// Extension manager for direct extension operations in tests. #[cfg(feature = "libsql")] extension_manager: Option>, + /// Session manager for direct session/thread access in tests. + #[cfg(feature = "libsql")] + session_manager: Arc, /// Temp directory guard -- keeps the libSQL database file alive. #[cfg(feature = "libsql")] _temp_dir: tempfile::TempDir, @@ -84,6 +87,12 @@ impl TestRig { self.extension_manager.as_ref() } + /// Return the session manager for direct session/thread access in tests. + #[cfg(feature = "libsql")] + pub fn session_manager(&self) -> &Arc { + &self.session_manager + } + /// Wait until at least `n` responses have been captured, or `timeout` elapses. pub async fn wait_for_responses(&self, n: usize, timeout: Duration) -> Vec { self.channel.wait_for_responses(n, timeout).await @@ -626,6 +635,7 @@ impl TestRigBuilder { let db_ref = components.db.clone().expect("test rig requires a database"); let workspace_ref = components.workspace.clone(); let ext_mgr_ref = components.extension_manager.clone(); + let session_manager_ref = Arc::new(ironclaw::agent::SessionManager::new()); // 7. Construct AgentDeps from AppComponents (mirrors main.rs). let deps = AgentDeps { @@ -703,7 +713,7 @@ impl TestRigBuilder { None, // hygiene_config routine_config, Some(Arc::clone(&components.context_manager)), - None, // session_manager + Some(Arc::clone(&session_manager_ref)), ); // Match main.rs: fill the scheduler slot once Agent::new has created it. @@ -731,6 +741,7 @@ impl TestRigBuilder { workspace: workspace_ref, trace_llm: trace_llm_ref, extension_manager: ext_mgr_ref, + session_manager: session_manager_ref, _temp_dir: temp_dir, } }