Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 86 additions & 2 deletions src/agent/agent_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1153,8 +1153,92 @@ impl Agent {
// Process based on submission type
let result = match submission {
Submission::UserInput { content } => {
self.process_user_input(message, session, thread_id, &content)
.await
let mut result = self
.process_user_input(message, session.clone(), thread_id, &content)
.await;

// Drain any messages queued during processing.
// Messages are merged (newline-separated) so the LLM receives
// full context from rapid consecutive inputs instead of
// processing each as a separate turn with partial context (#259).
//
// Only `Response` continues the drain — the user got a normal
// reply and there may be more queued messages to process.
//
// Everything else stops the loop:
// - `NeedApproval`: thread is blocked on user approval
// - `Interrupted`: turn was cancelled
// - `Ok`: control-command acknowledgment (including the "queued"
// ack returned when a message arrives during Processing)
// - `Error`: soft error — draining more messages after an error
// would produce confusing interleaved output
// - `Err(_)`: hard error
while let Ok(SubmissionResult::Response { content: outgoing }) = &result {
let merged = {
let mut sess = session.lock().await;
sess.threads
.get_mut(&thread_id)
.and_then(|t| t.drain_pending_messages())
};
let Some(next_content) = merged else {
break;
};

tracing::debug!(
thread_id = %thread_id,
merged_len = next_content.len(),
"Drain loop: processing merged queued messages"
);

// Send the completed turn's response before starting the next.
//
// Known limitations:
// - One-shot channels (HttpChannel) consume the response
// sender on the first respond() call keyed by msg.id.
// Subsequent calls (including the outer handler's final
// respond) are silently dropped. For one-shot channels
// only this intermediate response is delivered.
// - All drain-loop responses are routed via the original
// `message`, so channels that key routing on message
// identity will attribute every response to the first
// message. This is acceptable for the current
// single-user-per-thread model.
if let Err(e) = self
.channels
.respond(message, OutgoingResponse::text(outgoing.clone()))
.await
{
tracing::warn!(
thread_id = %thread_id,
"Failed to send intermediate drain-loop response: {e}"
);
}

// Process merged queued messages as a single turn.
// Use a message clone with cleared attachments so
// augment_with_attachments doesn't re-apply the original
// message's attachments to unrelated queued text.
let mut queued_msg = message.clone();
queued_msg.attachments.clear();
result = self
.process_user_input(&queued_msg, session.clone(), thread_id, &next_content)
.await;
Comment on lines +1221 to +1225
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented as known limitation in db185a8. All drain-loop responses route via the original message, which is acceptable for single-user-per-thread.


// If processing failed, re-queue the drained content so it
// isn't lost. It will be picked up on the next successful turn.
if !matches!(&result, Ok(SubmissionResult::Response { .. })) {
let mut sess = session.lock().await;
if let Some(thread) = sess.threads.get_mut(&thread_id) {
thread.requeue_drained(next_content);
tracing::debug!(
thread_id = %thread_id,
"Re-queued drained content after non-Response result"
);
}
}
}

result
}
Submission::SystemCommand { command, args } => {
tracing::debug!(
Expand Down
218 changes: 216 additions & 2 deletions src/agent/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//! - Compaction: Summarize old turns to save context
//! - Resume: Continue from a saved checkpoint

use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};

use chrono::{DateTime, TimeDelta, Utc};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -222,8 +222,17 @@ pub struct Thread {
/// Pending auth token request (thread is in auth mode).
#[serde(default)]
pub pending_auth: Option<PendingAuth>,
/// Messages queued while the thread was processing a turn.
#[serde(default, skip_serializing_if = "VecDeque::is_empty")]
pub pending_messages: VecDeque<String>,
}

/// Maximum number of messages that can be queued while a thread is processing.
///
/// Ten merged messages can produce a large combined input for the LLM, but
/// that is acceptable for the personal-assistant use case where a single user
/// sends rapid follow-ups; the drain loop processes them as one
/// newline-delimited turn. Messages beyond this cap are rejected by
/// `queue_message` (it returns `false`).
pub const MAX_PENDING_MESSAGES: usize = 10;

impl Thread {
/// Create a new thread.
pub fn new(session_id: Uuid) -> Self {
Expand All @@ -238,6 +247,7 @@ impl Thread {
metadata: serde_json::Value::Null,
pending_approval: None,
pending_auth: None,
pending_messages: VecDeque::new(),
}
}

Expand All @@ -254,6 +264,7 @@ impl Thread {
metadata: serde_json::Value::Null,
pending_approval: None,
pending_auth: None,
pending_messages: VecDeque::new(),
}
}

Expand All @@ -272,6 +283,47 @@ impl Thread {
self.turns.last_mut()
}

/// Queue a message for processing after the current turn completes.
/// Returns `false` if the queue is at capacity ([`MAX_PENDING_MESSAGES`]).
pub fn queue_message(&mut self, content: String) -> bool {
    let has_room = self.pending_messages.len() < MAX_PENDING_MESSAGES;
    if has_room {
        self.pending_messages.push_back(content);
        // Queue growth counts as thread activity for recency ordering.
        self.updated_at = Utc::now();
    }
    has_room
}
Comment on lines +286 to +295
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread::queue_message() mutates pending_messages but does not update updated_at. Since thread lists are sorted by updated_at (e.g. web thread listing), a long-running turn that is receiving queued follow-ups may not appear as recently updated, and operational cleanup that keys off timestamps could be inaccurate. Consider touching updated_at when queue state changes (queue/drain/requeue).

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in d91c76a — queue_message(), drain_pending_messages(), and requeue_drained() all now touch updated_at.


/// Take the next pending message from the queue.
pub fn take_pending_message(&mut self) -> Option<String> {
self.pending_messages.pop_front()
}

/// Drain all pending messages from the queue.
/// Multiple messages are joined with newlines so the LLM receives
/// full context from rapid consecutive inputs (#259).
pub fn drain_pending_messages(&mut self) -> Option<String> {
    if self.pending_messages.is_empty() {
        return None;
    }
    // Consume every queued entry and merge them into a single turn input.
    let merged = self
        .pending_messages
        .drain(..)
        .collect::<Vec<_>>()
        .join("\n");
    self.updated_at = Utc::now();
    Some(merged)
}

/// Restore previously drained content to the head of the queue.
/// Used to preserve user input when the drain loop fails to process
/// merged messages (soft error, hard error, interrupt).
///
/// Deliberately bypasses [`MAX_PENDING_MESSAGES`]: the content was
/// already counted against the cap before it was drained. The overshoot
/// is bounded to one entry (the re-queued merged string) plus whatever
/// new messages arrived during the failed attempt.
pub fn requeue_drained(&mut self, content: String) {
    // Restoring content is a queue mutation — reflect it in recency.
    self.updated_at = Utc::now();
    self.pending_messages.push_front(content);
}

/// Start a new turn with user input.
pub fn start_turn(&mut self, user_input: impl Into<String>) -> &mut Turn {
let turn_number = self.turns.len();
Expand Down Expand Up @@ -335,11 +387,12 @@ impl Thread {
self.pending_auth.take()
}

/// Interrupt the current turn and discard any queued messages.
pub fn interrupt(&mut self) {
    // Cancel whichever turn is in flight, if any.
    if let Some(active) = self.turns.last_mut() {
        active.interrupt();
    }
    // Queued follow-ups are dropped rather than replayed: processing
    // stale input after a cancelled turn would be surprising.
    self.pending_messages.clear();
    self.state = ThreadState::Interrupted;
    self.updated_at = Utc::now();
}
Expand Down Expand Up @@ -1392,4 +1445,165 @@ mod tests {
);
assert!(tool_result_content.ends_with("..."));
}

#[test]
fn test_thread_message_queue() {
    let mut thread = Thread::new(Uuid::new_v4());

    // A fresh thread has nothing queued.
    assert!(thread.pending_messages.is_empty());
    assert!(thread.take_pending_message().is_none());

    // FIFO: messages come back in the order they were queued.
    for msg in ["first", "second", "third"] {
        assert!(thread.queue_message(msg.to_string()));
    }
    assert_eq!(thread.pending_messages.len(), 3);
    for expected in ["first", "second", "third"] {
        assert_eq!(thread.take_pending_message(), Some(expected.to_string()));
    }
    assert!(thread.take_pending_message().is_none());

    // Every slot up to the cap accepts a message.
    for i in 0..MAX_PENDING_MESSAGES {
        assert!(thread.queue_message(format!("msg-{}", i)));
    }
    assert_eq!(thread.pending_messages.len(), MAX_PENDING_MESSAGES);

    // One past the cap is rejected and the queue is left untouched.
    assert!(!thread.queue_message("overflow".to_string()));
    assert_eq!(thread.pending_messages.len(), MAX_PENDING_MESSAGES);

    // Emptying the queue preserves insertion order.
    for i in 0..MAX_PENDING_MESSAGES {
        assert_eq!(thread.take_pending_message(), Some(format!("msg-{}", i)));
    }
    assert!(thread.take_pending_message().is_none());
}

#[test]
fn test_thread_message_queue_serialization() {
    let mut thread = Thread::new(Uuid::new_v4());

    // skip_serializing_if keeps an empty queue out of the JSON entirely.
    let json = serde_json::to_string(&thread).unwrap();
    assert!(!json.contains("pending_messages"));

    // Once populated, the queue round-trips through JSON.
    thread.queue_message("queued msg".to_string());
    let json = serde_json::to_string(&thread).unwrap();
    assert!(json.contains("pending_messages"));
    assert!(json.contains("queued msg"));

    let restored: Thread = serde_json::from_str(&json).unwrap();
    assert_eq!(restored.pending_messages.len(), 1);
    assert_eq!(restored.pending_messages[0], "queued msg");
}

#[test]
fn test_thread_message_queue_default_on_old_data() {
    // JSON persisted before the queue existed carries no pending_messages
    // key; #[serde(default)] must fill in an empty queue on load.
    let json = serde_json::to_string(&Thread::new(Uuid::new_v4())).unwrap();

    // Absent field (via skip_serializing_if) simulates the old format.
    assert!(!json.contains("pending_messages"));
    let restored: Thread = serde_json::from_str(&json).unwrap();
    assert!(restored.pending_messages.is_empty());
}

#[test]
fn test_interrupt_clears_pending_messages() {
    let mut thread = Thread::new(Uuid::new_v4());

    // Begin a turn so the interrupt has an active turn to cancel.
    thread.start_turn("initial input");

    // Simulate follow-ups arriving mid-turn.
    for msg in ["queued-1", "queued-2", "queued-3"] {
        thread.queue_message(msg.to_string());
    }
    assert_eq!(thread.pending_messages.len(), 3);

    // Interrupting discards everything that was waiting.
    thread.interrupt();
    assert!(thread.pending_messages.is_empty());
    assert_eq!(thread.state, ThreadState::Interrupted);
}

#[test]
fn test_thread_state_idle_after_full_drain() {
    let mut thread = Thread::new(Uuid::new_v4());

    // Exercise the full drain cycle (#259): run a turn, queue follow-ups
    // while it is in flight, finish the turn, then process the merged
    // backlog as one additional turn.
    thread.start_turn("turn 1");
    assert!(matches!(thread.state, ThreadState::Processing));

    thread.queue_message("queued-a".to_string());
    thread.queue_message("queued-b".to_string());

    // Finishing the turn (as process_user_input would) returns to Idle.
    thread.complete_turn("response 1");
    assert!(matches!(thread.state, ThreadState::Idle));

    // The backlog drains as one newline-merged input for the next turn.
    let merged = thread.drain_pending_messages().unwrap();
    assert_eq!(merged, "queued-a\nqueued-b");
    thread.start_turn(&merged);
    thread.complete_turn("response for merged");

    // Nothing left queued; the thread settles back to Idle.
    assert!(thread.drain_pending_messages().is_none());
    assert!(thread.pending_messages.is_empty());
    assert_eq!(thread.state, ThreadState::Idle);
}

#[test]
fn test_drain_pending_messages_merges_with_newlines() {
    let mut thread = Thread::new(Uuid::new_v4());

    // Draining an empty queue yields nothing.
    assert!(thread.drain_pending_messages().is_none());

    // A lone message comes back verbatim — no separator appended.
    thread.queue_message("only one".to_string());
    assert_eq!(thread.drain_pending_messages().as_deref(), Some("only one"));
    assert!(thread.pending_messages.is_empty());

    // Several messages merge into one newline-separated string.
    for msg in ["hey", "can you check the server", "it started 10 min ago"] {
        thread.queue_message(msg.to_string());
    }
    assert_eq!(
        thread.drain_pending_messages().as_deref(),
        Some("hey\ncan you check the server\nit started 10 min ago"),
    );
    assert!(thread.pending_messages.is_empty());

    // After the drain the queue stays empty.
    assert!(thread.drain_pending_messages().is_none());
}

#[test]
fn test_requeue_drained_preserves_content_at_front() {
    let mut thread = Thread::new(Uuid::new_v4());

    // Re-queueing into an empty queue simply inserts the batch.
    thread.requeue_drained("failed batch".to_string());
    assert_eq!(thread.pending_messages.len(), 1);
    assert_eq!(thread.pending_messages[0], "failed batch");

    // Fresh arrivals line up behind the restored batch.
    thread.queue_message("new msg".to_string());
    assert_eq!(thread.pending_messages.len(), 2);

    // On the next drain the restored batch leads the merged output.
    assert_eq!(
        thread.drain_pending_messages().as_deref(),
        Some("failed batch\nnew msg"),
    );
}
}
Loading
Loading