Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 56 additions & 18 deletions crates/zeroclaw-providers/src/compatible.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,32 +313,41 @@ impl OpenAiCompatibleProvider {
self
}

/// Collect all `system` role messages, concatenate their content,
/// and prepend to the first `user` message. Drop all system messages.
/// Used for providers (e.g. MiniMax) that reject `role: system`.
/// Collect all `system` role messages and keep them in a provider-safe
/// shape. Strict OpenAI-compatible endpoints accept a leading system
/// message but reject system messages later in the history.
fn flatten_system_messages(messages: &[ChatMessage], merge: bool) -> Vec<ChatMessage> {
if !merge {
return messages.to_vec();
let mut saw_system = false;
let mut system_content = String::new();
let mut result: Vec<ChatMessage> = Vec::with_capacity(messages.len());

for message in messages {
if message.role == "system" {
saw_system = true;
if !message.content.is_empty() {
if !system_content.is_empty() {
system_content.push_str("\n\n");
}
system_content.push_str(&message.content);
}
} else {
result.push(message.clone());
}
}
let system_content: String = messages
.iter()
.filter(|m| m.role == "system")
.map(|m| m.content.as_str())
.collect::<Vec<_>>()
.join("\n\n");

if system_content.is_empty() {
if !saw_system {
return messages.to_vec();
}

let mut result: Vec<ChatMessage> = messages
.iter()
.filter(|m| m.role != "system")
.cloned()
.collect();
if !merge {
result.insert(0, ChatMessage::system(system_content));
return result;
}

if let Some(first_user) = result.iter_mut().find(|m| m.role == "user") {
first_user.content = format!("{system_content}\n\n{}", first_user.content);
if !system_content.is_empty() {
first_user.content = format!("{system_content}\n\n{}", first_user.content);
}
} else {
// No user message found: insert a synthetic user message with system content
result.insert(0, ChatMessage::user(&system_content));
Expand Down Expand Up @@ -3853,6 +3862,35 @@ mod tests {
assert!(!flattened.iter().any(|m| m.role == "system"));
}

/// With `merge == false`, system messages scattered through the input must
/// come back as exactly one leading system message that carries the text of
/// all of them, while the other turns keep their relative order.
#[test]
fn flatten_system_messages_keeps_system_only_at_start_without_user_merge() {
    let input = vec![
        ChatMessage::system("System A"),
        ChatMessage::user("User turn"),
        ChatMessage::assistant("Assistant turn"),
        ChatMessage::system("System B"),
        ChatMessage::user("Follow-up"),
    ];

    let output = OpenAiCompatibleProvider::flatten_system_messages(&input, false);

    // Role sequence: one system message up front, then the original turns.
    let roles: Vec<&str> = output.iter().map(|m| m.role.as_str()).collect();
    assert_eq!(roles, vec!["system", "user", "assistant", "user"]);

    let system_total = roles.iter().filter(|role| **role == "system").count();
    assert_eq!(system_total, 1);

    // Both original system prompts must survive inside the merged message.
    let merged = &output[0].content;
    assert!(merged.contains("System A"));
    assert!(merged.contains("System B"));
}

#[test]
fn flatten_system_messages_inserts_synthetic_user_when_no_user_exists() {
let messages = vec![
Expand Down
40 changes: 40 additions & 0 deletions crates/zeroclaw-runtime/src/agent/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,44 @@ pub fn estimate_history_tokens(history: &[ChatMessage]) -> usize {
.sum()
}

/// Collapse every `system` message in `history` into one leading message.
///
/// The non-empty contents of all messages whose role is `"system"` are joined
/// with a blank line (`"\n\n"`) in their original order and stored in a single
/// system message at index 0. All other messages follow in their original
/// relative order.
///
/// A history without any system message stays without one. If system
/// messages exist but all of them are empty, a single system message with
/// empty content is still emitted.
pub fn normalize_system_messages(history: &mut Vec<ChatMessage>) {
    // Empty the vector in place and split its messages by role.
    let (system_messages, others): (Vec<ChatMessage>, Vec<ChatMessage>) = history
        .drain(..)
        .partition(|message| message.role == "system");

    if !system_messages.is_empty() {
        // Empty contents are skipped so they never produce stray separators.
        let merged = system_messages
            .iter()
            .map(|message| message.content.as_str())
            .filter(|content| !content.is_empty())
            .collect::<Vec<_>>()
            .join("\n\n");
        history.push(ChatMessage::system(merged));
    }

    history.extend(others);
}

/// Add `content` to the system prompt without leaving a second `system`
/// message in `history`.
///
/// The text is pushed as a trailing system message and the history is then
/// passed through [`normalize_system_messages`], which folds every system
/// message into one leading message, joining non-empty contents with a blank
/// line in their original order. As a result:
///
/// * `content` always ends up *after* all system text already present, even
///   when the history still holds more than one system message on entry;
/// * an empty `content` adds no separator and leaves existing text untouched;
/// * a history with no system message gains one at index 0.
pub fn append_or_merge_system_message(history: &mut Vec<ChatMessage>, content: impl Into<String>) {
    let content: String = content.into();
    // Push last rather than editing the first system message: normalization
    // concatenates in order, so this keeps the merged prompt chronological and
    // keeps the separator rules in a single place.
    history.push(ChatMessage::system(content));
    normalize_system_messages(history);
}

/// Trim conversation history to prevent unbounded growth.
/// Preserves the system prompt (first message if role=system) and the most recent messages.
pub fn trim_history(history: &mut Vec<ChatMessage>, max_history: usize) {
Expand All @@ -162,6 +200,7 @@ pub fn trim_history(history: &mut Vec<ChatMessage>, max_history: usize) {
let to_remove = non_system_count - max_history;
history.drain(start..start + to_remove);
remove_orphaned_tool_messages(history);
normalize_system_messages(history);
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -194,6 +233,7 @@ pub fn load_interactive_session_history(
} else if state.history.first().map(|msg| msg.role.as_str()) != Some("system") {
state.history.insert(0, ChatMessage::system(system_prompt));
}
normalize_system_messages(&mut state.history);

// Self-heal persisted sessions that were written with orphaned
// tool_result messages (e.g. a crash mid-compaction, or a trim that
Expand Down
167 changes: 159 additions & 8 deletions crates/zeroclaw-runtime/src/agent/loop_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ const DEFAULT_MAX_TOOL_ITERATIONS: usize = 10;

// History management moved to `super::history`.
pub use super::history::{
emergency_history_trim, estimate_history_tokens, fast_trim_tool_results,
load_interactive_session_history, save_interactive_session_history, trim_history,
truncate_tool_result,
append_or_merge_system_message, emergency_history_trim, estimate_history_tokens,
fast_trim_tool_results, load_interactive_session_history, normalize_system_messages,
save_interactive_session_history, trim_history, truncate_tool_result,
};

/// Minimum user-message length (in chars) for auto-save to memory.
Expand Down Expand Up @@ -971,6 +971,7 @@ pub async fn run_tool_call_loop(
// or session history reloading. Without this, providers like MiniMax
// reject the request with "tool result's tool id not found" (bug #5743).
crate::agent::history_pruner::remove_orphaned_tool_messages(history);
normalize_system_messages(history);

// Check if model switch was requested via model_switch tool
if let Some(ref callback) = model_switch_callback
Expand Down Expand Up @@ -1906,16 +1907,16 @@ pub async fn run_tool_call_loop(
crate::agent::loop_detector::LoopDetectionResult::Ok => {}
crate::agent::loop_detector::LoopDetectionResult::Warning(ref msg) => {
tracing::warn!(tool = %tool_name, %msg, "loop detector warning");
// Inject a system nudge so the LLM adjusts strategy.
history.push(ChatMessage::system(format!("[Loop Detection] {msg}")));
append_or_merge_system_message(history, format!("[Loop Detection] {msg}"));
}
crate::agent::loop_detector::LoopDetectionResult::Block(ref msg) => {
tracing::warn!(tool = %tool_name, %msg, "loop detector blocked tool call");
// Replace the tool output with the block message.
// We still continue the loop so the LLM sees the block feedback.
history.push(ChatMessage::system(format!(
"[Loop Detection — BLOCKED] {msg}"
)));
append_or_merge_system_message(
history,
format!("[Loop Detection — BLOCKED] {msg}"),
);
}
crate::agent::loop_detector::LoopDetectionResult::Break(msg) => {
runtime_trace::record_event(
Expand Down Expand Up @@ -3898,6 +3899,49 @@ mod tests {
assert_eq!(restored[1].content, "orphan");
}

/// A persisted session that contains a system message between turns must be
/// loaded with all system text folded into the single leading system message.
#[test]
fn load_interactive_session_merges_non_leading_system_messages() {
    let workspace = tempdir().unwrap();
    let session_file = workspace.path().join("session.json");

    // Write a session whose history has a system message in the middle.
    let persisted = InteractiveSessionState {
        version: 1,
        history: vec![
            ChatMessage::system("base system"),
            ChatMessage::user("first question"),
            ChatMessage::assistant("first answer"),
            ChatMessage::system("late loop-detection guidance"),
            ChatMessage::user("follow-up"),
        ],
    };
    let json = serde_json::to_string_pretty(&persisted).unwrap();
    std::fs::write(&session_file, json).unwrap();

    let loaded = load_interactive_session_history(&session_file, "fallback").unwrap();
    let roles: Vec<&str> = loaded.iter().map(|message| message.role.as_str()).collect();

    let system_total = roles.iter().filter(|role| **role == "system").count();
    assert_eq!(
        system_total, 1,
        "loaded session must not contain non-leading system messages: {:?}",
        roles
    );

    let leading = &loaded[0];
    assert_eq!(leading.role, "system");
    assert!(leading.content.contains("base system"));
    assert!(leading.content.contains("late loop-detection guidance"));

    assert_eq!(roles, vec!["system", "user", "assistant", "user"]);
}

/// Regression test for issue #5813: a persisted session whose assistant
/// (tool_use) was lost to compaction must self-heal on load so the next
/// API call doesn't fail with "unexpected tool_use_id found in tool_result
Expand Down Expand Up @@ -4172,6 +4216,49 @@ mod tests {
}
}

/// Test double that stores the message list of every `chat` request and
/// always replies with the text `"done"` and no tool calls.
struct RecordingProvider {
    /// One entry per `chat` call, in call order.
    requests: Arc<Mutex<Vec<Vec<ChatMessage>>>>,
}

impl RecordingProvider {
    /// Build a provider whose request log starts out empty.
    fn new() -> Self {
        let requests = Arc::new(Mutex::new(Vec::new()));
        Self { requests }
    }
}

#[async_trait]
impl Provider for RecordingProvider {
    /// Not supported by this double: every call returns an error.
    async fn chat_with_system(
        &self,
        _system_prompt: Option<&str>,
        _message: &str,
        _model: &str,
        _temperature: Option<f64>,
    ) -> anyhow::Result<String> {
        Err(anyhow::anyhow!(
            "chat_with_system should not be used in recording provider tests"
        ))
    }

    /// Record a copy of the request's messages, then return the canned reply.
    async fn chat(
        &self,
        request: ChatRequest<'_>,
        _model: &str,
        _temperature: Option<f64>,
    ) -> anyhow::Result<ChatResponse> {
        let snapshot = request.messages.to_vec();
        self.requests
            .lock()
            .expect("requests lock should be valid")
            .push(snapshot);

        let reply = ChatResponse {
            text: Some(String::from("done")),
            tool_calls: vec![],
            usage: None,
            reasoning_content: None,
        };
        Ok(reply)
    }
}

struct StreamingScriptedProvider {
responses: Arc<Mutex<VecDeque<String>>>,
stream_calls: Arc<AtomicUsize>,
Expand Down Expand Up @@ -7722,6 +7809,70 @@ Let me check the result."#;
assert!(summary.session_cost_usd > 0.0);
}

/// A history with a system message between turns must reach the provider as
/// one leading system message followed by the other turns in their original
/// order.
#[tokio::test]
async fn tool_loop_normalizes_non_leading_system_messages_before_provider_request() {
    let recorder = RecordingProvider::new();
    let captured = Arc::clone(&recorder.requests);
    let noop_observer = NoopObserver;
    let mut conversation = vec![
        ChatMessage::system("base system"),
        ChatMessage::user("first question"),
        ChatMessage::assistant("first answer"),
        ChatMessage::system("late loop-detection guidance"),
        ChatMessage::user("follow-up"),
    ];

    let outcome = run_tool_call_loop(
        &recorder,
        &mut conversation,
        &[],
        &noop_observer,
        "recording-provider",
        "mock-model",
        0.0,
        true,
        None,
        "test",
        None,
        &zeroclaw_config::schema::MultimodalConfig::default(),
        2,
        None,
        None,
        None,
        &[],
        &[],
        None,
        None,
        &zeroclaw_config::schema::PacingConfig::default(),
        0,
        0,
        None,
        None,
        None,
        None,
    )
    .await
    .expect("tool loop should complete");
    assert_eq!(outcome, "done");

    // Exactly one request should have been sent to the provider.
    let log = captured.lock().expect("requests lock should be valid");
    assert_eq!(log.len(), 1);

    let sent = &log[0];
    let roles: Vec<&str> = sent.iter().map(|msg| msg.role.as_str()).collect();

    assert_eq!(sent[0].role, "system");
    assert_eq!(
        roles.iter().filter(|role| **role == "system").count(),
        1,
        "provider request must not contain non-leading system messages: {:?}",
        roles
    );
    assert!(sent[0].content.contains("base system"));
    assert!(sent[0].content.contains("late loop-detection guidance"));
    assert_eq!(roles, vec!["system", "user", "assistant", "user"]);
}

#[tokio::test]
async fn cost_tracking_enforces_budget() {
use super::{
Expand Down
Loading