Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/container-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,12 @@ function buildVolumeMounts(
}

// Sync skills from container/skills/ into each group's .claude/skills/
// Clear existing skills first to remove stale ones from previous runs
const skillsSrc = path.join(process.cwd(), 'container', 'skills');
const skillsDst = path.join(groupSessionsDir, 'skills');
if (fs.existsSync(skillsDst)) {
fs.rmSync(skillsDst, { recursive: true, force: true });
}
if (fs.existsSync(skillsSrc)) {
for (const skillDir of fs.readdirSync(skillsSrc)) {
const srcDir = path.join(skillsSrc, skillDir);
Expand Down
4 changes: 4 additions & 0 deletions src/group-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ export class GroupQueue {
return state;
}

isActive(groupJid: string): boolean {
return this.groups.get(groupJid)?.active ?? false;
}

setProcessMessagesFn(fn: (groupJid: string) => Promise<boolean>): void {
this.processMessagesFn = fn;
}
Expand Down
47 changes: 46 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ let lastTimestamp = '';
let sessions: Record<string, string> = {};
let registeredGroups: Record<string, RegisteredGroup> = {};
let lastAgentTimestamp: Record<string, string> = {};
let cursorBeforePipe: Record<string, string> = {};
let messageLoopRunning = false;

const channels: Channel[] = [];
Expand All @@ -81,6 +82,13 @@ function loadState(): void {
logger.warn('Corrupted last_agent_timestamp in DB, resetting');
lastAgentTimestamp = {};
}
const pipeCursor = getRouterState('cursor_before_pipe');
try {
cursorBeforePipe = pipeCursor ? JSON.parse(pipeCursor) : {};
} catch {
logger.warn('Corrupted cursor_before_pipe in DB, resetting');
cursorBeforePipe = {};
}
sessions = getAllSessions();
registeredGroups = getAllRegisteredGroups();
logger.info(
Expand All @@ -92,6 +100,7 @@ function loadState(): void {
function saveState(): void {
setRouterState('last_timestamp', lastTimestamp);
setRouterState('last_agent_timestamp', JSON.stringify(lastAgentTimestamp));
setRouterState('cursor_before_pipe', JSON.stringify(cursorBeforePipe));
}

function registerGroup(jid: string, group: RegisteredGroup): void {
Expand Down Expand Up @@ -182,8 +191,9 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
const prompt = formatMessages(missedMessages, TIMEZONE);

// Advance cursor so the piping path in startMessageLoop won't re-fetch
// these messages. Save the old cursor so we can roll back on error.
// these messages. Save the old cursor so we can roll back on error or crash.
const previousCursor = lastAgentTimestamp[chatJid] || '';
cursorBeforePipe[chatJid] = previousCursor;
lastAgentTimestamp[chatJid] =
missedMessages[missedMessages.length - 1].timestamp;
saveState();
Expand Down Expand Up @@ -230,6 +240,11 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
}

if (result.status === 'success') {
// Clear pipe rollback cursor now that output was delivered.
// Don't wait for processGroupMessages to return — if the process
// dies between here and the return, we'd wrongly roll back.
delete cursorBeforePipe[chatJid];
saveState();
queue.notifyIdle(chatJid);
}

Expand Down Expand Up @@ -451,6 +466,31 @@ async function startMessageLoop(): Promise<void> {
* Handles crash between advancing lastTimestamp and processing messages.
*/
function recoverPendingMessages(): void {
// Roll back any piped-message cursors that were persisted before a crash.
// This ensures messages piped to a now-dead container are re-fetched.
// IMPORTANT: Only roll back if the container is no longer running — rolling
// back while the container is alive causes duplicate processing.
let rolledBack = false;
for (const [chatJid, savedCursor] of Object.entries(cursorBeforePipe)) {
if (queue.isActive(chatJid)) {
logger.debug(
{ chatJid },
'Recovery: skipping piped-cursor rollback, container still active',
);
continue;
}
logger.info(
{ chatJid, rolledBackTo: savedCursor },
'Recovery: rolling back piped-message cursor',
);
lastAgentTimestamp[chatJid] = savedCursor;
delete cursorBeforePipe[chatJid];
rolledBack = true;
}
if (rolledBack) {
saveState();
}

for (const [chatJid, group] of Object.entries(registeredGroups)) {
const sinceTimestamp = lastAgentTimestamp[chatJid] || '';
const pending = getMessagesSince(chatJid, sinceTimestamp, ASSISTANT_NAME);
Expand Down Expand Up @@ -487,6 +527,11 @@ async function main(): Promise<void> {
logger.info({ signal }, 'Shutdown signal received');
proxyServer.close();
await queue.shutdown(10000);
// Clear piped-message rollback cursors on clean shutdown.
// Containers are detached (not killed), so their work is done or abandoned.
// Without this, recovery rolls back cursors and re-processes handled messages.
cursorBeforePipe = {};
saveState();
for (const ch of channels) await ch.disconnect();
process.exit(0);
};
Expand Down