Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 149 additions & 23 deletions crates/gateway/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,36 @@ fn log_startup_config_storage_diagnostics() {
}
}

async fn maybe_deliver_cron_output(
outbound: Option<Arc<dyn moltis_channels::ChannelOutbound>>,
req: &moltis_cron::service::AgentTurnRequest,
delivery_text: &str,
) {
if !req.deliver || delivery_text.trim().is_empty() {
return;
}

let (Some(channel_account), Some(chat_id)) = (&req.channel, &req.to) else {
return;
};

if let Some(outbound) = outbound {
if let Err(error) = outbound
.send_text(channel_account, chat_id, delivery_text, None)
.await
{
tracing::warn!(
channel = %channel_account,
to = %chat_id,
error = %error,
"cron job channel delivery failed"
);
}
} else {
tracing::debug!("cron job delivery requested but no channel outbound configured");
}
}

/// A fully wired gateway (app router + shared state), ready to be served.
///
/// Created by [`prepare_gateway`]. Callers bind their own TCP listener and
Expand Down Expand Up @@ -2260,29 +2290,8 @@ pub async fn prepare_gateway(
text.clone()
};

// Deliver output to a channel if requested.
if req.deliver
&& !delivery_text.trim().is_empty()
&& let (Some(channel_account), Some(chat_id)) = (&req.channel, &req.to)
{
if let Some(outbound) = state.services.channel_outbound_arc() {
if let Err(e) = outbound
.send_text(channel_account, chat_id, &delivery_text, None)
.await
{
tracing::warn!(
channel = %channel_account,
to = %chat_id,
error = %e,
"cron job channel delivery failed"
);
}
} else {
tracing::debug!(
"cron job delivery requested but no channel outbound configured"
);
}
}
maybe_deliver_cron_output(state.services.channel_outbound_arc(), &req, &delivery_text)
.await;

Ok(moltis_cron::service::AgentTurnResult {
output: text,
Expand Down Expand Up @@ -6312,9 +6321,126 @@ pub(crate) async fn discover_and_build_hooks(
mod tests {
use {
super::*,
async_trait::async_trait,
moltis_common::types::ReplyPayload,
std::collections::{HashMap, HashSet},
tokio::sync::Mutex,
};

#[derive(Debug, Clone, PartialEq, Eq)]
struct DeliveredMessage {
account_id: String,
to: String,
text: String,
reply_to: Option<String>,
}

#[derive(Default)]
struct RecordingChannelOutbound {
delivered: Mutex<Vec<DeliveredMessage>>,
}

#[async_trait]
impl moltis_channels::ChannelOutbound for RecordingChannelOutbound {
async fn send_text(
&self,
account_id: &str,
to: &str,
text: &str,
reply_to: Option<&str>,
) -> moltis_channels::Result<()> {
self.delivered.lock().await.push(DeliveredMessage {
account_id: account_id.to_string(),
to: to.to_string(),
text: text.to_string(),
reply_to: reply_to.map(ToString::to_string),
});
Ok(())
}

async fn send_media(
&self,
_account_id: &str,
_to: &str,
_payload: &ReplyPayload,
_reply_to: Option<&str>,
) -> moltis_channels::Result<()> {
Ok(())
}
}

fn cron_delivery_request() -> moltis_cron::service::AgentTurnRequest {
moltis_cron::service::AgentTurnRequest {
message: "Run background summary".to_string(),
model: None,
timeout_secs: None,
deliver: true,
channel: Some("bot-main".to_string()),
to: Some("123456".to_string()),
session_target: moltis_cron::types::SessionTarget::Isolated,
sandbox: moltis_cron::types::CronSandboxConfig::default(),
}
}

#[tokio::test]
async fn maybe_deliver_cron_output_sends_to_configured_channel() {
let outbound = Arc::new(RecordingChannelOutbound::default());
let req = cron_delivery_request();

maybe_deliver_cron_output(
Some(outbound.clone() as Arc<dyn moltis_channels::ChannelOutbound>),
&req,
"Daily digest ready",
)
.await;

let delivered = outbound.delivered.lock().await.clone();
assert_eq!(delivered, vec![DeliveredMessage {
account_id: "bot-main".to_string(),
to: "123456".to_string(),
text: "Daily digest ready".to_string(),
reply_to: None,
}]);
}

#[tokio::test]
async fn maybe_deliver_cron_output_skips_blank_messages() {
let outbound = Arc::new(RecordingChannelOutbound::default());
let req = cron_delivery_request();

maybe_deliver_cron_output(
Some(outbound.clone() as Arc<dyn moltis_channels::ChannelOutbound>),
&req,
" ",
)
.await;

assert!(outbound.delivered.lock().await.is_empty());
}
Comment thread
penso marked this conversation as resolved.

#[tokio::test]
async fn maybe_deliver_cron_output_skips_when_deliver_is_false() {
let outbound = Arc::new(RecordingChannelOutbound::default());
let mut req = cron_delivery_request();
req.deliver = false;

maybe_deliver_cron_output(
Some(outbound.clone() as Arc<dyn moltis_channels::ChannelOutbound>),
&req,
"should not be sent",
)
.await;

assert!(outbound.delivered.lock().await.is_empty());
}

#[tokio::test]
async fn maybe_deliver_cron_output_skips_when_no_outbound_configured() {
let req = cron_delivery_request();

maybe_deliver_cron_output(None, &req, "Daily digest ready").await;
}

#[test]
fn summarize_model_ids_for_logs_returns_all_when_within_limit() {
let model_ids = vec!["a", "b", "c"]
Expand Down
67 changes: 67 additions & 0 deletions crates/tools/src/cron_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,34 @@ mod tests {
assert_eq!(add_result["sandbox"]["image"], "ubuntu:25.10");
}

#[tokio::test]
async fn test_add_accepts_delivery_fields_for_agent_turn() {
let tool = make_tool();
let add_result = tool
.execute(json!({
"action": "add",
"job": {
"name": "delivered run",
"schedule": { "kind": "every", "every_ms": 60000 },
"payload": {
"kind": "agentTurn",
"message": "post an update",
"deliver": true,
"channel": "bot-main",
"to": "123456"
},
"sessionTarget": "isolated"
}
}))
.await
.unwrap();

assert_eq!(add_result["payload"]["kind"], "agentTurn");
assert_eq!(add_result["payload"]["deliver"], true);
assert_eq!(add_result["payload"]["channel"], "bot-main");
assert_eq!(add_result["payload"]["to"], "123456");
}

#[tokio::test]
async fn test_update_accepts_host_execution_string() {
let tool = make_tool();
Expand Down Expand Up @@ -1094,6 +1122,45 @@ mod tests {
assert!(updated["sandbox"]["image"].is_null());
}

#[tokio::test]
async fn test_update_accepts_delivery_fields_in_patch() {
let tool = make_tool();
let add_result = tool
.execute(json!({
"action": "add",
"job": {
"name": "toggle delivery",
"schedule": { "kind": "every", "every_ms": 60000 },
"payload": { "kind": "agentTurn", "message": "run task" },
"sessionTarget": "isolated"
}
}))
.await
.unwrap();
let id = add_result["id"].as_str().unwrap();

let updated = tool
.execute(json!({
"action": "update",
"id": id,
"patch": {
"payload": {
"kind": "agentTurn",
"message": "run task",
"deliver": true,
"channel": "bot-main",
"to": "123456"
}
}
}))
.await
.unwrap();

assert_eq!(updated["payload"]["deliver"], true);
assert_eq!(updated["payload"]["channel"], "bot-main");
assert_eq!(updated["payload"]["to"], "123456");
}

#[test]
fn test_parameters_schema_has_no_one_of() {
fn contains_one_of(value: &Value) -> bool {
Expand Down
3 changes: 2 additions & 1 deletion crates/web/ui/e2e/specs/websocket.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ test.describe("WebSocket connection lifecycle", () => {
await navigateAndWait(page, "/chats/main");
await waitForWsConnected(page);
await waitForChatSessionReady(page);
await expectRpcOk(page, "chat.clear", {});
await clearChatAndWait(page);
await mockRpcErrorResponse(page, "sessions.voice.generate", "Voice generation failed for test.");

await expectRpcOk(page, "system-event", {
Expand All @@ -407,6 +407,7 @@ test.describe("WebSocket connection lifecycle", () => {
});

var assistant = page.locator("#messages .msg.assistant").last();
await expect(assistant).toContainText("try generating voice now");
await expect(assistant.locator(".msg-voice-action")).toHaveText("Voice it");
await assistant.locator(".msg-voice-action").click();
await expect(assistant.locator(".msg-voice-action")).toHaveText("Retry voice");
Expand Down
22 changes: 22 additions & 0 deletions docs/src/channels.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,28 @@ For detailed configuration, see the per-channel pages:

You can also use the web UI's **Channels** tab for guided setup with each platform.

## Proactive Outbound Messaging

Agents are not limited to replying in the current chat. Moltis supports three
main outbound patterns:

- **`send_message` tool** for direct proactive messages to any configured channel account/chat
- **Cron job delivery** for background jobs that should post their final output to a channel
- **Heartbeat delivery** for periodic heartbeat acknowledgements sent to a chosen chat

Example `send_message` tool call:

```json
{
"account_id": "my-telegram-bot",
"to": "123456789",
"text": "Deployment finished successfully."
}
```

`account_id` is the configured channel account name from `moltis.toml`, and
`to` is the destination chat, peer, or room identifier for that platform.

## Access Control

All channels share the same access control model with three settings:
Expand Down
Loading
Loading