Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 95 additions & 4 deletions src/agent/thread_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::agent::dispatcher::{
};
use crate::agent::session::{PendingApproval, Session, ThreadState};
use crate::agent::submission::SubmissionResult;
use crate::channels::web::util::truncate_preview;
use crate::channels::{IncomingMessage, StatusUpdate};
use crate::context::JobContext;
use crate::error::Error;
Expand Down Expand Up @@ -69,6 +70,8 @@ impl Agent {
.filter_map(|m| match m.role.as_str() {
"user" => Some(ChatMessage::user(&m.content)),
"assistant" => Some(ChatMessage::assistant(&m.content)),
// tool_calls rows are UI metadata (tool name + preview),
// not part of the LLM conversation context.
_ => None,
})
.collect();
Expand Down Expand Up @@ -315,6 +318,11 @@ impl Agent {
};

thread.complete_turn(&response);
let tool_calls = thread
.turns
.last()
.map(|t| t.tool_calls.clone())
.unwrap_or_default();
let _ = self
.channels
.send_status(
Expand All @@ -324,7 +332,9 @@ impl Agent {
)
.await;

// Persist assistant response (user message already persisted at turn start)
// Persist tool calls then assistant response (user message already persisted at turn start)
self.persist_tool_calls(thread_id, &message.user_id, &tool_calls)
.await;
self.persist_assistant_response(thread_id, &message.user_id, &response)
.await;

Expand Down Expand Up @@ -423,6 +433,68 @@ impl Agent {
}
}

/// Persist tool call summaries to the DB as a `role="tool_calls"` message.
///
/// Stored between the user and assistant messages so that
/// `build_turns_from_db_messages` can reconstruct the tool call history.
/// Content is a JSON array of tool call summaries.
pub(super) async fn persist_tool_calls(
&self,
thread_id: Uuid,
user_id: &str,
tool_calls: &[crate::agent::session::TurnToolCall],
) {
if tool_calls.is_empty() {
return;
}

let store = match self.store() {
Some(s) => Arc::clone(s),
None => return,
};
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

persist_tool_calls and persist_assistant_response lack transactional ordering guarantee

The persist_tool_calls and persist_assistant_response are called sequentially but each performs its own ensure_conversation + add_conversation_message pair without a wrapping transaction. If the process crashes between persisting tool_calls and assistant response, the DB will have a tool_calls row without a following assistant row. build_turns_from_db_messages handles this gracefully (the turn is marked 'Failed'), but the tool_calls data would be orphaned relative to its turn.

In the normal case, since these are sequential awaits in a single-user gateway, interleaving is unlikely. But it's worth noting the lack of atomicity.

Suggested fix:

Consider wrapping tool_calls + assistant message persistence in a single DB transaction, or document that partial persistence is acceptable and handled by the reader.

Severity: low · Confidence: medium

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged — the reader (build_turns_from_db_messages) already handles partial persistence gracefully by marking turns without an assistant response as "Failed". Adding a wrapping transaction would require plumbing a transaction API through the Database trait which feels out of scope here. Leaving as-is with the understanding that partial persistence is handled by the reader.


let summaries: Vec<serde_json::Value> = tool_calls
.iter()
.map(|tc| {
let mut obj = serde_json::json!({ "name": tc.name });
if let Some(ref result) = tc.result {
let preview = match result {
serde_json::Value::String(s) => truncate_preview(s, 500),
other => truncate_preview(&other.to_string(), 500),
};
obj["result_preview"] = serde_json::Value::String(preview);
}
if let Some(ref error) = tc.error {
obj["error"] = serde_json::Value::String(truncate_preview(error, 200));
}
obj
})
.collect();

let content = match serde_json::to_string(&summaries) {
Ok(c) => c,
Err(e) => {
tracing::warn!("Failed to serialize tool calls: {}", e);
return;
}
};

if let Err(e) = store
.ensure_conversation(thread_id, "gateway", user_id, None)
.await
{
tracing::warn!("Failed to ensure conversation {}: {}", thread_id, e);
return;
}

if let Err(e) = store
.add_conversation_message(thread_id, "tool_calls", &content)
.await
{
tracing::warn!("Failed to persist tool calls: {}", e);
}
}

pub(super) async fn process_undo(
&self,
session: Arc<Mutex<Session>>,
Expand Down Expand Up @@ -591,15 +663,27 @@ impl Agent {
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;

if thread.state != ThreadState::AwaitingApproval {
return Ok(SubmissionResult::error("No pending approval request."));
// Stale or duplicate approval (tool already executed) — silently ignore.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silencing stale approval errors masks real bugs

Changing SubmissionResult::error("No pending approval request.") to SubmissionResult::ok_with_message("") silences the response when a user submits an approval for a thread not in AwaitingApproval state or when take_pending_approval() returns None. While this improves UX for the common stale/duplicate approval case, it also means that if there's a real bug that causes approval state to be lost (e.g., a race condition in session management), the error signal is completely suppressed. The user's approval action silently succeeds with an empty message, and the tool never executes.

Suggested fix:

Consider logging at `debug!` or `info!` level when a stale approval is silently ignored, so the suppression is at least observable in logs. E.g.: `tracing::debug!("Ignoring stale approval for thread {}: state is {:?}", thread_id, thread.state);`

Severity: medium · Confidence: medium

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 06786c6 — added tracing::debug! with thread_id and current state for both stale approval paths (not in AwaitingApproval state, and no pending approval found).

tracing::debug!(
%thread_id,
state = ?thread.state,
"Ignoring stale approval: thread not in AwaitingApproval state"
);
return Ok(SubmissionResult::ok_with_message(""));
}

thread.take_pending_approval()
};

let pending = match pending {
Some(p) => p,
None => return Ok(SubmissionResult::error("No pending approval request.")),
None => {
tracing::debug!(
%thread_id,
"Ignoring stale approval: no pending approval found"
);
return Ok(SubmissionResult::ok_with_message(""));
}
};

// Verify request ID if provided
Expand Down Expand Up @@ -1040,7 +1124,14 @@ impl Agent {
match result {
Ok(AgenticLoopResult::Response(response)) => {
thread.complete_turn(&response);
// User message already persisted at turn start; save assistant response
let tool_calls = thread
.turns
.last()
.map(|t| t.tool_calls.clone())
.unwrap_or_default();
// User message already persisted at turn start; save tool calls then assistant response
self.persist_tool_calls(thread_id, &message.user_id, &tool_calls)
.await;
self.persist_assistant_response(thread_id, &message.user_id, &response)
.await;
let _ = self
Expand Down
170 changes: 126 additions & 44 deletions src/channels/web/handlers/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use uuid::Uuid;
use crate::channels::IncomingMessage;
use crate::channels::web::server::GatewayState;
use crate::channels::web::types::*;
use crate::channels::web::util::{build_turns_from_db_messages, truncate_preview};

pub async fn chat_send_handler(
State(state): State<Arc<GatewayState>>,
Expand Down Expand Up @@ -317,12 +318,13 @@ pub async fn chat_history_handler(
turns,
has_more,
oldest_timestamp,
pending_approval: None,
}));
}

// Try in-memory first (freshest data for active threads)
if let Some(thread) = sess.threads.get(&thread_id)
&& !thread.turns.is_empty()
&& (!thread.turns.is_empty() || thread.pending_approval.is_some())
{
let turns: Vec<TurnInfo> = thread
.turns
Expand All @@ -341,16 +343,35 @@ pub async fn chat_history_handler(
name: tc.name.clone(),
has_result: tc.result.is_some(),
has_error: tc.error.is_some(),
result_preview: tc.result.as_ref().map(|r| {
let s = match r {
serde_json::Value::String(s) => s.clone(),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raw tool error strings exposed to frontend via persisted history

The error field from TurnToolCall is now serialized into the tool_calls DB content and returned to the frontend in ToolCallInfo.error. These error strings originate from e.to_string() on tool execution failures and could contain internal file paths, stack traces, or system-specific details. While these errors are already visible in real-time via SSE tool_completed events, persisting them in the DB and returning them in history responses increases their exposure surface — they're now available long after the original error occurred.

Suggested fix:

Consider truncating or sanitizing error strings before persisting (e.g., `truncate_preview(&error, 200)`) to limit exposure of potentially sensitive internal details.

Severity: low · Confidence: low

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 06786c6 — error strings are now truncated to 200 bytes via truncate_preview(&error, 200) before persisting to DB.

other => other.to_string(),
};
truncate_preview(&s, 500)
}),
error: tc.error.clone(),
})
.collect(),
})
.collect();

let pending_approval = thread
.pending_approval
.as_ref()
.map(|pa| PendingApprovalInfo {
request_id: pa.request_id.to_string(),
tool_name: pa.tool_name.clone(),
description: pa.description.clone(),
parameters: serde_json::to_string_pretty(&pa.parameters).unwrap_or_default(),
});

return Ok(Json(HistoryResponse {
thread_id,
turns,
has_more: false,
oldest_timestamp: None,
pending_approval,
}));
}

Expand All @@ -369,6 +390,7 @@ pub async fn chat_history_handler(
turns,
has_more,
oldest_timestamp,
pending_approval: None,
}));
}
}
Expand All @@ -379,51 +401,10 @@ pub async fn chat_history_handler(
turns: Vec::new(),
has_more: false,
oldest_timestamp: None,
pending_approval: None,
}))
}

/// Build TurnInfo pairs from flat DB messages (alternating user/assistant).
pub fn build_turns_from_db_messages(
messages: &[crate::history::ConversationMessage],
) -> Vec<TurnInfo> {
let mut turns = Vec::new();
let mut turn_number = 0;
let mut iter = messages.iter().peekable();

while let Some(msg) = iter.next() {
if msg.role == "user" {
let mut turn = TurnInfo {
turn_number,
user_input: msg.content.clone(),
response: None,
state: "Completed".to_string(),
started_at: msg.created_at.to_rfc3339(),
completed_at: None,
tool_calls: Vec::new(),
};

// Check if next message is an assistant response
if let Some(next) = iter.peek()
&& next.role == "assistant"
{
let assistant_msg = iter.next().expect("peeked");
turn.response = Some(assistant_msg.content.clone());
turn.completed_at = Some(assistant_msg.created_at.to_rfc3339());
}

// Incomplete turn (user message without response)
if turn.response.is_none() {
turn.state = "Failed".to_string();
}

turns.push(turn);
turn_number += 1;
}
}

turns
}

pub async fn chat_threads_handler(
State(state): State<Arc<GatewayState>>,
) -> Result<Json<ThreadListResponse>, (StatusCode, String)> {
Expand Down Expand Up @@ -454,7 +435,7 @@ pub async fn chat_threads_handler(
let info = ThreadInfo {
id: s.id,
state: "Idle".to_string(),
turn_count: (s.message_count / 2).max(0) as usize,
turn_count: s.message_count.max(0) as usize,
created_at: s.started_at.to_rfc3339(),
updated_at: s.last_activity.to_rfc3339(),
title: s.title.clone(),
Expand Down Expand Up @@ -630,4 +611,105 @@ mod tests {
assert!(turns[1].response.is_none());
assert_eq!(turns[1].state, "Failed");
}

#[test]
fn test_build_turns_with_tool_calls() {
let now = chrono::Utc::now();
let tool_calls_json = serde_json::json!([
{"name": "shell", "result_preview": "file1.txt\nfile2.txt"},
{"name": "http", "error": "timeout"}
]);
let messages = vec![
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "user".to_string(),
content: "List files".to_string(),
created_at: now,
},
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "tool_calls".to_string(),
content: tool_calls_json.to_string(),
created_at: now + chrono::TimeDelta::milliseconds(500),
},
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "assistant".to_string(),
content: "Here are the files".to_string(),
created_at: now + chrono::TimeDelta::seconds(1),
},
];

let turns = build_turns_from_db_messages(&messages);
assert_eq!(turns.len(), 1);
assert_eq!(turns[0].tool_calls.len(), 2);
assert_eq!(turns[0].tool_calls[0].name, "shell");
assert!(turns[0].tool_calls[0].has_result);
assert!(!turns[0].tool_calls[0].has_error);
assert_eq!(
turns[0].tool_calls[0].result_preview.as_deref(),
Some("file1.txt\nfile2.txt")
);
assert_eq!(turns[0].tool_calls[1].name, "http");
assert!(turns[0].tool_calls[1].has_error);
assert_eq!(turns[0].tool_calls[1].error.as_deref(), Some("timeout"));
assert_eq!(turns[0].response.as_deref(), Some("Here are the files"));
assert_eq!(turns[0].state, "Completed");
}

#[test]
fn test_build_turns_with_malformed_tool_calls() {
let now = chrono::Utc::now();
let messages = vec![
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "user".to_string(),
content: "Hello".to_string(),
created_at: now,
},
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "tool_calls".to_string(),
content: "not valid json".to_string(),
created_at: now + chrono::TimeDelta::milliseconds(500),
},
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "assistant".to_string(),
content: "Done".to_string(),
created_at: now + chrono::TimeDelta::seconds(1),
},
];

let turns = build_turns_from_db_messages(&messages);
assert_eq!(turns.len(), 1);
assert!(turns[0].tool_calls.is_empty());
assert_eq!(turns[0].response.as_deref(), Some("Done"));
}

#[test]
fn test_build_turns_backward_compatible_no_tool_calls() {
// Old threads without tool_calls messages still work
let now = chrono::Utc::now();
let messages = vec![
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "user".to_string(),
content: "Hello".to_string(),
created_at: now,
},
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "assistant".to_string(),
content: "Hi!".to_string(),
created_at: now + chrono::TimeDelta::seconds(1),
},
];

let turns = build_turns_from_db_messages(&messages);
assert_eq!(turns.len(), 1);
assert!(turns[0].tool_calls.is_empty());
assert_eq!(turns[0].response.as_deref(), Some("Hi!"));
assert_eq!(turns[0].state, "Completed");
}
}
1 change: 1 addition & 0 deletions src/channels/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod openai_compat;
pub mod server;
pub mod sse;
pub mod types;
pub(crate) mod util;
pub mod ws;

use std::net::SocketAddr;
Expand Down
Loading
Loading