-
Notifications
You must be signed in to change notification settings - Fork 49
fix(filewatch): replace fs.watch recursive with chokidar (Mac CPU partial mitigation) #1474
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,14 +1,14 @@ | ||
| /** | ||
| * Session Filewatch — Event-driven JSONL capture via fs.watch. | ||
| * Session Filewatch — Event-driven JSONL capture via chokidar. | ||
| * | ||
| * Watches ~/.claude/projects/ recursively for JSONL changes. | ||
| * Watches ~/.claude/projects/ for JSONL changes. | ||
| * Reacts only when a file is written — zero CPU when idle. | ||
| * Reads incrementally from stored offset, debounced 500ms per file. | ||
| */ | ||
|
|
||
| import { type FSWatcher, watch } from 'node:fs'; | ||
| import { homedir } from 'node:os'; | ||
| import { basename, join } from 'node:path'; | ||
| import { basename, isAbsolute, join, resolve } from 'node:path'; | ||
| import { type FSWatcher, watch } from 'chokidar'; | ||
|
|
||
| import { buildWorkerMap, ingestFileFull, setLiveWorkPending } from './session-capture.js'; | ||
|
|
||
|
|
@@ -23,6 +23,7 @@ let watcher: FSWatcher | null = null; | |
| const offsetCache = new Map<string, number>(); | ||
| const debounceTimers = new Map<string, ReturnType<typeof setTimeout>>(); | ||
| const DEBOUNCE_MS = 500; | ||
| const WATCH_DEPTH = 4; | ||
|
|
||
| /** | ||
| * Sessions where ingest raised an unrecoverable (FK) error — logged once, | ||
|
|
@@ -72,10 +73,11 @@ async function loadOffsets(sql: SqlClient): Promise<void> { | |
| // File event handler | ||
| // ============================================================================ | ||
|
|
||
| function extractSessionInfo( | ||
| export function extractSessionInfo( | ||
| filePath: string, | ||
| ): { sessionId: string; projectPath: string; parentSessionId: string | null; isSubagent: boolean } | null { | ||
| // Main: ~/.claude/projects/<hash>/sessions/<id>.jsonl | ||
| // Main: ~/.claude/projects/<hash>/<id>.jsonl | ||
| // Legacy main: ~/.claude/projects/<hash>/sessions/<id>.jsonl | ||
| // Subagent: ~/.claude/projects/<hash>/<parent-id>/subagents/<id>.jsonl | ||
| if (!filePath.endsWith('.jsonl')) return null; | ||
|
|
||
|
|
@@ -93,11 +95,18 @@ function extractSessionInfo( | |
| } | ||
|
|
||
| if (sessionsIdx > 0) { | ||
| // Main session | ||
| // Legacy main session | ||
| const projectPath = parts.slice(0, sessionsIdx).join('/'); | ||
| return { sessionId, projectPath, parentSessionId: null, isSubagent: false }; | ||
| } | ||
|
|
||
| const projectIdx = parts.lastIndexOf('projects'); | ||
| if (projectIdx >= 0 && parts.length === projectIdx + 3) { | ||
| // Main session | ||
| const projectPath = parts.slice(0, projectIdx + 2).join('/'); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| return { sessionId, projectPath, parentSessionId: null, isSubagent: false }; | ||
| } | ||
|
|
||
| return null; | ||
| } | ||
|
|
||
|
|
@@ -163,6 +172,55 @@ export async function handleFileChange( | |
| } | ||
| } | ||
|
|
||
| function shouldIgnoreWatchPath(path: string, stats?: { isFile: () => boolean }): boolean { | ||
| return stats?.isFile() === true && !path.endsWith('.jsonl'); | ||
| } | ||
|
|
||
| function normalizeWatchEventPath(claudeDir: string, filePath: string): string { | ||
| return isAbsolute(filePath) ? filePath : resolve(claudeDir, filePath); | ||
| } | ||
|
|
||
| function scheduleFileChange(filePath: string, sql: SqlClient): void { | ||
| if (!filePath.endsWith('.jsonl')) return; | ||
|
|
||
| const existing = debounceTimers.get(filePath); | ||
| if (existing) clearTimeout(existing); | ||
|
|
||
| debounceTimers.set( | ||
| filePath, | ||
| setTimeout(() => { | ||
| debounceTimers.delete(filePath); | ||
| handleFileChange(filePath, sql).catch((err) => { | ||
| const message = err instanceof Error ? err.message : String(err); | ||
| console.error(`[filewatch] unhandled error for ${filePath}: ${message}`); | ||
| }); | ||
| }, DEBOUNCE_MS), | ||
| ); | ||
| } | ||
|
Comment on lines
+183
to
+199
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The manual debouncing logic in scheduleFileChange appears redundant because createJsonlWatcher is already configured with awaitWriteFinish and a stabilityThreshold of DEBOUNCE_MS (500ms). This results in a cumulative delay of 1 second (500ms for file stability + 500ms in setTimeout) before processing a change. Consider removing the manual setTimeout and calling handleFileChange directly from the watcher events, while retaining the error handling. |
||
|
|
||
| export function createJsonlWatcher(claudeDir: string, onJsonlChange: (filePath: string) => void): FSWatcher { | ||
| const jsonlWatcher = watch(claudeDir, { | ||
| ignoreInitial: true, | ||
| depth: WATCH_DEPTH, | ||
| ignored: shouldIgnoreWatchPath, | ||
| awaitWriteFinish: { | ||
| stabilityThreshold: DEBOUNCE_MS, | ||
| pollInterval: 100, | ||
| }, | ||
| atomic: true, | ||
| }); | ||
|
|
||
| const emitJsonlChange = (filePath: string): void => { | ||
| if (!filePath.endsWith('.jsonl')) return; | ||
| onJsonlChange(normalizeWatchEventPath(claudeDir, filePath)); | ||
| }; | ||
|
|
||
| jsonlWatcher.on('add', emitJsonlChange); | ||
| jsonlWatcher.on('change', emitJsonlChange); | ||
|
|
||
| return jsonlWatcher; | ||
| } | ||
|
|
||
| // ============================================================================ | ||
| // Start / Stop | ||
| // ============================================================================ | ||
|
|
@@ -176,30 +234,11 @@ export async function startFilewatch(sql: SqlClient): Promise<boolean> { | |
| await loadOffsets(sql); | ||
|
|
||
| try { | ||
| watcher = watch(claudeDir, { recursive: true }, (_eventType, filename) => { | ||
| if (!filename || !filename.endsWith('.jsonl')) return; | ||
|
|
||
| const fullPath = join(claudeDir, filename); | ||
|
|
||
| // Debounce per file — Claude writes multiple lines per turn | ||
| const existing = debounceTimers.get(fullPath); | ||
| if (existing) clearTimeout(existing); | ||
|
|
||
| debounceTimers.set( | ||
| fullPath, | ||
| setTimeout(() => { | ||
| debounceTimers.delete(fullPath); | ||
| handleFileChange(fullPath, sql).catch((err) => { | ||
| const message = err instanceof Error ? err.message : String(err); | ||
| console.error(`[filewatch] unhandled error for ${fullPath}: ${message}`); | ||
| }); | ||
| }, DEBOUNCE_MS), | ||
| ); | ||
| }); | ||
| watcher = createJsonlWatcher(claudeDir, (fullPath) => scheduleFileChange(fullPath, sql)); | ||
|
|
||
| watcher.on('error', (err) => { | ||
| console.error('[filewatch] watcher error:', err.message); | ||
| // Could fall back to polling here in the future | ||
| const message = err instanceof Error ? err.message : String(err); | ||
| console.error('[filewatch] watcher error:', message); | ||
| }); | ||
|
|
||
| console.log(`[filewatch] watching ${claudeDir} (${offsetCache.size} sessions cached)`); | ||
|
Comment on lines
+237
to
244
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||
|
|
@@ -213,7 +252,7 @@ export async function startFilewatch(sql: SqlClient): Promise<boolean> { | |
|
|
||
| export function stopFilewatch(): void { | ||
| if (watcher) { | ||
| watcher.close(); | ||
| void watcher.close(); | ||
| watcher = null; | ||
| } | ||
| for (const timer of debounceTimers.values()) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,112 @@ | ||
#!/usr/bin/env bash
# Benchmark: verify the chokidar-based JSONL watcher stays quiet under a
# large fixture tree (many projects/sessions) while appends hit one hot file.
# Fails (exit 1) when observed wakeups exceed the configured thresholds.
set -euo pipefail

# Resolve the repository root relative to this script's location so the
# benchmark can be invoked from anywhere.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"

cd "$REPO_ROOT"

# Tunables — all overridable via the environment; defaults model ~1000 JSONL
# files (125 projects x 8 sessions) with 50 appends to a single hot file.
BENCH_PROJECTS="${BENCH_PROJECTS:-125}"
BENCH_JSONLS_PER_PROJECT="${BENCH_JSONLS_PER_PROJECT:-8}"
BENCH_APPENDS="${BENCH_APPENDS:-50}"
BENCH_MAX_WAKEUPS_PER_SEC="${BENCH_MAX_WAKEUPS_PER_SEC:-4}"
BENCH_MAX_WAKEUPS="${BENCH_MAX_WAKEUPS:-3}"

# Re-export the tunables into bun's environment (they are shell-local above)
# and run the benchmark driver as an inline ES module.
BENCH_PROJECTS="$BENCH_PROJECTS" \
BENCH_JSONLS_PER_PROJECT="$BENCH_JSONLS_PER_PROJECT" \
BENCH_APPENDS="$BENCH_APPENDS" \
BENCH_MAX_WAKEUPS_PER_SEC="$BENCH_MAX_WAKEUPS_PER_SEC" \
BENCH_MAX_WAKEUPS="$BENCH_MAX_WAKEUPS" \
bun --eval '
import { appendFile, mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { pathToFileURL } from "node:url";

const projects = Number(process.env.BENCH_PROJECTS ?? 125);
const jsonlsPerProject = Number(process.env.BENCH_JSONLS_PER_PROJECT ?? 8);
const appends = Number(process.env.BENCH_APPENDS ?? 50);
const maxWakeupsPerSec = Number(process.env.BENCH_MAX_WAKEUPS_PER_SEC ?? 4);
const maxWakeups = Number(process.env.BENCH_MAX_WAKEUPS ?? 3);
const repoRoot = process.cwd();
const { createJsonlWatcher } = await import(pathToFileURL(join(repoRoot, "src/lib/session-filewatch.ts")).href);

function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function waitForWatcherReady(watcher) {
  await new Promise((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error("watcher did not become ready within 10s")), 10_000);
    watcher.once("ready", () => {
      clearTimeout(timer);
      resolve();
    });
    watcher.once("error", (err) => {
      clearTimeout(timer);
      reject(err);
    });
  });
}

const tmpRoot = await mkdtemp(join(tmpdir(), "genie-filewatch-bench-"));
const projectsDir = join(tmpRoot, "projects");

try {
  await mkdir(projectsDir, { recursive: true });

  let hotPath = "";
  for (let projectIndex = 0; projectIndex < projects; projectIndex++) {
    const projectDir = join(projectsDir, `project-${String(projectIndex).padStart(4, "0")}`);
    await mkdir(projectDir, { recursive: true });

    for (let sessionIndex = 0; sessionIndex < jsonlsPerProject; sessionIndex++) {
      if (sessionIndex % 2 === 0) {
        const filePath = join(projectDir, `session-${sessionIndex}.jsonl`);
        await writeFile(filePath, "{\"type\":\"summary\"}\n");
        hotPath ||= filePath;
      } else {
        const subagentDir = join(projectDir, `parent-${sessionIndex}`, "subagents");
        await mkdir(subagentDir, { recursive: true });
        await writeFile(join(subagentDir, `child-${sessionIndex}.jsonl`), "{\"type\":\"summary\"}\n");
      }
    }
  }

  let wakeups = 0;
  const watcher = createJsonlWatcher(projectsDir, () => {
    wakeups++;
  });

  await waitForWatcherReady(watcher);

  const start = process.hrtime.bigint();
  for (let index = 0; index < appends; index++) {
    await appendFile(hotPath, `{"type":"assistant","index":${index}}\n`);
  }
  await sleep(1500);
  const elapsedSeconds = Number(process.hrtime.bigint() - start) / 1_000_000_000;

  await watcher.close();

  const wakeupsPerSec = wakeups / elapsedSeconds;
  const result = {
    projects,
    jsonls: projects * jsonlsPerProject,
    appends,
    wakeups,
    elapsed_seconds: Number(elapsedSeconds.toFixed(3)),
    wakeups_per_sec: Number(wakeupsPerSec.toFixed(3)),
    max_wakeups: maxWakeups,
    max_wakeups_per_sec: maxWakeupsPerSec,
  };
  console.log(JSON.stringify(result, null, 2));

  if (wakeups > maxWakeups || wakeupsPerSec > maxWakeupsPerSec) {
    console.error("[filewatch-bench] wakeup threshold exceeded");
    process.exit(1);
  }
} finally {
  await rm(tmpRoot, { recursive: true, force: true });
}
'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hardcoding .join('/') for path construction is not cross-platform friendly. Use path.join to ensure the correct platform-specific separator is used.