
Commit dc90553

Authored by standardtoasterclaudeilblackdragon
feat: multi-tenant auth with per-user workspace isolation (nearai#1118)
* feat: multi-tenant auth with per-user scoping

  Multi-user authentication and authorization for the IronClaw gateway:

  - Token-based auth mapping tokens to user IDs via GATEWAY_USER_TOKENS
  - Per-user SSE broadcast scoping
  - Per-user rate limiting with poisoned-lock recovery
  - Handler auth and ownership checks for jobs, settings, and routines
  - Extension secrets scoped per user
  - Chat handlers use the authenticated identity
  - Reverse-proxy deployment documentation
  - Comprehensive integration tests for auth, SSE, rate limiting, and job isolation

* fix: scope memory tools per-user in multi-tenant mode

  Memory tools (search, write, read, tree) held a single workspace created at
  startup with GATEWAY_USER_ID, so in multi-tenant mode every user's tool calls
  searched the default user's scope. Add a WorkspaceResolver trait that resolves
  workspaces per request using JobContext.user_id: in single-user mode it returns
  the startup workspace; in multi-tenant mode (GATEWAY_USER_TOKENS configured) it
  creates and caches per-user workspaces on demand. Includes regression tests for
  workspace resolution and user isolation.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: comprehensive multi-tenant isolation audit

  Address all review findings from @serrrfirat plus 7 additional gaps found via a
  full security audit.

  Reviewer findings (5):
  - WorkspacePool now applies search config, memory layers, embedding cache,
    identity read scopes, and global config scopes (was bare)
  - jobs_summary_handler uses per-user queries instead of global counters
  - jobs_prompt_handler restructured to not 404 agent jobs, plus an ownership check
  - jobs_restart_handler agent branch now verifies user ownership
  - agent_job_summary_for_user added to the Database trait and both backends

  Audit findings (7):
  - Delete dead handlers/memory.rs (stale copies with no auth)
  - Add AuthenticatedUser to logs_events, logs_level_get, logs_level_set
  - Add AuthenticatedUser to extensions_tools_handler, gateway_status_handler
  - Add auth and ownership checks to all 6 routines handlers
  - Add auth to all 4 skills handlers, with audit logging on mutations
  - Scope extension setup SSE broadcast to the user (broadcast_for_user)
  - Fix pre-existing test compilation errors in extensions/manager.rs

  17 new multi-tenant isolation tests covering:
  - WorkspacePool config propagation and scope merging
  - Jobs handler per-user isolation (summary, restart, prompt, cancel)
  - Routines handler auth enforcement and cross-user rejection
  - Auth middleware enforcement on logs, skills, and status endpoints

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: second-pass multi-tenant audit — scope SSE broadcasts, DB queries, dead handlers

  Second audit pass applying the learned patterns across the codebase:
  - OAuth callback SSE broadcasts now use broadcast_for_user (lines 773, 912)
  - jobs_list_handler uses list_agent_jobs_for_user instead of fetching all
    users' jobs and filtering in Rust
  - list_agent_jobs_for_user added to the Database trait plus the postgres and
    libsql backends
  - Dead handler files (extensions.rs, static_files.rs) hardened with
    AuthenticatedUser to prevent an auth regression if migrated

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: address review findings — token hashing, broadcast scoping, error handling

  Security fixes:
  - Hash tokens with SHA-256 at construction time so authentication compares
    fixed-size 32-byte digests, eliminating length-oracle timing leaks
  - Scope auth SSE broadcasts per-user in chat_auth_token_handler —
    AuthRequired/AuthCompleted events were leaking across tenants
  - Propagate DB errors in restart handlers instead of silently swallowing them
    via the `if let Ok(Some(...))` pattern

  Code quality:
  - Log SSE serialization failures instead of silently producing empty strings
    via unwrap_or_default()
  - Remove the dead `pub type AuthState = MultiAuthState` alias
  - Replace `.unwrap()` with `Arc::clone(db)` in app.rs multi-tenant workspace
    setup (db is guaranteed Some in context, but unwrap violates project
    convention)
  - Fix the telegram setup test to inject UserIdentity into request extensions
    (the handler now requires AuthenticatedUser)
  - Add safety comments on test-only expect/unwrap calls for CI
  - Apply cargo fmt to fix pre-existing formatting

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: address review findings — unify workspace pool, fix SSE regression, cache job owners

  - Unify WorkspacePool and PerUserWorkspaceResolver: WorkspacePool now
    implements WorkspaceResolver, eliminating duplicate per-user workspace
    construction logic; app.rs uses WorkspacePool directly
  - Fix the `sse_tx: None` scheduler regression: change scheduler/worker SSE
    broadcasting from broadcast::Sender<SseEvent> to Arc<SseManager>, restoring
    SSE event delivery for scheduled agent jobs
  - Cache the job owner in the orchestrator: add job_owner_cache to
    OrchestratorState so job_event_handler avoids a DB round-trip on every event
    after the first per job
  - Deduplicate the ext_user_id computation in main.rs
  - Remove the unused _gateway_state variable
  - Fix a pre-existing test: first_token() returns None in multi-user mode by
    design; align the test assertion

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* style: fix formatting in app.rs

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor: extract memory handlers back into handlers/memory.rs

  Move the memory API handlers out of server.rs into their own module, consistent
  with how the jobs, routines, and skills handlers are organized. The
  resolve_workspace() helper moves with them since it is only used by memory
  handlers.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: ilblackdragon@gmail.com <ilblackdragon@gmail.com>
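The token-hashing fix above can be sketched in isolation. This is a minimal illustration, not the gateway's actual code: it assumes the real implementation produces the 32-byte digest with SHA-256 (e.g. via the `sha2` crate), which is omitted here so the comparison logic stays self-contained. Hashing at construction time means authentication always compares two fixed-size digests, so comparison time no longer varies with the secret token's length.

```rust
/// Compare two 32-byte digests in constant time: accumulate XOR
/// differences over every byte instead of returning early on the
/// first mismatch, so timing reveals nothing about where they differ.
fn digests_equal(a: &[u8; 32], b: &[u8; 32]) -> bool {
    a.iter().zip(b.iter()).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}
```

Because both inputs are always exactly 32 bytes, an attacker probing with tokens of different lengths learns nothing from response timing, which is the length-oracle leak the commit describes closing.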
1 parent 070e8c7 commit dc90553


46 files changed (+5072 −1202 lines)

src/agent/agent_loop.rs

Lines changed: 4 additions & 4 deletions

@@ -157,8 +157,8 @@ pub struct AgentDeps {
     pub hooks: Arc<HookRegistry>,
     /// Cost enforcement guardrails (daily budget, hourly rate limits).
     pub cost_guard: Arc<crate::agent::cost_guard::CostGuard>,
-    /// SSE broadcast sender for live job event streaming to the web gateway.
-    pub sse_tx: Option<tokio::sync::broadcast::Sender<crate::channels::web::types::SseEvent>>,
+    /// SSE manager for live job event streaming to the web gateway.
+    pub sse_tx: Option<Arc<crate::channels::web::sse::SseManager>>,
     /// HTTP interceptor for trace recording/replay.
     pub http_interceptor: Option<Arc<dyn crate::llm::recording::HttpInterceptor>>,
     /// Audio transcription middleware for voice messages.
@@ -235,8 +235,8 @@ impl Agent {
                 hooks: deps.hooks.clone(),
             },
         );
-        if let Some(ref tx) = deps.sse_tx {
-            scheduler.set_sse_sender(tx.clone());
+        if let Some(ref sse) = deps.sse_tx {
+            scheduler.set_sse_sender(Arc::clone(sse));
         }
         if let Some(ref interceptor) = deps.http_interceptor {
            scheduler.set_http_interceptor(Arc::clone(interceptor));

src/agent/job_monitor.rs

Lines changed: 22 additions & 12 deletions

@@ -44,7 +44,7 @@ pub struct JobMonitorRoute {
 /// the main agent's context window).
 pub fn spawn_job_monitor(
     job_id: Uuid,
-    event_rx: broadcast::Receiver<(Uuid, SseEvent)>,
+    event_rx: broadcast::Receiver<(Uuid, String, SseEvent)>,
     inject_tx: mpsc::Sender<IncomingMessage>,
     route: JobMonitorRoute,
 ) -> JoinHandle<()> {
@@ -56,7 +56,7 @@ pub fn spawn_job_monitor(
 /// jobs don't stay `InProgress` forever in the `ContextManager`.
 pub fn spawn_job_monitor_with_context(
     job_id: Uuid,
-    mut event_rx: broadcast::Receiver<(Uuid, SseEvent)>,
+    mut event_rx: broadcast::Receiver<(Uuid, String, SseEvent)>,
     inject_tx: mpsc::Sender<IncomingMessage>,
     route: JobMonitorRoute,
     context_manager: Option<Arc<ContextManager>>,
@@ -68,7 +68,7 @@ pub fn spawn_job_monitor_with_context(

     loop {
         match event_rx.recv().await {
-            Ok((ev_job_id, event)) => {
+            Ok((ev_job_id, _user_id, event)) => {
                 if ev_job_id != job_id {
                     continue;
                 }
@@ -162,15 +162,17 @@
 /// inject messages into) but we still need to free the `max_jobs` slot.
 pub fn spawn_completion_watcher(
     job_id: Uuid,
-    mut event_rx: broadcast::Receiver<(Uuid, SseEvent)>,
+    mut event_rx: broadcast::Receiver<(Uuid, String, SseEvent)>,
     context_manager: Arc<ContextManager>,
 ) -> JoinHandle<()> {
     let short_id = job_id.to_string()[..8].to_string();

     tokio::spawn(async move {
         loop {
             match event_rx.recv().await {
-                Ok((ev_job_id, SseEvent::JobResult { status, .. })) if ev_job_id == job_id => {
+                Ok((ev_job_id, _user_id, SseEvent::JobResult { status, .. }))
+                    if ev_job_id == job_id =>
+                {
                     let target = if status == "completed" {
                         JobState::Completed
                     } else {
@@ -227,7 +229,7 @@ mod tests {

     #[tokio::test]
     async fn test_monitor_forwards_assistant_messages() {
-        let (event_tx, _) = broadcast::channel::<(Uuid, SseEvent)>(16);
+        let (event_tx, _) = broadcast::channel::<(Uuid, String, SseEvent)>(16);
         let (inject_tx, mut inject_rx) = mpsc::channel::<IncomingMessage>(16);

         let job_id = Uuid::new_v4();
@@ -237,6 +239,7 @@
         event_tx
             .send((
                 job_id,
+                "test-user".to_string(),
                 SseEvent::JobMessage {
                     job_id: job_id.to_string(),
                     role: "assistant".to_string(),
@@ -259,7 +262,7 @@

     #[tokio::test]
     async fn test_monitor_ignores_other_jobs() {
-        let (event_tx, _) = broadcast::channel::<(Uuid, SseEvent)>(16);
+        let (event_tx, _) = broadcast::channel::<(Uuid, String, SseEvent)>(16);
         let (inject_tx, mut inject_rx) = mpsc::channel::<IncomingMessage>(16);

         let job_id = Uuid::new_v4();
@@ -270,6 +273,7 @@
         event_tx
             .send((
                 other_job_id,
+                "test-user".to_string(),
                 SseEvent::JobMessage {
                     job_id: other_job_id.to_string(),
                     role: "assistant".to_string(),
@@ -289,7 +293,7 @@

     #[tokio::test]
     async fn test_monitor_exits_on_job_result() {
-        let (event_tx, _) = broadcast::channel::<(Uuid, SseEvent)>(16);
+        let (event_tx, _) = broadcast::channel::<(Uuid, String, SseEvent)>(16);
         let (inject_tx, mut inject_rx) = mpsc::channel::<IncomingMessage>(16);

         let job_id = Uuid::new_v4();
@@ -299,6 +303,7 @@
         event_tx
             .send((
                 job_id,
+                "test-user".to_string(),
                 SseEvent::JobResult {
                     job_id: job_id.to_string(),
                     status: "completed".to_string(),
@@ -324,7 +329,7 @@

     #[tokio::test]
     async fn test_monitor_skips_tool_events() {
-        let (event_tx, _) = broadcast::channel::<(Uuid, SseEvent)>(16);
+        let (event_tx, _) = broadcast::channel::<(Uuid, String, SseEvent)>(16);
         let (inject_tx, mut inject_rx) = mpsc::channel::<IncomingMessage>(16);

         let job_id = Uuid::new_v4();
@@ -334,6 +339,7 @@
         event_tx
             .send((
                 job_id,
+                "test-user".to_string(),
                 SseEvent::JobToolUse {
                     job_id: job_id.to_string(),
                     tool_name: "shell".to_string(),
@@ -346,6 +352,7 @@
         event_tx
             .send((
                 job_id,
+                "test-user".to_string(),
                 SseEvent::JobMessage {
                     job_id: job_id.to_string(),
                     role: "user".to_string(),
@@ -402,7 +409,7 @@
             .await
             .unwrap();

-        let (event_tx, _) = broadcast::channel::<(Uuid, SseEvent)>(16);
+        let (event_tx, _) = broadcast::channel::<(Uuid, String, SseEvent)>(16);
         let (inject_tx, mut inject_rx) = mpsc::channel::<IncomingMessage>(16);

         let handle = spawn_job_monitor_with_context(
@@ -417,6 +424,7 @@
         event_tx
             .send((
                 job_id,
+                "test-user".to_string(),
                 SseEvent::JobResult {
                     job_id: job_id.to_string(),
                     status: "completed".to_string(),
@@ -450,7 +458,7 @@
             .await
             .unwrap();

-        let (event_tx, _) = broadcast::channel::<(Uuid, SseEvent)>(16);
+        let (event_tx, _) = broadcast::channel::<(Uuid, String, SseEvent)>(16);
         let (inject_tx, mut inject_rx) = mpsc::channel::<IncomingMessage>(16);

         let handle = spawn_job_monitor_with_context(
@@ -465,6 +473,7 @@
         event_tx
             .send((
                 job_id,
+                "test-user".to_string(),
                 SseEvent::JobResult {
                     job_id: job_id.to_string(),
                     status: "failed".to_string(),
@@ -498,12 +507,13 @@
             .await
             .unwrap();

-        let (event_tx, _) = broadcast::channel::<(Uuid, SseEvent)>(16);
+        let (event_tx, _) = broadcast::channel::<(Uuid, String, SseEvent)>(16);
         let handle = spawn_completion_watcher(job_id, event_tx.subscribe(), Arc::clone(&cm));

         event_tx
             .send((
                 job_id,
+                "test-user".to_string(),
                 SseEvent::JobResult {
                     job_id: job_id.to_string(),
                     status: "completed".to_string(),
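The diff above widens the broadcast payload from `(Uuid, SseEvent)` to `(Uuid, String, SseEvent)` so every event carries its owning user's id. A simplified, synchronous sketch of the filtering this enables is below; the struct and field names are illustrative stand-ins, and the real code works over `tokio::sync::broadcast` channels rather than a slice:

```rust
// Stand-in for the gateway's SseEvent; the real enum has many variants.
#[derive(Clone, Debug, PartialEq)]
struct SseEvent {
    job_id: u64,
    payload: String,
}

/// Keep only events addressed to `user_id`, mirroring how a
/// `broadcast_for_user`-style subscriber drops other tenants' events
/// by matching on the middle element of the (job, user, event) tuple.
fn events_for_user<'a>(
    events: &'a [(u64, String, SseEvent)],
    user_id: &str,
) -> Vec<&'a SseEvent> {
    events
        .iter()
        .filter(|(_, uid, _)| uid.as_str() == user_id)
        .map(|(_, _, ev)| ev)
        .collect()
}
```

Consumers that don't care about tenancy, such as the job monitors in this file, simply bind the new element as `_user_id` and behave as before.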

src/agent/scheduler.rs

Lines changed: 5 additions & 6 deletions

@@ -9,7 +9,6 @@ use tokio::task::JoinHandle;
 use uuid::Uuid;

 use crate::agent::task::{Task, TaskContext, TaskOutput};
-use crate::channels::web::types::SseEvent;
 use crate::config::AgentConfig;
 use crate::context::{ContextManager, JobContext, JobState};
 use crate::db::Database;
@@ -67,8 +66,8 @@ pub struct Scheduler {
     extension_manager: Option<Arc<ExtensionManager>>,
     store: Option<Arc<dyn Database>>,
     hooks: Arc<HookRegistry>,
-    /// SSE broadcast sender for live job event streaming.
-    sse_tx: Option<tokio::sync::broadcast::Sender<SseEvent>>,
+    /// SSE manager for live job event streaming.
+    sse_tx: Option<Arc<crate::channels::web::sse::SseManager>>,
     /// HTTP interceptor for trace recording/replay (propagated to workers).
     http_interceptor: Option<Arc<dyn crate::llm::recording::HttpInterceptor>>,
     /// Running jobs (main LLM-driven jobs).
@@ -102,9 +101,9 @@ impl Scheduler {
         }
     }

-    /// Set the SSE broadcast sender for live job event streaming.
-    pub fn set_sse_sender(&mut self, tx: tokio::sync::broadcast::Sender<SseEvent>) {
-        self.sse_tx = Some(tx);
+    /// Set the SSE manager for live job event streaming.
+    pub fn set_sse_sender(&mut self, sse: Arc<crate::channels::web::sse::SseManager>) {
+        self.sse_tx = Some(sse);
     }

     /// Set the HTTP interceptor for trace recording/replay.

src/agent/thread_ops.rs

Lines changed: 1 addition & 1 deletion

@@ -1646,7 +1646,7 @@ impl Agent {
         };

         match ext_mgr
-            .configure_token(&pending.extension_name, token)
+            .configure_token(&pending.extension_name, token, &message.user_id)
             .await
         {
             Ok(result) if result.activated => {

src/app.rs

Lines changed: 30 additions & 2 deletions

@@ -327,7 +327,7 @@ impl AppBuilder {
             .with_search_config(&self.config.search);

         if let Some(ref emb) = embeddings {
-            ws = ws.with_embeddings_cached(emb.clone(), emb_cache_config);
+            ws = ws.with_embeddings_cached(emb.clone(), emb_cache_config.clone());
         }

         // Wire workspace-level settings (read scopes, memory layers)
@@ -341,7 +341,35 @@
         }
         ws = ws.with_memory_layers(self.config.workspace.memory_layers.clone());
         let ws = Arc::new(ws);
-        tools.register_memory_tools(Arc::clone(&ws));
+
+        // Detect multi-tenant mode: when GATEWAY_USER_TOKENS is configured,
+        // each authenticated user needs their own workspace scope. Use
+        // WorkspacePool (which implements WorkspaceResolver) to create
+        // per-user workspaces on demand instead of sharing the startup
+        // workspace across all users.
+        let is_multi_tenant = self
+            .config
+            .channels
+            .gateway
+            .as_ref()
+            .is_some_and(|gw| gw.user_tokens.is_some());
+
+        if is_multi_tenant {
+            let pool = Arc::new(crate::channels::web::server::WorkspacePool::new(
+                Arc::clone(db),
+                embeddings.clone(),
+                emb_cache_config,
+                self.config.search.clone(),
+                self.config.workspace.clone(),
+            ));
+            tools.register_memory_tools_with_resolver(pool);
+            tracing::info!(
+                "Memory tools configured with per-user workspace resolver (multi-tenant mode)"
+            );
+        } else {
+            tools.register_memory_tools(Arc::clone(&ws));
+        }
+
         Some(ws)
     } else {
         None
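The WorkspacePool wired up in the app.rs diff above amounts to a lazily populated per-user cache. A hedged, self-contained sketch of that shape follows; `Workspace` here is a bare stand-in (the real pool also applies search config, memory layers, the embedding cache, and scope settings), and the poisoned-lock recovery mirrors the behaviour the commit message mentions for the rate limiter:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Illustrative stand-in for the real per-user workspace.
#[derive(Debug)]
struct Workspace {
    user_id: String,
}

// Minimal analogue of WorkspacePool: one cached workspace per user id.
struct WorkspacePool {
    cache: Mutex<HashMap<String, Arc<Workspace>>>,
}

impl WorkspacePool {
    fn new() -> Self {
        Self { cache: Mutex::new(HashMap::new()) }
    }

    /// Resolve the workspace for `user_id`, creating and caching it on
    /// first use. `unwrap_or_else(|e| e.into_inner())` recovers the guard
    /// from a poisoned lock instead of panicking.
    fn resolve(&self, user_id: &str) -> Arc<Workspace> {
        let mut cache = self.cache.lock().unwrap_or_else(|e| e.into_inner());
        cache
            .entry(user_id.to_string())
            .or_insert_with(|| {
                Arc::new(Workspace { user_id: user_id.to_string() })
            })
            .clone()
    }
}
```

Repeated calls for the same user return the same `Arc`, so tool calls within a tenant share one workspace while different tenants never see each other's scope.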

0 commit comments