Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/db/session-db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ export function openOutboundDb(dbPath: string): Database.Database {
return db;
}

/** Open the outbound DB for a session with write access. Only safe to call when no container is running. */
export function openOutboundDbRw(dbPath: string): Database.Database {
const db = new Database(dbPath);
db.pragma('journal_mode = DELETE');
db.pragma('busy_timeout = 5000');
return db;
}

export function upsertSessionRouting(
db: Database.Database,
routing: { channel_type: string | null; platform_id: string | null; thread_id: string | null },
Expand Down
17 changes: 13 additions & 4 deletions src/host-sweep.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import {
type ContainerState,
} from './db/session-db.js';
import { log } from './log.js';
import { openInboundDb, openOutboundDb, inboundDbPath, heartbeatPath } from './session-manager.js';
import { openInboundDb, openOutboundDb, openOutboundDbRw, inboundDbPath, heartbeatPath } from './session-manager.js';
import { isContainerRunning, killContainer, wakeContainer } from './container-runner.js';
import type { Session } from './types.js';

Expand Down Expand Up @@ -302,8 +302,17 @@ function resetStuckProcessingRows(
// agent-runner has a chance to run clearStaleProcessingAcks() on startup.
// We're safe to write outbound.db here because we just killed the container
// that owned it (or it crashed and left no writer behind).
const cleared = deleteOrphanProcessingClaims(outDb);
if (cleared > 0) {
log.info('Cleared orphan processing claims', { sessionId: session.id, cleared, reason });
// outDb was opened readonly for reads above; reopen with write access for this delete.
let outDbRw: Database.Database | null = null;
try {
outDbRw = openOutboundDbRw(session.agent_group_id, session.id);
const cleared = deleteOrphanProcessingClaims(outDbRw);
if (cleared > 0) {
log.info('Cleared orphan processing claims', { sessionId: session.id, cleared, reason });
}
} catch (err) {
log.warn('Failed to clear orphan processing claims', { sessionId: session.id, err });
} finally {
outDbRw?.close();
}
}
6 changes: 6 additions & 0 deletions src/session-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
ensureSchema,
openInboundDb as openInboundDbRaw,
openOutboundDb as openOutboundDbRaw,
openOutboundDbRw as openOutboundDbRwRaw,
upsertSessionRouting,
insertMessage,
migrateMessagesInTable,
Expand Down Expand Up @@ -355,6 +356,11 @@ export function openOutboundDb(agentGroupId: string, sessionId: string): Databas
return openOutboundDbRaw(outboundDbPath(agentGroupId, sessionId));
}

/** Open the outbound DB for a session with write access. Only safe to call when no container is running. */
export function openOutboundDbRw(agentGroupId: string, sessionId: string): Database.Database {
return openOutboundDbRwRaw(outboundDbPath(agentGroupId, sessionId));
}

/**
* Write a message directly to a session's outbound DB so the host delivery
* loop picks it up. Used by the command gate to send denial responses
Expand Down