Skip to content

Commit d3976b9

Browse files
authored
fix: replace recursive session filewatch (#1474)
1 parent 7fc138f commit d3976b9

5 files changed

Lines changed: 231 additions & 31 deletions

File tree

bun.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
"@opentui/core": "0.1.103",
5454
"@opentui/react": "0.1.103",
5555
"@tauri-apps/api": "2.10.1",
56+
"chokidar": "^5.0.0",
5657
"commander": "12.1.0",
5758
"ignore": "7.0.5",
5859
"js-yaml": "4.1.1",

src/lib/session-filewatch.test.ts

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@ import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from 'node:fs';
1919
import { tmpdir } from 'node:os';
2020
import { join } from 'node:path';
2121
import type { FilewatchDeps } from './session-filewatch.js';
22-
import { handleFileChange, isForeignKeyViolation, resetUnrecoverableSessions } from './session-filewatch.js';
22+
import {
23+
extractSessionInfo,
24+
handleFileChange,
25+
isForeignKeyViolation,
26+
resetUnrecoverableSessions,
27+
} from './session-filewatch.js';
2328

2429
// ============================================================================
2530
// Test helpers
@@ -78,6 +83,44 @@ describe('isForeignKeyViolation', () => {
7883
});
7984
});
8085

86+
// ============================================================================
87+
// extractSessionInfo — Claude JSONL path layouts
88+
// ============================================================================
89+
90+
describe('extractSessionInfo', () => {
91+
test('parses root-level Claude project JSONL paths', () => {
92+
expect(extractSessionInfo('/tmp/claude/projects/project-hash/session-123.jsonl')).toEqual({
93+
sessionId: 'session-123',
94+
projectPath: '/tmp/claude/projects/project-hash',
95+
parentSessionId: null,
96+
isSubagent: false,
97+
});
98+
});
99+
100+
test('parses legacy sessions directory JSONL paths', () => {
101+
expect(extractSessionInfo('/tmp/claude/projects/project-hash/sessions/session-456.jsonl')).toEqual({
102+
sessionId: 'session-456',
103+
projectPath: '/tmp/claude/projects/project-hash',
104+
parentSessionId: null,
105+
isSubagent: false,
106+
});
107+
});
108+
109+
test('parses subagent JSONL paths', () => {
110+
expect(extractSessionInfo('/tmp/claude/projects/project-hash/parent-123/subagents/session-789.jsonl')).toEqual({
111+
sessionId: 'session-789',
112+
projectPath: '/tmp/claude/projects/project-hash',
113+
parentSessionId: 'parent-123',
114+
isSubagent: true,
115+
});
116+
});
117+
118+
test('rejects non-session JSONL paths outside projects', () => {
119+
expect(extractSessionInfo('/tmp/claude/not-projects/session-123.jsonl')).toBeNull();
120+
expect(extractSessionInfo('/tmp/claude/projects/project-hash/notes/session-123.jsonl')).toBeNull();
121+
});
122+
});
123+
81124
// ============================================================================
82125
// handleFileChange — FK circuit breaker (Bug A regression)
83126
// ============================================================================

src/lib/session-filewatch.ts

Lines changed: 69 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
/**
2-
* Session Filewatch — Event-driven JSONL capture via fs.watch.
2+
* Session Filewatch — Event-driven JSONL capture via chokidar.
33
*
4-
* Watches ~/.claude/projects/ recursively for JSONL changes.
4+
* Watches ~/.claude/projects/ for JSONL changes.
55
* Reacts only when a file is written — zero CPU when idle.
66
* Reads incrementally from stored offset, debounced 500ms per file.
77
*/
88

9-
import { type FSWatcher, watch } from 'node:fs';
109
import { homedir } from 'node:os';
11-
import { basename, join } from 'node:path';
10+
import { basename, isAbsolute, join, resolve } from 'node:path';
11+
import { type FSWatcher, watch } from 'chokidar';
1212

1313
import { buildWorkerMap, ingestFileFull, setLiveWorkPending } from './session-capture.js';
1414

@@ -23,6 +23,7 @@ let watcher: FSWatcher | null = null;
2323
const offsetCache = new Map<string, number>();
2424
const debounceTimers = new Map<string, ReturnType<typeof setTimeout>>();
2525
const DEBOUNCE_MS = 500;
26+
const WATCH_DEPTH = 4;
2627

2728
/**
2829
* Sessions where ingest raised an unrecoverable (FK) error — logged once,
@@ -72,10 +73,11 @@ async function loadOffsets(sql: SqlClient): Promise<void> {
7273
// File event handler
7374
// ============================================================================
7475

75-
function extractSessionInfo(
76+
export function extractSessionInfo(
7677
filePath: string,
7778
): { sessionId: string; projectPath: string; parentSessionId: string | null; isSubagent: boolean } | null {
78-
// Main: ~/.claude/projects/<hash>/sessions/<id>.jsonl
79+
// Main: ~/.claude/projects/<hash>/<id>.jsonl
80+
// Legacy main: ~/.claude/projects/<hash>/sessions/<id>.jsonl
7981
// Subagent: ~/.claude/projects/<hash>/<parent-id>/subagents/<id>.jsonl
8082
if (!filePath.endsWith('.jsonl')) return null;
8183

@@ -93,11 +95,18 @@ function extractSessionInfo(
9395
}
9496

9597
if (sessionsIdx > 0) {
96-
// Main session
98+
// Legacy main session
9799
const projectPath = parts.slice(0, sessionsIdx).join('/');
98100
return { sessionId, projectPath, parentSessionId: null, isSubagent: false };
99101
}
100102

103+
const projectIdx = parts.lastIndexOf('projects');
104+
if (projectIdx >= 0 && parts.length === projectIdx + 3) {
105+
// Main session
106+
const projectPath = parts.slice(0, projectIdx + 2).join('/');
107+
return { sessionId, projectPath, parentSessionId: null, isSubagent: false };
108+
}
109+
101110
return null;
102111
}
103112

@@ -163,6 +172,55 @@ export async function handleFileChange(
163172
}
164173
}
165174

175+
function shouldIgnoreWatchPath(path: string, stats?: { isFile: () => boolean }): boolean {
176+
return stats?.isFile() === true && !path.endsWith('.jsonl');
177+
}
178+
179+
function normalizeWatchEventPath(claudeDir: string, filePath: string): string {
180+
return isAbsolute(filePath) ? filePath : resolve(claudeDir, filePath);
181+
}
182+
183+
function scheduleFileChange(filePath: string, sql: SqlClient): void {
184+
if (!filePath.endsWith('.jsonl')) return;
185+
186+
const existing = debounceTimers.get(filePath);
187+
if (existing) clearTimeout(existing);
188+
189+
debounceTimers.set(
190+
filePath,
191+
setTimeout(() => {
192+
debounceTimers.delete(filePath);
193+
handleFileChange(filePath, sql).catch((err) => {
194+
const message = err instanceof Error ? err.message : String(err);
195+
console.error(`[filewatch] unhandled error for ${filePath}: ${message}`);
196+
});
197+
}, DEBOUNCE_MS),
198+
);
199+
}
200+
201+
export function createJsonlWatcher(claudeDir: string, onJsonlChange: (filePath: string) => void): FSWatcher {
202+
const jsonlWatcher = watch(claudeDir, {
203+
ignoreInitial: true,
204+
depth: WATCH_DEPTH,
205+
ignored: shouldIgnoreWatchPath,
206+
awaitWriteFinish: {
207+
stabilityThreshold: DEBOUNCE_MS,
208+
pollInterval: 100,
209+
},
210+
atomic: true,
211+
});
212+
213+
const emitJsonlChange = (filePath: string): void => {
214+
if (!filePath.endsWith('.jsonl')) return;
215+
onJsonlChange(normalizeWatchEventPath(claudeDir, filePath));
216+
};
217+
218+
jsonlWatcher.on('add', emitJsonlChange);
219+
jsonlWatcher.on('change', emitJsonlChange);
220+
221+
return jsonlWatcher;
222+
}
223+
166224
// ============================================================================
167225
// Start / Stop
168226
// ============================================================================
@@ -176,30 +234,11 @@ export async function startFilewatch(sql: SqlClient): Promise<boolean> {
176234
await loadOffsets(sql);
177235

178236
try {
179-
watcher = watch(claudeDir, { recursive: true }, (_eventType, filename) => {
180-
if (!filename || !filename.endsWith('.jsonl')) return;
181-
182-
const fullPath = join(claudeDir, filename);
183-
184-
// Debounce per file — Claude writes multiple lines per turn
185-
const existing = debounceTimers.get(fullPath);
186-
if (existing) clearTimeout(existing);
187-
188-
debounceTimers.set(
189-
fullPath,
190-
setTimeout(() => {
191-
debounceTimers.delete(fullPath);
192-
handleFileChange(fullPath, sql).catch((err) => {
193-
const message = err instanceof Error ? err.message : String(err);
194-
console.error(`[filewatch] unhandled error for ${fullPath}: ${message}`);
195-
});
196-
}, DEBOUNCE_MS),
197-
);
198-
});
237+
watcher = createJsonlWatcher(claudeDir, (fullPath) => scheduleFileChange(fullPath, sql));
199238

200239
watcher.on('error', (err) => {
201-
console.error('[filewatch] watcher error:', err.message);
202-
// Could fall back to polling here in the future
240+
const message = err instanceof Error ? err.message : String(err);
241+
console.error('[filewatch] watcher error:', message);
203242
});
204243

205244
console.log(`[filewatch] watching ${claudeDir} (${offsetCache.size} sessions cached)`);
@@ -213,7 +252,7 @@ export async function startFilewatch(sql: SqlClient): Promise<boolean> {
213252

214253
export function stopFilewatch(): void {
215254
if (watcher) {
216-
watcher.close();
255+
void watcher.close();
217256
watcher = null;
218257
}
219258
for (const timer of debounceTimers.values()) {
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
#!/usr/bin/env bash
2+
set -euo pipefail
3+
4+
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
5+
REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
6+
7+
cd "$REPO_ROOT"
8+
9+
BENCH_PROJECTS="${BENCH_PROJECTS:-125}"
10+
BENCH_JSONLS_PER_PROJECT="${BENCH_JSONLS_PER_PROJECT:-8}"
11+
BENCH_APPENDS="${BENCH_APPENDS:-50}"
12+
BENCH_MAX_WAKEUPS_PER_SEC="${BENCH_MAX_WAKEUPS_PER_SEC:-4}"
13+
BENCH_MAX_WAKEUPS="${BENCH_MAX_WAKEUPS:-3}"
14+
15+
BENCH_PROJECTS="$BENCH_PROJECTS" \
16+
BENCH_JSONLS_PER_PROJECT="$BENCH_JSONLS_PER_PROJECT" \
17+
BENCH_APPENDS="$BENCH_APPENDS" \
18+
BENCH_MAX_WAKEUPS_PER_SEC="$BENCH_MAX_WAKEUPS_PER_SEC" \
19+
BENCH_MAX_WAKEUPS="$BENCH_MAX_WAKEUPS" \
20+
bun --eval '
21+
import { appendFile, mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
22+
import { tmpdir } from "node:os";
23+
import { join } from "node:path";
24+
import { pathToFileURL } from "node:url";
25+
26+
const projects = Number(process.env.BENCH_PROJECTS ?? 125);
27+
const jsonlsPerProject = Number(process.env.BENCH_JSONLS_PER_PROJECT ?? 8);
28+
const appends = Number(process.env.BENCH_APPENDS ?? 50);
29+
const maxWakeupsPerSec = Number(process.env.BENCH_MAX_WAKEUPS_PER_SEC ?? 4);
30+
const maxWakeups = Number(process.env.BENCH_MAX_WAKEUPS ?? 3);
31+
const repoRoot = process.cwd();
32+
const { createJsonlWatcher } = await import(pathToFileURL(join(repoRoot, "src/lib/session-filewatch.ts")).href);
33+
34+
function sleep(ms) {
35+
return new Promise((resolve) => setTimeout(resolve, ms));
36+
}
37+
38+
async function waitForWatcherReady(watcher) {
39+
await new Promise((resolve, reject) => {
40+
const timer = setTimeout(() => reject(new Error("watcher did not become ready within 10s")), 10_000);
41+
watcher.once("ready", () => {
42+
clearTimeout(timer);
43+
resolve();
44+
});
45+
watcher.once("error", (err) => {
46+
clearTimeout(timer);
47+
reject(err);
48+
});
49+
});
50+
}
51+
52+
const tmpRoot = await mkdtemp(join(tmpdir(), "genie-filewatch-bench-"));
53+
const projectsDir = join(tmpRoot, "projects");
54+
55+
try {
56+
await mkdir(projectsDir, { recursive: true });
57+
58+
let hotPath = "";
59+
for (let projectIndex = 0; projectIndex < projects; projectIndex++) {
60+
const projectDir = join(projectsDir, `project-${String(projectIndex).padStart(4, "0")}`);
61+
await mkdir(projectDir, { recursive: true });
62+
63+
for (let sessionIndex = 0; sessionIndex < jsonlsPerProject; sessionIndex++) {
64+
if (sessionIndex % 2 === 0) {
65+
const filePath = join(projectDir, `session-${sessionIndex}.jsonl`);
66+
await writeFile(filePath, "{\"type\":\"summary\"}\n");
67+
hotPath ||= filePath;
68+
} else {
69+
const subagentDir = join(projectDir, `parent-${sessionIndex}`, "subagents");
70+
await mkdir(subagentDir, { recursive: true });
71+
await writeFile(join(subagentDir, `child-${sessionIndex}.jsonl`), "{\"type\":\"summary\"}\n");
72+
}
73+
}
74+
}
75+
76+
let wakeups = 0;
77+
const watcher = createJsonlWatcher(projectsDir, () => {
78+
wakeups++;
79+
});
80+
81+
await waitForWatcherReady(watcher);
82+
83+
const start = process.hrtime.bigint();
84+
for (let index = 0; index < appends; index++) {
85+
await appendFile(hotPath, `{"type":"assistant","index":${index}}\n`);
86+
}
87+
await sleep(1500);
88+
const elapsedSeconds = Number(process.hrtime.bigint() - start) / 1_000_000_000;
89+
90+
await watcher.close();
91+
92+
const wakeupsPerSec = wakeups / elapsedSeconds;
93+
const result = {
94+
projects,
95+
jsonls: projects * jsonlsPerProject,
96+
appends,
97+
wakeups,
98+
elapsed_seconds: Number(elapsedSeconds.toFixed(3)),
99+
wakeups_per_sec: Number(wakeupsPerSec.toFixed(3)),
100+
max_wakeups: maxWakeups,
101+
max_wakeups_per_sec: maxWakeupsPerSec,
102+
};
103+
console.log(JSON.stringify(result, null, 2));
104+
105+
if (wakeups > maxWakeups || wakeupsPerSec > maxWakeupsPerSec) {
106+
console.error("[filewatch-bench] wakeup threshold exceeded");
107+
process.exit(1);
108+
}
109+
} finally {
110+
await rm(tmpRoot, { recursive: true, force: true });
111+
}
112+
'

0 commit comments

Comments
 (0)