Skip to content

Commit 03b4c13

Browse files
authored
Merge pull request #6545 from singlerider/feat/6272-multi-agent-runtime
feat(runtime): multi-agent runtime
2 parents 0e6749b + 17b2ed8 commit 03b4c13

98 files changed

Lines changed: 8180 additions & 3524 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

crates/zeroclaw-api/src/channel.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,51 @@ pub trait Channel: Send + Sync {
144144
false
145145
}
146146

147+
/// Self-loop guard for multi-agent runs.
148+
///
149+
/// Returns the bot's own handle/identity on this channel
150+
/// (e.g. `@my_bot` for Telegram, the bot's user ID for Discord)
151+
/// when known, so the orchestrator can drop inbound events whose
152+
/// `sender` matches: a bot must never respond to its own
153+
/// messages, even if a misconfigured peer group lists the bot's
154+
/// handle as an external peer.
155+
///
156+
/// **Channels that handle inbound traffic must override this.**
157+
/// The default `None` makes both layers of the orchestrator's
158+
/// self-loop guard (the SDK-side `drop_self_messages` here, and
159+
/// the agent-loop fallback `peers::should_drop_self_loop`) into
160+
/// no-ops — both layers consult the same `self_handle`, so a
161+
/// channel that returns `None` has no protection from looping on
162+
/// its own outbound. Outbound-only channels (webhook, gmail-push,
163+
/// voice-call) never see inbound and can keep the default. The
164+
/// in-tree overrides currently cover Telegram (`bot_username`
165+
/// cache), IRC (configured nickname), Discord (decoded from token),
166+
/// Slack (cached `auth.test` user_id); other inbound channels
167+
/// remain on the default and rely on per-impl filtering instead
168+
/// of the shared guard.
169+
fn self_handle(&self) -> Option<String> {
170+
None
171+
}
172+
173+
/// Whether the orchestrator should drop an inbound message as
174+
/// self-authored (multi-agent self-loop guard).
175+
///
176+
/// Default implementation compares `msg.sender` against
177+
/// [`Self::self_handle`] case-insensitively, after stripping a
178+
/// leading `@` from each side so Telegram-style handles match
179+
/// regardless of which form the SDK delivers. Override only for
180+
/// platforms whose identity comparison is non-string (e.g. a
181+
/// numeric Discord user ID is `as_str` already; this default
182+
/// works there too).
183+
fn drop_self_messages(&self, msg: &ChannelMessage) -> bool {
184+
let Some(handle) = self.self_handle() else {
185+
return false;
186+
};
187+
let handle_norm = handle.trim_start_matches('@').to_ascii_lowercase();
188+
let sender_norm = msg.sender.trim_start_matches('@').to_ascii_lowercase();
189+
!handle_norm.is_empty() && handle_norm == sender_norm
190+
}
191+
147192
/// Whether this channel supports multi-message streaming delivery.
148193
fn supports_multi_message_streaming(&self) -> bool {
149194
false
@@ -285,3 +330,87 @@ pub trait Channel: Send + Sync {
285330
true
286331
}
287332
}
333+
334+
#[cfg(test)]
335+
mod tests {
336+
use super::*;
337+
338+
/// Stub channel that overrides `self_handle` so the default
339+
/// `drop_self_messages` implementation can be exercised.
340+
struct StubChannel {
341+
handle: Option<String>,
342+
}
343+
344+
#[async_trait]
345+
impl Channel for StubChannel {
346+
fn name(&self) -> &str {
347+
"stub"
348+
}
349+
async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> {
350+
Ok(())
351+
}
352+
async fn listen(
353+
&self,
354+
_tx: tokio::sync::mpsc::Sender<ChannelMessage>,
355+
) -> anyhow::Result<()> {
356+
Ok(())
357+
}
358+
fn self_handle(&self) -> Option<String> {
359+
self.handle.clone()
360+
}
361+
}
362+
363+
fn msg_from(sender: &str) -> ChannelMessage {
364+
ChannelMessage {
365+
id: "1".into(),
366+
sender: sender.into(),
367+
reply_target: String::new(),
368+
content: "hi".into(),
369+
channel: "stub".into(),
370+
timestamp: 0,
371+
thread_ts: None,
372+
interruption_scope_id: None,
373+
attachments: Vec::new(),
374+
}
375+
}
376+
377+
#[test]
378+
fn drop_self_messages_default_returns_false_when_handle_unknown() {
379+
let channel = StubChannel { handle: None };
380+
assert!(!channel.drop_self_messages(&msg_from("@anyone")));
381+
}
382+
383+
#[test]
384+
fn drop_self_messages_matches_exact_handle() {
385+
let channel = StubChannel {
386+
handle: Some("@my_bot".into()),
387+
};
388+
assert!(channel.drop_self_messages(&msg_from("@my_bot")));
389+
assert!(!channel.drop_self_messages(&msg_from("@other_bot")));
390+
}
391+
392+
#[test]
393+
fn drop_self_messages_normalizes_at_prefix_and_case() {
394+
let channel = StubChannel {
395+
handle: Some("My_Bot".into()),
396+
};
397+
// SDK delivered with @ prefix, handle stored without. Match.
398+
assert!(channel.drop_self_messages(&msg_from("@my_bot")));
399+
// Both with @, mixed case. Match.
400+
let channel = StubChannel {
401+
handle: Some("@My_Bot".into()),
402+
};
403+
assert!(channel.drop_self_messages(&msg_from("@MY_BOT")));
404+
}
405+
406+
#[test]
407+
fn drop_self_messages_does_not_match_empty_handle() {
408+
// A handle of "@" (effectively empty after normalization) must
409+
// not match every inbound message; the guard only fires when
410+
// the bot has a real handle to compare against.
411+
let channel = StubChannel {
412+
handle: Some("@".into()),
413+
};
414+
assert!(!channel.drop_self_messages(&msg_from("@anyone")));
415+
}
416+
}

crates/zeroclaw-api/src/memory_traits.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ pub struct MemoryEntry {
4444
/// If this entry was superseded by a newer conflicting entry.
4545
#[serde(default)]
4646
pub superseded_by: Option<String>,
47+
/// The UUID of the agent this row is attributed to, when the
48+
/// backend tracks per-agent attribution. Backends without per-agent
49+
/// columns (Markdown, Qdrant payload-less variants, None) leave
50+
/// this `None`; SQL-backed stores populate it from the
51+
/// `memories.agent_id` column. `AgentScopedMemory` uses this field
52+
/// to enforce the bound + allowlist boundary on read paths the
53+
/// trait does not expose an agent-aware form for (`get`, `list`).
54+
#[serde(default)]
55+
pub agent_id: Option<String>,
4756
}
4857

4958
fn default_namespace() -> String {
@@ -61,6 +70,7 @@ impl std::fmt::Debug for MemoryEntry {
6170
.field("score", &self.score)
6271
.field("namespace", &self.namespace)
6372
.field("importance", &self.importance)
73+
.field("agent_id", &self.agent_id)
6474
.finish_non_exhaustive()
6575
}
6676
}
@@ -289,6 +299,68 @@ pub trait Memory: Send + Sync {
289299
) -> anyhow::Result<()> {
290300
self.store(key, content, category, session_id).await
291301
}
302+
303+
/// Store a memory entry attributed to an explicit agent UUID.
304+
/// Every backend must implement this explicitly so the agent_id
305+
/// is never silently dropped at storage time. Backends with
306+
/// native agent_id columns (SqliteMemory, PostgresMemory,
307+
/// LucidMemory) persist the attribution in SQL; MarkdownMemory
308+
/// attributes via the per-agent directory path; QdrantMemory
309+
/// persists in the vector payload; NoneMemory is a no-op stub.
310+
/// `AgentScopedMemory` is the canonical caller.
311+
async fn store_with_agent(
312+
&self,
313+
key: &str,
314+
content: &str,
315+
category: MemoryCategory,
316+
session_id: Option<&str>,
317+
namespace: Option<&str>,
318+
importance: Option<f64>,
319+
agent_id: Option<&str>,
320+
) -> anyhow::Result<()>;
321+
322+
/// Recall memory entries scoped to a specific set of agent UUIDs.
323+
/// When `allowed_agent_ids` is non-empty, the backend filters its
324+
/// result set to rows whose `agent_id` matches one of the listed
325+
/// UUIDs (or is NULL, for legacy rows written before the agent_id
326+
/// column existed). Every backend must implement this explicitly
327+
/// so the allowlist is never silently dropped at read time.
328+
///
329+
/// For SQL-backed stores the filter is `WHERE agent_id IN (...)`.
330+
/// For Markdown the implementation walks the allowed agents'
331+
/// per-agent directories. For Qdrant it's a payload filter on
332+
/// the `agent_id` field. For None it returns an empty list.
333+
/// `AgentScopedMemory` is the canonical caller; direct invocation
334+
/// is also valid for read-only cross-agent queries that bypass
335+
/// the wrapper.
336+
///
337+
/// Cross-backend allowlist entries are rejected at config load
338+
/// (`agents.<alias>.workspace.read_memory_from` cannot point at a
339+
/// sibling on a different memory backend); backends therefore
340+
/// never need to handle a cross-backend recall.
341+
async fn recall_for_agents(
342+
&self,
343+
allowed_agent_ids: &[&str],
344+
query: &str,
345+
limit: usize,
346+
session_id: Option<&str>,
347+
since: Option<&str>,
348+
until: Option<&str>,
349+
) -> anyhow::Result<Vec<MemoryEntry>>;
350+
351+
/// Look up (or create) the identifier the backend uses to refer
352+
/// to the agent named by `alias`.
353+
///
354+
/// Backends with an `agents` table (SqliteMemory, PostgresMemory,
355+
/// LucidMemory) return the row's UUID, inserting if absent.
356+
/// Backends without (MarkdownMemory, QdrantMemory, NoneMemory)
357+
/// return the alias verbatim — there is no UUID indirection at
358+
/// the storage layer, so the alias serves as the agent_id.
359+
/// Default impl returns the alias unchanged; SQL backends
360+
/// override to do the real lookup.
361+
async fn ensure_agent_uuid(&self, alias: &str) -> anyhow::Result<String> {
362+
Ok(alias.to_string())
363+
}
292364
}
293365

294366
#[cfg(test)]
@@ -339,6 +411,7 @@ mod tests {
339411
namespace: "default".into(),
340412
importance: Some(0.7),
341413
superseded_by: None,
414+
agent_id: None,
342415
};
343416

344417
let json = serde_json::to_string(&entry).unwrap();

crates/zeroclaw-api/src/observability_traits.rs

Lines changed: 0 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -83,20 +83,6 @@ pub enum ObserverEvent {
8383
/// Human-readable error description. Must not contain secrets or tokens.
8484
message: String,
8585
},
86-
/// A hand has started execution.
87-
HandStarted { hand_name: String },
88-
/// A hand has completed execution successfully.
89-
HandCompleted {
90-
hand_name: String,
91-
duration_ms: u64,
92-
findings_count: usize,
93-
},
94-
/// A hand has failed during execution.
95-
HandFailed {
96-
hand_name: String,
97-
error: String,
98-
duration_ms: u64,
99-
},
10086
/// A deployment has started.
10187
DeploymentStarted {
10288
/// Identifier for the deployment (e.g., commit SHA or release tag).
@@ -132,15 +118,6 @@ pub enum ObserverMetric {
132118
ActiveSessions(u64),
133119
/// Current depth of the inbound message queue.
134120
QueueDepth(u64),
135-
/// Duration of a single hand run.
136-
HandRunDuration {
137-
hand_name: String,
138-
duration: Duration,
139-
},
140-
/// Number of findings produced by a hand run.
141-
HandFindingsCount { hand_name: String, count: u64 },
142-
/// Records a hand run outcome for success-rate tracking.
143-
HandSuccessRate { hand_name: String, success: bool },
144121
/// Time elapsed from commit to deployment (lead time for changes).
145122
DeploymentLeadTime(Duration),
146123
/// Time elapsed to recover from a failed deployment.
@@ -261,67 +238,4 @@ mod tests {
261238
assert!(matches!(cloned_event, ObserverEvent::ToolCall { .. }));
262239
assert!(matches!(cloned_metric, ObserverMetric::RequestLatency(_)));
263240
}
264-
265-
#[test]
266-
fn hand_events_recordable() {
267-
let observer = DummyObserver::default();
268-
269-
observer.record_event(&ObserverEvent::HandStarted {
270-
hand_name: "review".into(),
271-
});
272-
observer.record_event(&ObserverEvent::HandCompleted {
273-
hand_name: "review".into(),
274-
duration_ms: 1500,
275-
findings_count: 3,
276-
});
277-
observer.record_event(&ObserverEvent::HandFailed {
278-
hand_name: "review".into(),
279-
error: "timeout".into(),
280-
duration_ms: 5000,
281-
});
282-
283-
assert_eq!(*observer.events.lock(), 3);
284-
}
285-
286-
#[test]
287-
fn hand_metrics_recordable() {
288-
let observer = DummyObserver::default();
289-
290-
observer.record_metric(&ObserverMetric::HandRunDuration {
291-
hand_name: "review".into(),
292-
duration: Duration::from_millis(1500),
293-
});
294-
observer.record_metric(&ObserverMetric::HandFindingsCount {
295-
hand_name: "review".into(),
296-
count: 3,
297-
});
298-
observer.record_metric(&ObserverMetric::HandSuccessRate {
299-
hand_name: "review".into(),
300-
success: true,
301-
});
302-
303-
assert_eq!(*observer.metrics.lock(), 3);
304-
}
305-
306-
#[test]
307-
fn hand_event_and_metric_are_cloneable() {
308-
let event = ObserverEvent::HandCompleted {
309-
hand_name: "review".into(),
310-
duration_ms: 500,
311-
findings_count: 2,
312-
};
313-
let metric = ObserverMetric::HandRunDuration {
314-
hand_name: "review".into(),
315-
duration: Duration::from_millis(500),
316-
};
317-
318-
let cloned_event = event.clone();
319-
let cloned_metric = metric.clone();
320-
321-
assert!(matches!(cloned_event, ObserverEvent::HandCompleted { .. }));
322-
assert!(matches!(
323-
cloned_metric,
324-
ObserverMetric::HandRunDuration { .. }
325-
));
326-
}
327241
}

crates/zeroclaw-channels/src/discord.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,17 @@ impl Channel for DiscordChannel {
901901
"discord"
902902
}
903903

904+
/// Discord bot tokens encode the bot's user ID in the first
905+
/// segment (`base64(user_id).timestamp.hmac`); decode on demand
906+
/// rather than caching since the result is deterministic and the
907+
/// orchestrator only calls `self_handle` on the inbound path.
908+
/// Returning the user ID engages the SDK self-loop guard against
909+
/// gateway events the bot itself produced (typing indicators,
910+
/// echoed message events from intent overlap, etc.).
911+
fn self_handle(&self) -> Option<String> {
912+
Self::bot_user_id_from_token(&self.bot_token)
913+
}
914+
904915
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
905916
let raw_content = crate::util::strip_tool_call_tags(&message.content);
906917
let (cleaned_content, parsed_attachments) = parse_attachment_markers(&raw_content);

crates/zeroclaw-channels/src/irc.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,18 @@ impl Channel for IrcChannel {
354354
"irc"
355355
}
356356

357+
/// IRC echoes the bot's own PRIVMSGs back through the same socket
358+
/// for any channel the bot is JOINed to. Returning the configured
359+
/// nickname here engages the SDK self-loop guard so those echoes
360+
/// drop before reaching the agent loop. The nickname is set at
361+
/// construction (`config.nickname`) and used as the preferred nick
362+
/// during NICK negotiation; if the server forces a different nick
363+
/// (collision fallback in `listen`), the agent-loop fallback
364+
/// catches the gap.
365+
fn self_handle(&self) -> Option<String> {
366+
Some(self.nickname.clone())
367+
}
368+
357369
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
358370
let mut guard = self.writer.lock().await;
359371
let writer = guard

0 commit comments

Comments
 (0)