Commit a24c6e1

fix(kanban): address @erosika's pre-merge review (issue #16102)
Six concrete bugs + two cheap v2 extensions from the review at #16102 (comment). Larger items (structured comments as session substrate, taxonomy reorg) deferred to v2 with a reply posted on the issue.

Pre-merge bug fixes

- unblock_task: close any stale current_run_id pointer with a reclaimed run inside the unblock txn. Defensive; the invariant holds under current data paths (block_task already closes the run), but a future or external write that leaves the pointer dangling would otherwise persist across the ready -> blocked -> ready cycle. Mirrors the same pattern in claim_task + archive_task.
- Migration backfill: wrap the in-flight backfill loop in write_txn and add a CAS guard (`current_run_id IS NULL`) on the pointer UPDATE, with a cleanup path that marks any orphan run row reclaimed if the CAS fails. Prevents races against a concurrent dispatcher between SELECT and INSERT.
- Notifier sub leak on non-done terminals: unsub when the last delivered event's kind is terminal (completed / blocked / gave_up / crashed / timed_out), not just when task.status is in (done, archived). blocked / gave_up / crashed / timed_out used to fire one ping and then strand the subscription row forever.
- Notifier thrashes dead chats: per-subscription send-failure counter keyed on (task_id, platform, chat_id, thread_id). After 3 consecutive adapter.send exceptions, drop the sub automatically. Counter resets on any successful send.

Daemon ops visibility

- run_daemon on_tick now tracks consecutive ticks where the ready queue is non-empty but 0 spawns succeeded. After 6 such ticks (default ~30s at interval=5), it emits a WARN line to stderr pointing at profile health (venv, PATH, credentials) and `hermes kanban list --status blocked`. Rate-limited to one message per 5 minutes so a persistent outage doesn't spam logs.

v2 extensions shipped in scope (pure upside)

- build_worker_context: new "Recent work by @assignee" section surfacing the 5 most-recent completed runs for the current task's assignee (excluding this task). Bounded by the natural LIMIT, no new dependencies. Skipped when the task has no assignee.
- Gateway notifier message prefix: terminal pings now lead with `@<assignee>` so fleets (one chat subscribing to many tasks with different workers) stay legible at a glance. One-line template change.

Deferred to v2 (noted in reply to erosika)

- recompute_ready full-scan starvation at 10k+ tasks: the dirty-set approach is a real refactor; fine as a follow-up.
- Skill ↔ assignee validation for routing: depends on a skill introspection surface that isn't nailed down.
- Structured comments (in_reply_to / addressed_to / kind) as multi-peer session substrate: schema-affecting, exactly the v2-scope design vulcan flagged shouldn't be crammed into this PR.
- Pattern vs. mechanism taxonomy split in docs: pure docs reorg, low urgency.

Tests (+6 in core functionality)

- unblock_invariant_recovery (engineered leak, defensive close)
- unblock_normal_path_no_spurious_run (no run created on happy block -> unblock; erosika's main concern)
- migration_backfill_idempotent_under_re_run (3x init_db on a legacy-shape DB yields exactly 1 run row, not 3)
- build_worker_context_includes_role_history (role continuity)
- build_worker_context_role_history_skipped_when_no_assignee
- build_worker_context_role_history_bounded_to_5

180/180 kanban suite pass under scripts/run_tests.sh. Live-smoke exercised all three kernel fixes end-to-end with isolated HERMES_HOME.
1 parent 7206eed commit a24c6e1

4 files changed

Lines changed: 360 additions & 40 deletions

File tree

gateway/run.py

Lines changed: 56 additions & 8 deletions
@@ -2490,6 +2490,23 @@ async def _kanban_notifier_watcher(self, interval: float = 5.0) -> None:
                 return

         TERMINAL_KINDS = ("completed", "blocked", "gave_up", "crashed", "timed_out")
+        # Terminal event kinds trigger automatic unsubscription — the task
+        # is done, blocked, or in a retry-needed state that the human
+        # shouldn't keep pinging a stale chat for. Previously we only
+        # unsubbed when task.status in ('done', 'archived'), which left
+        # subscriptions on 'blocked' / 'gave_up' / 'crashed' / 'timed_out'
+        # tasks stranded forever.
+        TERMINAL_EVENT_KINDS = TERMINAL_KINDS
+        # Per-subscription send-failure counter. Adapter.send raising
+        # means the chat is dead (deleted, bot kicked, etc.) — after N
+        # consecutive send failures the sub is dropped so we don't spin
+        # against a dead chat every 5 seconds forever.
+        MAX_SEND_FAILURES = 3
+        sub_fail_counts: dict[tuple, int] = getattr(
+            self, "_kanban_sub_fail_counts", {}
+        )
+        self._kanban_sub_fail_counts = sub_fail_counts
+
         # Initial delay so the gateway can finish wiring adapters.
         await asyncio.sleep(5)

@@ -2546,6 +2563,11 @@ def _collect():
            title = (task.title if task else sub["task_id"])[:120]
            for ev in d["events"]:
                kind = ev.kind
+               # Identity prefix: attribute terminal pings to the
+               # worker that did the work. Makes fleets (where one
+               # chat subscribes to many tasks) legible at a glance.
+               who = (task.assignee if task and task.assignee else None)
+               tag = f"@{who} " if who else ""
                if kind == "completed":
                    # Prefer the run's summary (the worker's
                    # intentional human-facing handoff, carried

@@ -2563,57 +2585,83 @@ def _collect():
                        r = task.result.strip().splitlines()[0][:160]
                        handoff = f"\n{r}"
                    msg = (
-                       f"✔ Kanban {sub['task_id']} done"
+                       f"✔ {tag}Kanban {sub['task_id']} done"
                        f" — {title}{handoff}"
                    )
                elif kind == "blocked":
                    reason = ""
                    if ev.payload and ev.payload.get("reason"):
                        reason = f": {str(ev.payload['reason'])[:160]}"
-                   msg = f"⏸ Kanban {sub['task_id']} blocked{reason}"
+                   msg = f"⏸ {tag}Kanban {sub['task_id']} blocked{reason}"
                elif kind == "gave_up":
                    err = ""
                    if ev.payload and ev.payload.get("error"):
                        err = f"\n{str(ev.payload['error'])[:200]}"
                    msg = (
-                       f"✖ Kanban {sub['task_id']} gave up "
+                       f"✖ {tag}Kanban {sub['task_id']} gave up "
                        f"after repeated spawn failures{err}"
                    )
                elif kind == "crashed":
                    msg = (
-                       f"✖ Kanban {sub['task_id']} worker crashed "
+                       f"✖ {tag}Kanban {sub['task_id']} worker crashed "
                        f"(pid gone); dispatcher will retry"
                    )
                elif kind == "timed_out":
                    limit = 0
                    if ev.payload and ev.payload.get("limit_seconds"):
                        limit = int(ev.payload["limit_seconds"])
                    msg = (
-                       f"⏱ Kanban {sub['task_id']} timed out "
+                       f"⏱ {tag}Kanban {sub['task_id']} timed out "
                        f"(max_runtime={limit}s); will retry"
                    )
                else:
                    continue
                metadata: dict[str, Any] = {}
                if sub.get("thread_id"):
                    metadata["thread_id"] = sub["thread_id"]
+               sub_key = (
+                   sub["task_id"], sub["platform"],
+                   sub["chat_id"], sub.get("thread_id") or "",
+               )
                try:
                    await adapter.send(
                        sub["chat_id"], msg, metadata=metadata,
                    )
+                   # Reset the failure counter on success.
+                   sub_fail_counts.pop(sub_key, None)
                except Exception as exc:
+                   fails = sub_fail_counts.get(sub_key, 0) + 1
+                   sub_fail_counts[sub_key] = fails
                    logger.warning(
-                       "kanban notifier: send failed for %s on %s: %s",
-                       sub["task_id"], platform_str, exc,
+                       "kanban notifier: send failed for %s on %s "
+                       "(attempt %d/%d): %s",
+                       sub["task_id"], platform_str, fails,
+                       MAX_SEND_FAILURES, exc,
                    )
+                   if fails >= MAX_SEND_FAILURES:
+                       logger.warning(
+                           "kanban notifier: dropping subscription "
+                           "%s on %s after %d consecutive send failures",
+                           sub["task_id"], platform_str, fails,
+                       )
+                       await asyncio.to_thread(self._kanban_unsub, sub)
+                       sub_fail_counts.pop(sub_key, None)
                    # Don't advance cursor on send failure — retry next tick.
                    break
            else:
                # All events delivered; advance cursor + maybe unsub.
                await asyncio.to_thread(
                    self._kanban_advance, sub, d["cursor"],
                )
-               if task and task.status in ("done", "archived"):
+               # Unsubscribe when the LAST delivered event is a
+               # terminal kind (the task hit a "no further updates"
+               # state), not just on task.status in {done, archived}.
+               # Covers blocked / gave_up / crashed / timed_out which
+               # used to leak subs forever.
+               last_kind = d["events"][-1].kind if d["events"] else None
+               task_terminal = task and task.status in ("done", "archived")
+               event_terminal = last_kind in TERMINAL_EVENT_KINDS
+               if task_terminal or event_terminal:
                    await asyncio.to_thread(
                        self._kanban_unsub, sub,
                    )
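The consecutive-failure counter in the diff above reduces to a small pure function. A minimal sketch under assumed names (`record_send_result` and its signature are illustrative, not the gateway's API): any successful send resets the count, and the caller drops the subscription once the threshold is hit.

```python
def record_send_result(
    fail_counts: dict, sub_key: tuple, ok: bool, max_failures: int = 3
) -> bool:
    """Track consecutive send failures per subscription key.

    Returns True when the subscription should be dropped. Mirrors the
    counter logic in the diff: success resets the counter; `max_failures`
    consecutive failures mean the chat is dead, so stop retrying it.
    """
    if ok:
        fail_counts.pop(sub_key, None)
        return False
    fails = fail_counts.get(sub_key, 0) + 1
    fail_counts[sub_key] = fails
    if fails >= max_failures:
        # Forget the counter too — the sub is gone, no state to keep.
        fail_counts.pop(sub_key, None)
        return True
    return False
```

Keying on the full (task_id, platform, chat_id, thread_id) tuple means one dead thread in a chat doesn't poison deliveries to other threads of the same chat.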

hermes_cli/kanban.py

Lines changed: 45 additions & 0 deletions
@@ -924,7 +924,38 @@ def _cmd_daemon(args: argparse.Namespace) -> int:
    print(f"Kanban dispatcher running (interval={args.interval}s, pid={os.getpid()}). "
          f"Ctrl-C to stop.")

+   # Health telemetry: warn when every tick finds ready work but fails to
+   # spawn any worker. Catches broken profiles, PATH drift, missing venv,
+   # credential loss — cases where the per-task circuit breaker auto-blocks
+   # each task quietly but the operator has no signal that the dispatcher
+   # itself is dysfunctional.
+   HEALTH_WINDOW = 6  # ticks (default 30s at interval=5)
+   health_state = {"bad_ticks": 0, "last_warn_at": 0}
+
    def _on_tick(res):
+       ready_pending = bool(res.skipped_unassigned) or _ready_queue_nonempty()
+       spawned_any = bool(res.spawned)
+       if ready_pending and not spawned_any:
+           health_state["bad_ticks"] += 1
+       else:
+           health_state["bad_ticks"] = 0
+       # Emit a warning once per HEALTH_WINDOW bad ticks (not every tick)
+       # so log volume stays bounded while the problem persists.
+       if health_state["bad_ticks"] >= HEALTH_WINDOW:
+           now = int(time.time())
+           # Rate-limit repeats: at most one warning per 5 minutes.
+           if now - health_state["last_warn_at"] >= 300:
+               print(
+                   f"[{_fmt_ts(now)}] WARN dispatcher stuck: "
+                   f"ready queue non-empty for {health_state['bad_ticks']} "
+                   f"consecutive ticks but 0 workers spawned successfully. "
+                   f"Check profile health (venv, PATH, credentials) and "
+                   f"`hermes kanban list --status ready` / "
+                   f"`hermes kanban list --status blocked` for recent "
+                   f"spawn_failed tasks.",
+                   file=sys.stderr, flush=True,
+               )
+               health_state["last_warn_at"] = now
        if not verbose:
            return
        did_work = (

@@ -941,6 +972,20 @@ def _on_tick(res):
            flush=True,
        )

+   def _ready_queue_nonempty() -> bool:
+       """Cheap SELECT — just asks whether there's at least one ready
+       task with an assignee that the dispatcher could have picked up."""
+       try:
+           with kb.connect() as conn:
+               row = conn.execute(
+                   "SELECT 1 FROM tasks "
+                   "WHERE status = 'ready' AND assignee IS NOT NULL "
+                   "  AND claim_lock IS NULL LIMIT 1"
+               ).fetchone()
+               return row is not None
+       except Exception:
+           return False
+
    try:
        kb.run_daemon(
            interval=args.interval,
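The warn decision in `_on_tick` can be isolated as a pure function of the tick outcome and the clock, which is how the rate-limiting is easiest to reason about. A sketch under assumed names (`should_warn` is illustrative, not the CLI's API); the caller would pass the real tick result and `time.time()`.

```python
def should_warn(
    state: dict, ready_pending: bool, spawned_any: bool, now: int,
    window: int = 6, cooldown: int = 300,
) -> bool:
    """Decide whether this tick should emit the stuck-dispatcher warning.

    Counts consecutive ticks with ready work but zero spawns; once the
    streak reaches `window`, warn, but at most once per `cooldown` seconds.
    Any healthy tick (no ready work, or a successful spawn) resets the streak.
    """
    if ready_pending and not spawned_any:
        state["bad_ticks"] += 1
    else:
        state["bad_ticks"] = 0
    if state["bad_ticks"] >= window and now - state["last_warn_at"] >= cooldown:
        state["last_warn_at"] = now
        return True
    return False
```

At interval=5 the streak reaches the window after roughly 30 seconds of stuck ticks, and a persistent outage produces one warning every 5 minutes rather than one per tick.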

hermes_cli/kanban_db.py

Lines changed: 100 additions & 32 deletions
@@ -436,40 +436,56 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
    # One-shot backfill: any task that is 'running' before runs existed
    # had its claim_lock / claim_expires / worker_pid on the task row.
    # Synthesize a matching task_runs row so subsequent end-run / heartbeat
-   # calls have something to write to. Safe to re-run: the check below
-   # skips tasks that already have a current_run_id.
+   # calls have something to write to. Wrapped in write_txn to serialize
+   # against any concurrent dispatcher, and the per-row UPDATE uses
+   # ``current_run_id IS NULL`` as a CAS guard so a racing claim can't
+   # produce an orphaned row if it interleaves with the backfill pass.
    runs_exist = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='task_runs'"
    ).fetchone() is not None
    if runs_exist:
-       inflight = conn.execute(
-           "SELECT id, assignee, claim_lock, claim_expires, worker_pid, "
-           "       max_runtime_seconds, last_heartbeat_at, started_at "
-           "FROM tasks "
-           "WHERE status = 'running' AND (current_run_id IS NULL)"
-       ).fetchall()
-       for row in inflight:
-           started = row["started_at"] or int(time.time())
-           cur = conn.execute(
-               """
-               INSERT INTO task_runs (
-                   task_id, profile, status,
-                   claim_lock, claim_expires, worker_pid,
-                   max_runtime_seconds, last_heartbeat_at,
-                   started_at
-               ) VALUES (?, ?, 'running', ?, ?, ?, ?, ?, ?)
-               """,
-               (
-                   row["id"], row["assignee"], row["claim_lock"],
-                   row["claim_expires"], row["worker_pid"],
-                   row["max_runtime_seconds"], row["last_heartbeat_at"],
-                   started,
-               ),
-           )
-           conn.execute(
-               "UPDATE tasks SET current_run_id = ? WHERE id = ?",
-               (cur.lastrowid, row["id"]),
-           )
+       with write_txn(conn):
+           inflight = conn.execute(
+               "SELECT id, assignee, claim_lock, claim_expires, worker_pid, "
+               "       max_runtime_seconds, last_heartbeat_at, started_at "
+               "FROM tasks "
+               "WHERE status = 'running' AND current_run_id IS NULL"
+           ).fetchall()
+           for row in inflight:
+               started = row["started_at"] or int(time.time())
+               cur = conn.execute(
+                   """
+                   INSERT INTO task_runs (
+                       task_id, profile, status,
+                       claim_lock, claim_expires, worker_pid,
+                       max_runtime_seconds, last_heartbeat_at,
+                       started_at
+                   ) VALUES (?, ?, 'running', ?, ?, ?, ?, ?, ?)
+                   """,
+                   (
+                       row["id"], row["assignee"], row["claim_lock"],
+                       row["claim_expires"], row["worker_pid"],
+                       row["max_runtime_seconds"], row["last_heartbeat_at"],
+                       started,
+                   ),
+               )
+               # CAS: only install the pointer if nothing else claimed
+               # the task between our SELECT and here (shouldn't happen
+               # under the write_txn, but belt-and-suspenders). If the
+               # CAS fails we've got an orphan run row — mark it
+               # reclaimed so it doesn't look in-flight.
+               upd = conn.execute(
+                   "UPDATE tasks SET current_run_id = ? "
+                   "WHERE id = ? AND current_run_id IS NULL",
+                   (cur.lastrowid, row["id"]),
+               )
+               if upd.rowcount != 1:
+                   conn.execute(
+                       "UPDATE task_runs SET status = 'reclaimed', "
+                       "       outcome = 'reclaimed', ended_at = ? "
+                       "WHERE id = ?",
+                       (int(time.time()), cur.lastrowid),
+                   )

    # One-shot event-kind rename pass. The old names ("ready", "priority",
    # "spawn_auto_blocked") still worked but were awkward on the wire;
@@ -1356,10 +1372,36 @@ def block_task(


 def unblock_task(conn: sqlite3.Connection, task_id: str) -> bool:
-   """Transition ``blocked -> ready``."""
+   """Transition ``blocked -> ready``.
+
+   Defensively closes any stale ``current_run_id`` pointer before flipping
+   status. In the common path (``block_task`` closed the run already) this
+   is a no-op. If a future or external write left the pointer dangling,
+   the leaked run is closed as ``reclaimed`` inside the same txn so the
+   runs invariant (``current_run_id IS NULL`` ⇔ run row in terminal
+   state) holds for the rest of this function's lifetime.
+   """
+   now = int(time.time())
    with write_txn(conn):
+       stale = conn.execute(
+           "SELECT current_run_id FROM tasks WHERE id = ? AND status = 'blocked'",
+           (task_id,),
+       ).fetchone()
+       if stale and stale["current_run_id"]:
+           conn.execute(
+               """
+               UPDATE task_runs
+               SET status = 'reclaimed', outcome = 'reclaimed',
+                   summary = COALESCE(summary, 'invariant recovery on unblock'),
+                   ended_at = ?,
+                   claim_lock = NULL, claim_expires = NULL, worker_pid = NULL
+               WHERE id = ? AND ended_at IS NULL
+               """,
+               (now, int(stale["current_run_id"])),
+           )
        cur = conn.execute(
-           "UPDATE tasks SET status = 'ready' WHERE id = ? AND status = 'blocked'",
+           "UPDATE tasks SET status = 'ready', current_run_id = NULL "
+           "WHERE id = ? AND status = 'blocked'",
            (task_id,),
        )
        if cur.rowcount != 1:
@@ -2100,6 +2142,32 @@ def build_worker_context(conn: sqlite3.Connection, task_id: str) -> str:
    lines.extend(body_lines)
    lines.append("")

+   # Cross-task role history: what else has THIS assignee completed
+   # recently? Gives the worker implicit continuity — "I'm the reviewer
+   # and my last three reviews focused on security" — without forcing
+   # the user to wire anything into SOUL.md / MEMORY.md. Bounded to the
+   # most recent 5 completed runs, excluding this task so the retry
+   # section above isn't duplicated. Safe on assignee=None (skipped).
+   if task.assignee:
+       role_rows = conn.execute(
+           "SELECT t.id, t.title, r.summary, r.ended_at "
+           "FROM task_runs r JOIN tasks t ON r.task_id = t.id "
+           "WHERE r.profile = ? AND r.task_id != ? "
+           "  AND r.outcome = 'completed' "
+           "ORDER BY r.ended_at DESC LIMIT 5",
+           (task.assignee, task_id),
+       ).fetchall()
+       if role_rows:
+           lines.append(f"## Recent work by @{task.assignee}")
+           for row in role_rows:
+               ts = time.strftime(
+                   "%Y-%m-%d %H:%M", time.localtime(int(row["ended_at"]))
+               )
+               s = (row["summary"] or "").strip().splitlines()
+               first = s[0][:200] if s else "(no summary)"
+               lines.append(f"- {row['id']}{row['title']} ({ts}): {first}")
+           lines.append("")
+
    comments = list_comments(conn, task_id)
    if comments:
        lines.append("## Comment thread")
