Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions crates/zeroclaw-channels/src/orchestrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ struct ChannelRuntimeContext {
query_classification: zeroclaw_config::schema::QueryClassificationConfig,
ack_reactions: bool,
show_tool_calls: bool,
session_store: Option<Arc<zeroclaw_infra::session_store::SessionStore>>,
session_store: Option<Arc<dyn zeroclaw_infra::session_backend::SessionBackend>>,
/// Non-interactive approval manager for channel-driven runs.
/// Enforces `auto_approve` / `always_ask` / supervised policy from
/// `[autonomy]` config; auto-denies tools that would need interactive
Expand Down Expand Up @@ -5769,10 +5769,16 @@ pub async fn start_channels(
ack_reactions: config.channels.ack_reactions,
show_tool_calls: config.channels.show_tool_calls,
session_store: if config.channels.session_persistence {
match zeroclaw_infra::session_store::SessionStore::new(&config.workspace_dir) {
Ok(store) => {
tracing::info!("📂 Session persistence enabled");
Some(Arc::new(store))
match zeroclaw_infra::make_session_backend(
&config.workspace_dir,
&config.channels.session_backend,
) {
Ok(backend) => {
tracing::info!(
"📂 Session persistence enabled (backend: {})",
config.channels.session_backend
);
Some(backend)
}
Err(e) => {
tracing::warn!("Session persistence disabled: {e}");
Expand Down Expand Up @@ -5808,22 +5814,15 @@ pub async fn start_channels(
if let Some(ref store) = runtime_ctx.session_store {
let mut hydrated = 0usize;
let mut orphans_closed = 0usize;
let session_keys = store.list_sessions();

// Sort by file mtime (most recently modified first) for predictable hydration.
// Collect mtimes up front to avoid repeated FS reads inside the comparator.
let mut keyed: Vec<_> = session_keys
.into_iter()
.map(|k| {
let mt = store
.session_mtime(&k)
.unwrap_or(std::time::SystemTime::UNIX_EPOCH);
(k, mt)
})
.collect();
keyed.sort_by_key(|entry| std::cmp::Reverse(entry.1));
keyed.truncate(MAX_CONVERSATION_SENDERS);
let session_keys: Vec<String> = keyed.into_iter().map(|(k, _)| k).collect();
// Sort by last activity (most recent first) for predictable hydration.
// The SessionBackend trait carries last_activity in metadata, so any
// backend (JSONL, SQLite) can answer this question without a side
// call to a backend-specific mtime method.
let mut metadata = store.list_sessions_with_metadata();
metadata.sort_by_key(|m| std::cmp::Reverse(m.last_activity));
metadata.truncate(MAX_CONVERSATION_SENDERS);
let session_keys: Vec<String> = metadata.into_iter().map(|m| m.key).collect();

let mut histories = runtime_ctx
.conversation_histories
Expand Down Expand Up @@ -6603,7 +6602,8 @@ mod tests {
#[test]
fn rollback_orphan_user_turn_also_removes_from_session_store() {
let tmp = tempfile::TempDir::new().unwrap();
let store = Arc::new(zeroclaw_infra::session_store::SessionStore::new(tmp.path()).unwrap());
let store: Arc<dyn zeroclaw_infra::session_backend::SessionBackend> =
Arc::new(zeroclaw_infra::session_store::SessionStore::new(tmp.path()).unwrap());

let sender = "telegram_u4".to_string();

Expand Down
22 changes: 16 additions & 6 deletions crates/zeroclaw-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ use zeroclaw_channels::{
use zeroclaw_config::policy::SecurityPolicy;
use zeroclaw_config::schema::Config;
use zeroclaw_infra::session_backend::SessionBackend;
use zeroclaw_infra::session_sqlite::SqliteSessionBackend;
use zeroclaw_memory::{self, Memory, MemoryCategory};
use zeroclaw_providers::{self, Provider};
use zeroclaw_runtime::cost::CostTracker;
Expand Down Expand Up @@ -768,17 +767,28 @@ pub async fn run_gateway(
.map(|gp| Arc::new(GmailPushChannel::new(gp.clone())));

// ── Session persistence for WS chat ─────────────────────
// Routes through `make_session_backend` so `[channels].session_backend`
// is the single source of truth for which backend stores sessions.
// Picking `"jsonl"` would otherwise leave gateway WS sessions writing
// to SQLite while channel + tool reads went to JSONL — the original
// #5769 split, just on a different backend pairing.
let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
match SqliteSessionBackend::new(&config.workspace_dir) {
Ok(b) => {
tracing::info!("Gateway session persistence enabled (SQLite)");
match zeroclaw_infra::make_session_backend(
&config.workspace_dir,
&config.channels.session_backend,
) {
Ok(backend) => {
tracing::info!(
"Gateway session persistence enabled (backend={})",
config.channels.session_backend,
);
if config.gateway.session_ttl_hours > 0
&& let Ok(cleaned) = b.cleanup_stale(config.gateway.session_ttl_hours)
&& let Ok(cleaned) = backend.cleanup_stale(config.gateway.session_ttl_hours)
&& cleaned > 0
{
tracing::info!("Cleaned up {cleaned} stale gateway sessions");
}
Some(Arc::new(b))
Some(backend)
}
Err(e) => {
tracing::warn!("Session persistence disabled: {e}");
Expand Down
141 changes: 141 additions & 0 deletions crates/zeroclaw-infra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,144 @@ pub mod session_backend;
pub mod session_sqlite;
pub mod session_store;
pub mod stall_watchdog;

use std::path::Path;
use std::sync::Arc;

use crate::session_backend::SessionBackend;

/// Construct the configured session-persistence backend.
///
/// `backend` is the value of `[channels].session_backend` from config:
/// `"sqlite"` (default) opens `{workspace}/sessions/sessions.db`, `"jsonl"`
/// opens `{workspace}/sessions/*.jsonl`. Unknown values fall back to
/// SQLite with a warning so a typo in config never silently disables
/// persistence. The `Arc<dyn SessionBackend>` return type keeps every
/// call site (channel orchestrator, runtime tools) reading from the
/// same store.
///
/// Errors propagate from the underlying backend constructor (typically
/// filesystem permissions on the sessions directory).
pub fn make_session_backend(
workspace_dir: &Path,
backend: &str,
) -> std::io::Result<Arc<dyn SessionBackend>> {
match backend {
"jsonl" => {
let store = session_store::SessionStore::new(workspace_dir)?;
Ok(Arc::new(store))
}
"sqlite" => Ok(Arc::new(open_sqlite_with_jsonl_import(workspace_dir)?)),
other => {
tracing::warn!(
"Unknown session_backend '{other}'; falling back to sqlite. \
Valid values: 'sqlite' (default), 'jsonl'."
);
Ok(Arc::new(open_sqlite_with_jsonl_import(workspace_dir)?))
}
}
}

/// Open the SQLite backend and, on first open, import any pre-existing
/// `sessions/*.jsonl` files left over from the legacy JSONL store. Renames
/// the imported files to `*.jsonl.migrated` so re-runs are no-ops; preserves
/// them on disk so an operator can roll back without data loss. Errors from
/// the import path are logged and skipped — the SQLite backend itself still
/// opens, since blocking startup on a best-effort migration would be worse
/// than a partial migration.
fn open_sqlite_with_jsonl_import(
workspace_dir: &Path,
) -> std::io::Result<session_sqlite::SqliteSessionBackend> {
let backend = session_sqlite::SqliteSessionBackend::new(workspace_dir)
.map_err(|e| std::io::Error::other(e.to_string()))?;
match backend.migrate_from_jsonl(workspace_dir) {
Ok(0) => {}
Ok(n) => tracing::info!(
"session_backend=sqlite: imported {n} legacy JSONL session(s) from \
{}/sessions; renamed to *.jsonl.migrated.",
workspace_dir.display(),
),
Err(e) => tracing::warn!(
"session_backend=sqlite: JSONL import skipped: {e}. Existing JSONL \
sessions remain on disk; switch to session_backend = \"jsonl\" if \
you need them visible immediately."
),
}
Ok(backend)
}

#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use zeroclaw_api::provider::ChatMessage;

fn user_msg(content: &str) -> ChatMessage {
ChatMessage::user(content)
}

#[test]
fn make_session_backend_jsonl_round_trips_through_session_store() {
let tmp = TempDir::new().unwrap();
let backend = make_session_backend(tmp.path(), "jsonl").unwrap();
backend.append("k1", &user_msg("hello-jsonl")).unwrap();
let loaded = backend.load("k1");
assert_eq!(loaded.len(), 1);
// The JSONL backend writes one file per session key.
let jsonl = tmp.path().join("sessions").join("k1.jsonl");
assert!(jsonl.exists(), "jsonl file must be written under sessions/");
}

#[test]
fn make_session_backend_sqlite_round_trips_through_sqlite_db() {
let tmp = TempDir::new().unwrap();
let backend = make_session_backend(tmp.path(), "sqlite").unwrap();
backend.append("k1", &user_msg("hello-sqlite")).unwrap();
let loaded = backend.load("k1");
assert_eq!(loaded.len(), 1);
let db = tmp.path().join("sessions").join("sessions.db");
assert!(db.exists(), "sqlite db must be written under sessions/");
// The JSONL companion file must NOT have been created.
assert!(!tmp.path().join("sessions").join("k1.jsonl").exists());
}

#[test]
fn make_session_backend_unknown_value_falls_back_to_sqlite() {
let tmp = TempDir::new().unwrap();
let backend = make_session_backend(tmp.path(), "totally-not-a-backend").unwrap();
backend.append("k1", &user_msg("hello-fallback")).unwrap();
let db = tmp.path().join("sessions").join("sessions.db");
assert!(
db.exists(),
"unknown value must fall back to sqlite, not error"
);
}

#[test]
fn make_session_backend_sqlite_imports_legacy_jsonl_on_first_open() {
// Seed JSONL session files, then open SQLite — the .jsonl files must
// be migrated and the imported sessions must be visible via the new
// backend. The .jsonl files get renamed to .jsonl.migrated so the
// operator can roll back.
let tmp = TempDir::new().unwrap();
{
let jsonl = make_session_backend(tmp.path(), "jsonl").unwrap();
jsonl.append("legacy", &user_msg("from-jsonl")).unwrap();
}
let sqlite = make_session_backend(tmp.path(), "sqlite").unwrap();
let loaded = sqlite.load("legacy");
assert_eq!(
loaded.len(),
1,
"legacy JSONL session must hydrate via SQLite"
);
// .jsonl renamed to .jsonl.migrated; original gone.
let jsonl_orig = tmp.path().join("sessions").join("legacy.jsonl");
let jsonl_migrated = tmp.path().join("sessions").join("legacy.jsonl.migrated");
assert!(!jsonl_orig.exists(), "original .jsonl should be renamed");
assert!(
jsonl_migrated.exists(),
".jsonl.migrated rollback file should remain"
);
}
}
25 changes: 25 additions & 0 deletions crates/zeroclaw-infra/src/session_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,31 @@ impl SessionBackend for SessionStore {
self.list_sessions()
}

/// Override the trait default so JSONL-backed channel hydration picks
/// the most-recent sessions when truncating to MAX_CONVERSATION_SENDERS.
/// The trait default stamps every key with `Utc::now()`, which makes
/// the orchestrator's `sort_by_key(|m| Reverse(m.last_activity))`
/// arbitrary once more than that many sessions are persisted.
fn list_sessions_with_metadata(&self) -> Vec<crate::session_backend::SessionMetadata> {
use chrono::{DateTime, Utc};
self.list_sessions()
.into_iter()
.map(|key| {
let last_activity: DateTime<Utc> = self
.session_mtime(&key)
.map(DateTime::<Utc>::from)
.unwrap_or_else(Utc::now);
crate::session_backend::SessionMetadata {
name: None,
created_at: last_activity,
last_activity,
message_count: 0,
key,
}
})
.collect()
}

fn compact(&self, session_key: &str) -> std::io::Result<()> {
self.compact(session_key)
}
Expand Down
15 changes: 10 additions & 5 deletions crates/zeroclaw-runtime/src/tools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,11 +702,16 @@ pub fn all_tools_with_runtime(
tool_arcs.push(Arc::new(ScreenshotTool::new(security.clone())));
tool_arcs.push(Arc::new(ImageInfoTool::new(security.clone())));

// Session tools (JSONL path). If a separate SQLite registration path is
// added, these tools need to be registered there too.
if let Ok(session_store) = zeroclaw_infra::session_store::SessionStore::new(workspace_dir) {
let backend: Arc<dyn zeroclaw_infra::session_backend::SessionBackend> =
Arc::new(session_store);
// Session tools share the channel orchestrator's backend via the
// `make_session_backend` factory, keyed off `[channels].session_backend`.
// Previously the tools opened the JSONL `SessionStore` while the
// gateway WS path opened `SqliteSessionBackend`, so any session
// created via /ws/chat was invisible to `sessions_list` /
// `sessions_history`. Routing both call sites through the factory
// closes that gap and honors the operator's configured backend.
if let Ok(backend) =
zeroclaw_infra::make_session_backend(workspace_dir, &config.channels.session_backend)
{
tool_arcs.push(Arc::new(SessionsCurrentTool::new(backend.clone())));
tool_arcs.push(Arc::new(SessionsListTool::new(backend.clone())));
tool_arcs.push(Arc::new(SessionsHistoryTool::new(
Expand Down
Loading