|
3 | 3 | * ACTION-ITEMS item 9. Lives on the pure helper `decideStuckAction` so we |
4 | 4 | * don't have to mock the filesystem or the container runner. |
5 | 5 | */ |
| 6 | +import Database from 'better-sqlite3'; |
6 | 7 | import { describe, expect, it } from 'vitest'; |
7 | 8 |
|
8 | | -import { ABSOLUTE_CEILING_MS, CLAIM_STUCK_MS, decideStuckAction } from './host-sweep.js'; |
| 9 | +import { deleteOrphanProcessingClaims, getProcessingClaims } from './db/session-db.js'; |
| 10 | +import { |
| 11 | + ABSOLUTE_CEILING_MS, |
| 12 | + CLAIM_STUCK_MS, |
| 13 | + _resetStuckProcessingRowsForTesting, |
| 14 | + decideStuckAction, |
| 15 | +} from './host-sweep.js'; |
| 16 | +import type { Session } from './types.js'; |
9 | 17 |
|
10 | 18 | const BASE = Date.parse('2026-04-20T12:00:00.000Z'); |
11 | 19 |
|
@@ -144,3 +152,143 @@ describe('decideStuckAction', () => { |
144 | 152 | expect(res.action).toBe('ok'); |
145 | 153 | }); |
146 | 154 | }); |
| 155 | + |
| 156 | +// ───────────────────────────────────────────────────────────────────────────── |
| 157 | +// Orphan claim cleanup (regression test for the SIGKILL → claim-stuck loop) |
| 158 | +// |
| 159 | +// Repro of the production bug seen 2026-04-30: container A claimed message M |
| 160 | +// (writes processing_ack row with status='processing'). Host kills A by |
| 161 | +// absolute-ceiling. Old behavior: messages_in.M was reset to pending but |
| 162 | +// processing_ack.M survived. On the next sweep tick, wakeContainer spawned B, |
| 163 | +// the same-tick SLA check saw M's stale claim age (hours), and SIGKILL'd B |
| 164 | +// before agent-runner could run clearStaleProcessingAcks(). Loop. The fix |
| 165 | +// deletes processing_ack 'processing' rows when the host kills/cleans the |
| 166 | +// container, breaking the loop atomically. |
| 167 | +// ───────────────────────────────────────────────────────────────────────────── |
| 168 | + |
| 169 | +function makeSessionDbs(): { inDb: Database.Database; outDb: Database.Database } { |
| 170 | + const inDb = new Database(':memory:'); |
| 171 | + inDb.exec(` |
| 172 | + CREATE TABLE messages_in ( |
| 173 | + id TEXT PRIMARY KEY, |
| 174 | + seq INTEGER UNIQUE, |
| 175 | + kind TEXT NOT NULL, |
| 176 | + timestamp TEXT NOT NULL, |
| 177 | + status TEXT DEFAULT 'pending', |
| 178 | + process_after TEXT, |
| 179 | + recurrence TEXT, |
| 180 | + series_id TEXT, |
| 181 | + tries INTEGER DEFAULT 0, |
| 182 | + trigger INTEGER NOT NULL DEFAULT 1, |
| 183 | + platform_id TEXT, |
| 184 | + channel_type TEXT, |
| 185 | + thread_id TEXT, |
| 186 | + content TEXT NOT NULL |
| 187 | + ); |
| 188 | + `); |
| 189 | + const outDb = new Database(':memory:'); |
| 190 | + outDb.exec(` |
| 191 | + CREATE TABLE processing_ack ( |
| 192 | + message_id TEXT PRIMARY KEY, |
| 193 | + status TEXT NOT NULL, |
| 194 | + status_changed TEXT NOT NULL |
| 195 | + ); |
| 196 | + `); |
| 197 | + return { inDb, outDb }; |
| 198 | +} |
| 199 | + |
| 200 | +function fakeSession(): Session { |
| 201 | + return { |
| 202 | + id: 'sess-test', |
| 203 | + agent_group_id: 'ag-test', |
| 204 | + messaging_group_id: null, |
| 205 | + thread_id: null, |
| 206 | + agent_provider: null, |
| 207 | + status: 'active', |
| 208 | + container_status: 'stopped', |
| 209 | + last_active: null, |
| 210 | + created_at: new Date().toISOString(), |
| 211 | + }; |
| 212 | +} |
| 213 | + |
| 214 | +describe('deleteOrphanProcessingClaims', () => { |
| 215 | + it('removes only processing rows, leaves completed/failed alone', () => { |
| 216 | + const { outDb } = makeSessionDbs(); |
| 217 | + const ts = new Date().toISOString(); |
| 218 | + outDb.prepare("INSERT INTO processing_ack VALUES ('m-proc', 'processing', ?)").run(ts); |
| 219 | + outDb.prepare("INSERT INTO processing_ack VALUES ('m-done', 'completed', ?)").run(ts); |
| 220 | + outDb.prepare("INSERT INTO processing_ack VALUES ('m-fail', 'failed', ?)").run(ts); |
| 221 | + |
| 222 | + const removed = deleteOrphanProcessingClaims(outDb); |
| 223 | + |
| 224 | + expect(removed).toBe(1); |
| 225 | + const remaining = outDb.prepare('SELECT message_id, status FROM processing_ack ORDER BY message_id').all(); |
| 226 | + expect(remaining).toEqual([ |
| 227 | + { message_id: 'm-done', status: 'completed' }, |
| 228 | + { message_id: 'm-fail', status: 'failed' }, |
| 229 | + ]); |
| 230 | + }); |
| 231 | + |
| 232 | + it('returns 0 when nothing to clear', () => { |
| 233 | + const { outDb } = makeSessionDbs(); |
| 234 | + expect(deleteOrphanProcessingClaims(outDb)).toBe(0); |
| 235 | + }); |
| 236 | +}); |
| 237 | + |
| 238 | +describe('resetStuckProcessingRows — orphan claim cleanup', () => { |
| 239 | + it('deletes orphan processing_ack rows so next sweep tick does not see them', () => { |
| 240 | + const { inDb, outDb } = makeSessionDbs(); |
| 241 | + const claimedAt = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString(); // 2h ago |
| 242 | + |
| 243 | + // messages_in.status stays 'pending' during processing — only the |
| 244 | + // container's processing_ack moves to 'processing'. See |
| 245 | + // src/db/schema.ts header comment on processing_ack. |
| 246 | + inDb |
| 247 | + .prepare( |
| 248 | + "INSERT INTO messages_in (id, seq, kind, timestamp, status, content) VALUES ('m-1', 1, 'chat', ?, 'pending', '{}')", |
| 249 | + ) |
| 250 | + .run(claimedAt); |
| 251 | + outDb.prepare("INSERT INTO processing_ack VALUES ('m-1', 'processing', ?)").run(claimedAt); |
| 252 | + |
| 253 | + // Sanity: the orphan claim is what would trip claim-stuck. |
| 254 | + expect(getProcessingClaims(outDb)).toHaveLength(1); |
| 255 | + |
| 256 | + _resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'absolute-ceiling'); |
| 257 | + |
| 258 | + // Regression assertion: orphan claim is gone — next sweep tick will see |
| 259 | + // an empty claims list and not kill the freshly respawned container. |
| 260 | + expect(getProcessingClaims(outDb)).toEqual([]); |
| 261 | + |
| 262 | + // And the message itself was rescheduled with backoff (existing behavior). |
| 263 | + const row = inDb.prepare('SELECT status, tries, process_after FROM messages_in WHERE id = ?').get('m-1') as { |
| 264 | + status: string; |
| 265 | + tries: number; |
| 266 | + process_after: string | null; |
| 267 | + }; |
| 268 | + expect(row.status).toBe('pending'); |
| 269 | + expect(row.tries).toBe(1); |
| 270 | + expect(row.process_after).not.toBeNull(); |
| 271 | + }); |
| 272 | + |
| 273 | + it('still clears orphan claims even when the inbound message has already been retried (skip path)', () => { |
| 274 | + // Edge case: the inbound row was already rescheduled (process_after in |
| 275 | + // future), so the per-message retry loop skips it. The orphan in |
| 276 | + // processing_ack must still be removed — otherwise the bug remains. |
| 277 | + const { inDb, outDb } = makeSessionDbs(); |
| 278 | + const claimedAt = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString(); |
| 279 | + const future = new Date(Date.now() + 60_000).toISOString(); |
| 280 | + |
| 281 | + inDb |
| 282 | + .prepare( |
| 283 | + "INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, tries, content) VALUES ('m-2', 2, 'chat', ?, 'pending', ?, 1, '{}')", |
| 284 | + ) |
| 285 | + .run(claimedAt, future); |
| 286 | + outDb.prepare("INSERT INTO processing_ack VALUES ('m-2', 'processing', ?)").run(claimedAt); |
| 287 | + |
| 288 | + _resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'claim-stuck'); |
| 289 | + |
| 290 | + expect(getProcessingClaims(outDb)).toEqual([]); |
| 291 | + const row = inDb.prepare('SELECT tries FROM messages_in WHERE id = ?').get('m-2') as { tries: number }; |
| 292 | + expect(row.tries).toBe(1); // not bumped, the skip path held |
| 293 | + }); |
| 294 | +}); |
0 commit comments