7 changes: 6 additions & 1 deletion gateway/run.py
@@ -2511,7 +2511,12 @@ async def _notify_active_sessions_of_shutdown(self) -> None:
platform_str, chat_id, e,
)

for platform, adapter in self.adapters.items():
# Snapshot adapters up front: adapter.send() can hit a fatal error
# path that pops the adapter from self.adapters (see _handle_fatal
# elsewhere), which would otherwise trigger
# ``RuntimeError: dictionary changed size during iteration`` —
# observed in a user report during gateway shutdown.
for platform, adapter in list(self.adapters.items()):
home = self.config.get_home_channel(platform)
if not home or not home.chat_id:
continue
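Reviewer note: a minimal standalone sketch of the failure mode this hunk fixes, with illustrative names rather than the gateway's real adapter objects:

```python
# Sketch only: mutating a dict while iterating its live view raises
# "RuntimeError: dictionary changed size during iteration" in CPython.
adapters = {"telegram": object(), "discord": object()}

def send_and_maybe_drop(platform: str) -> None:
    # Stands in for adapter.send() hitting a fatal path that pops the
    # adapter out of the registry mid-loop.
    adapters.pop(platform, None)

# Snapshotting with list() iterates a frozen copy, so the pop is safe.
for platform, adapter in list(adapters.items()):
    send_and_maybe_drop(platform)
```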
168 changes: 155 additions & 13 deletions hermes_cli/kanban_db.py
@@ -2618,6 +2618,77 @@ class DispatchResult:
"""Task ids whose workers exceeded ``max_runtime_seconds``."""


# Bounded registry of recently-reaped worker child exits, populated by the
# reap loop at the top of ``dispatch_once`` and consulted by
# ``detect_crashed_workers`` to classify a dead-pid task.
#
# Entry: ``pid -> (raw_wait_status, reaped_at_epoch)``. We keep raw status
# so both ``os.WIFEXITED`` / ``os.WEXITSTATUS`` and ``os.WIFSIGNALED`` can
# be consulted. Entries are trimmed by age (and total size cap as a
# belt-and-braces against unbounded growth on exotic platforms).
_RECENT_WORKER_EXIT_TTL_SECONDS = 600
_RECENT_WORKER_EXITS_MAX = 4096
_recent_worker_exits: "dict[int, tuple[int, float]]" = {}


def _record_worker_exit(pid: int, raw_status: int) -> None:
"""Record a reaped child's exit status for later classification.

Called from the reap loop in ``dispatch_once``. Safe to call many
times; duplicate pids overwrite (pids can cycle, latest wins).
"""
if not pid or pid <= 0:
return
now = time.time()
_recent_worker_exits[int(pid)] = (int(raw_status), now)
# Age-based trim: drop entries older than the TTL.
if len(_recent_worker_exits) > _RECENT_WORKER_EXITS_MAX // 2:
cutoff = now - _RECENT_WORKER_EXIT_TTL_SECONDS
for _pid in [p for p, (_s, t) in _recent_worker_exits.items() if t < cutoff]:
_recent_worker_exits.pop(_pid, None)
# Size cap as a final guard.
if len(_recent_worker_exits) > _RECENT_WORKER_EXITS_MAX:
# Drop oldest half.
ordered = sorted(_recent_worker_exits.items(), key=lambda kv: kv[1][1])
for _pid, _ in ordered[: len(ordered) // 2]:
_recent_worker_exits.pop(_pid, None)


def _classify_worker_exit(pid: int) -> "tuple[str, Optional[int]]":
"""Classify a recently-reaped worker by pid.

Returns ``(kind, code)`` where ``kind`` is one of:

* ``"clean_exit"`` — ``WIFEXITED`` with ``WEXITSTATUS == 0``. When the
task is still ``running`` in the DB, this is a protocol violation
(worker exited without calling ``kanban_complete`` / ``kanban_block``)
and should be auto-blocked immediately — retrying will just loop.
* ``"nonzero_exit"`` — ``WIFEXITED`` with non-zero status. Real error.
* ``"signaled"`` — ``WIFSIGNALED`` (OOM killer, SIGKILL, etc). Real crash.
* ``"unknown"`` — pid was not in the reap registry (either reaped by
something else, or died between reap tick and liveness check). Fall
back to existing crashed-counter behavior.

``code`` is the exit status (for ``clean_exit`` / ``nonzero_exit``) or
the signal number (for ``signaled``), or ``None`` for ``unknown``.
"""
entry = _recent_worker_exits.get(int(pid))
if entry is None:
return ("unknown", None)
raw, _ = entry
try:
if os.WIFEXITED(raw):
code = os.WEXITSTATUS(raw)
if code == 0:
return ("clean_exit", 0)
return ("nonzero_exit", code)
if os.WIFSIGNALED(raw):
return ("signaled", os.WTERMSIG(raw))
except Exception:
pass
return ("unknown", None)


def _pid_alive(pid: Optional[int]) -> bool:
"""Return True if ``pid`` is still running on this host.

@@ -2924,12 +2995,22 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
are meaningless here. The host-local check is enough because
``_default_spawn`` always runs the worker on the same host as the
dispatcher (the whole design is single-host).

When the reap registry shows the worker exited cleanly (rc=0) but
the task was still ``running`` in the DB, treat it as a protocol
violation (worker answered conversationally without calling
``kanban_complete`` / ``kanban_block``) and trip the circuit breaker
on the first occurrence — retrying a worker whose CLI keeps
returning 0 without a terminal transition just loops forever.
"""
crashed: list[str] = []
# Per-crash details collected inside the main txn, used after it
# closes to run ``_record_task_failure`` (which needs its own
# write_txn so can't nest).
crash_details: list[tuple[str, int, str]] = [] # (task_id, pid, claimer)
# write_txn so can't nest). ``protocol_violation`` flags the
# clean-exit-but-still-running case so we can trip the breaker
# immediately instead of incrementing by 1.
crash_details: list[tuple[str, int, str, bool, str]] = []
# (task_id, pid, claimer, protocol_violation, error_text)
with write_txn(conn):
rows = conn.execute(
"SELECT id, worker_pid, claim_lock FROM tasks "
@@ -2943,6 +3024,39 @@
continue
if _pid_alive(row["worker_pid"]):
continue

pid = int(row["worker_pid"])
kind, code = _classify_worker_exit(pid)
if kind == "clean_exit":
# Worker subprocess returned 0 but its task is still
# ``running`` in the DB — it exited without calling
# ``kanban_complete`` / ``kanban_block``. Retrying won't
# help.
protocol_violation = True
error_text = (
"worker exited cleanly (rc=0) without calling "
"kanban_complete or kanban_block — protocol violation"
)
event_kind = "protocol_violation"
event_payload = {
"pid": pid,
"claimer": row["claim_lock"],
"exit_code": code,
}
else:
protocol_violation = False
if kind == "nonzero_exit":
error_text = f"pid {pid} exited with code {code}"
elif kind == "signaled":
error_text = f"pid {pid} killed by signal {code}"
else:
error_text = f"pid {pid} not alive"
event_kind = "crashed"
event_payload = {"pid": pid, "claimer": row["claim_lock"]}
if code is not None and kind != "unknown":
event_payload["exit_kind"] = kind
event_payload["exit_code"] = code

cur = conn.execute(
"UPDATE tasks SET status = 'ready', claim_lock = NULL, "
"claim_expires = NULL, worker_pid = NULL "
@@ -2953,34 +3067,47 @@
run_id = _end_run(
conn, row["id"],
outcome="crashed", status="crashed",
error=f"pid {int(row['worker_pid'])} not alive",
metadata={
"pid": int(row["worker_pid"]),
"claimer": row["claim_lock"],
},
error=error_text,
metadata=dict(event_payload),
)
_append_event(
conn, row["id"], "crashed",
{"pid": int(row["worker_pid"]), "claimer": row["claim_lock"]},
conn, row["id"], event_kind,
event_payload,
run_id=run_id,
)
crashed.append(row["id"])
crash_details.append(
(row["id"], int(row["worker_pid"]), row["claim_lock"])
(row["id"], pid, row["claim_lock"],
protocol_violation, error_text)
)
# Outside the main txn: increment the unified failure counter for
# each crashed task. If the breaker trips, the task transitions
# ready → blocked with a ``gave_up`` event on top of the ``crashed``
# event we already emitted.
for tid, pid, claimer in crash_details:
_record_task_failure(
#
# Protocol-violation crashes force an immediate trip (failure_limit=1)
# because clean-exit-without-transition is deterministic: the next
# respawn will do exactly the same thing. Better to surface to a
# human with a clear reason than to loop ``DEFAULT_FAILURE_LIMIT``
# times first.
auto_blocked: list[str] = []
for tid, pid, claimer, protocol_violation, error_text in crash_details:
tripped = _record_task_failure(
conn, tid,
error=f"pid {pid} not alive",
error=error_text,
outcome="crashed",
failure_limit=(1 if protocol_violation else None),
release_claim=False,
end_run=False,
event_payload_extra={"pid": pid, "claimer": claimer},
)
if tripped:
auto_blocked.append(tid)
# Stash auto-blocked ids on the function for the dispatch loop to pick up.
# Keeps the public return type (``list[str]``) stable for direct callers
# and tests that destructure the result; ``dispatch_once`` reads this
# side-channel attribute to populate ``DispatchResult.auto_blocked``.
detect_crashed_workers._last_auto_blocked = auto_blocked # type: ignore[attr-defined]
return crashed
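Reviewer note: the ``failure_limit=1`` override leans on breaker semantics inside ``_record_task_failure``, which this diff doesn't show. A toy model of the assumed counter logic, with hypothetical names and an assumed default limit of 3, purely for orientation:

```python
# Toy sketch; the real _record_task_failure also ends runs, writes
# events, and optionally releases claims.
DEFAULT_FAILURE_LIMIT = 3  # assumed value, for illustration only

def record_failure(counters: dict[str, int], tid: str,
                   limit: int | None = None) -> bool:
    """Bump the task's consecutive-failure counter; True means tripped."""
    limit = DEFAULT_FAILURE_LIMIT if limit is None else limit
    counters[tid] = counters.get(tid, 0) + 1
    return counters[tid] >= limit

counters: dict[str, int] = {}
assert record_failure(counters, "t1", limit=1) is True   # protocol violation
assert record_failure(counters, "t2") is False           # first real crash
```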


@@ -3242,6 +3369,12 @@ def dispatch_once(
# exit. WNOHANG keeps this non-blocking; ChildProcessError means no
# children to reap. Bounded: at most one tick's worth of completions
# can be in <defunct> at once.
#
# We also record the exit status keyed by pid, so
# ``detect_crashed_workers`` can distinguish a worker that exited
# cleanly without calling ``kanban_complete`` / ``kanban_block``
# (protocol violation — auto-block) from a real crash (OOM killer,
# SIGKILL, non-zero exit — existing counter behavior).
try:
while True:
try:
@@ -3250,12 +3383,21 @@
break
if _pid == 0:
break
_record_worker_exit(_pid, _status)
except Exception:
pass
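Reviewer note: a self-contained sketch of a non-blocking reap loop of this shape, for context (the ``os.waitpid`` call itself sits in the lines the diff elides between the two hunks above):

```python
import os

def reap_exited_children(record) -> None:
    # Collect every child that has already exited, without blocking.
    while True:
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:  # no child processes at all
            break
        if pid == 0:               # children exist, but none exited yet
            break
        record(pid, status)        # e.g. _record_worker_exit
```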

result = DispatchResult()
result.reclaimed = release_stale_claims(conn)
result.crashed = detect_crashed_workers(conn)
# detect_crashed_workers stashes protocol-violation auto-blocks on
# itself so the public list-return stays stable. Pull them into the
# DispatchResult here so telemetry / tests see the trip.
_crash_auto_blocked = getattr(
detect_crashed_workers, "_last_auto_blocked", []
)
if _crash_auto_blocked:
result.auto_blocked.extend(_crash_auto_blocked)
result.timed_out = enforce_max_runtime(conn)
result.promoted = recompute_ready(conn)
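Pattern note: the stash-on-the-function trick reads back with ``getattr`` plus a default, so older callers never notice. A minimal standalone sketch with illustrative names; the attribute is overwritten on every call, so this is only safe when producer and consumer run back-to-back on one thread, as they do here:

```python
def scan() -> list[str]:
    # Public result stays a plain list; extra findings ride along as a
    # function attribute instead of widening the return type.
    hits, extras = ["t1", "t2"], ["t2"]
    scan._last_extras = extras  # type: ignore[attr-defined]
    return hits

hits = scan()
extras = getattr(scan, "_last_extras", [])  # default keeps old callers safe
assert extras == ["t2"]
```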

94 changes: 94 additions & 0 deletions tests/hermes_cli/test_kanban_core_functionality.py
@@ -3636,6 +3636,100 @@ def test_detect_crashed_workers_increments_counter(kanban_home):
conn.close()


def test_detect_crashed_workers_protocol_violation_auto_blocks(kanban_home):
"""A worker that exited rc=0 while its task was still ``running``
is a protocol violation (agent answered conversationally without
calling kanban_complete / kanban_block). Retrying will just loop,
so auto-block immediately instead of waiting for the breaker to
trip at ``DEFAULT_FAILURE_LIMIT``.

Regression test for the respawn-loop-after-completion bug reported
against small local models (gemma4-e2b q4) where the model writes
the answer as plain text and the CLI exits rc=0 cleanly.
"""
import hermes_cli.kanban_db as _kb
conn = kb.connect()
try:
tid = kb.create_task(conn, title="quiet", assignee="worker")
host_prefix = _kb._claimer_id().split(":", 1)[0]
lock = f"{host_prefix}:mock"
kb.claim_task(conn, tid, claimer=lock)
fake_pid = 999998
kb._set_worker_pid(conn, tid, fake_pid)

        # Simulate the reap loop having recorded a clean exit for this pid.
        # A raw wait status of 0 means WIFEXITED with WEXITSTATUS == 0 on
        # POSIX (the C macro W_EXITCODE(0, 0) == 0).
_kb._record_worker_exit(fake_pid, 0)
# Force liveness check to say "dead" for the fake pid.
original_alive = _kb._pid_alive
_kb._pid_alive = lambda p: False
try:
result_crashed = kb.detect_crashed_workers(conn)
finally:
_kb._pid_alive = original_alive

assert tid in result_crashed, "should be detected as crashed"
task = kb.get_task(conn, tid)
assert task.status == "blocked", (
f"protocol violation should auto-block on first occurrence, "
f"got status={task.status}"
)
assert "kanban_complete" in (task.last_failure_error or ""), (
f"expected protocol-violation message, got {task.last_failure_error!r}"
)

events = kb.list_events(conn, tid)
kinds = [e.kind for e in events]
assert "protocol_violation" in kinds, (
f"expected 'protocol_violation' event, got {kinds}"
)
# The ``crashed`` event would be misleading here — the worker
# didn't crash, it returned 0.
assert "crashed" not in kinds, (
f"should NOT emit 'crashed' event on clean exit, got {kinds}"
)
assert "gave_up" in kinds, (
f"breaker should trip, expected 'gave_up' event, got {kinds}"
)
finally:
conn.close()
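Reviewer note: the manual ``_pid_alive`` save/restore above works, but pytest's ``monkeypatch`` fixture undoes the patch automatically even when an assertion fails mid-test. A hypothetical compact variant of the same scenario (same helpers as the test above, condensed assertions):

```python
def test_protocol_violation_auto_blocks_compact(kanban_home, monkeypatch):
    import hermes_cli.kanban_db as _kb
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="quiet", assignee="worker")
        host_prefix = _kb._claimer_id().split(":", 1)[0]
        kb.claim_task(conn, tid, claimer=f"{host_prefix}:mock")
        kb._set_worker_pid(conn, tid, 999996)
        _kb._record_worker_exit(999996, 0)      # raw status 0: clean exit
        monkeypatch.setattr(_kb, "_pid_alive", lambda p: False)
        assert tid in kb.detect_crashed_workers(conn)
        assert kb.get_task(conn, tid).status == "blocked"
    finally:
        conn.close()
```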


def test_detect_crashed_workers_nonzero_exit_uses_default_limit(kanban_home):
"""A worker that exited non-zero (real error / crash) uses the
normal counter path — one failure doesn't trip the breaker.
"""
import hermes_cli.kanban_db as _kb
conn = kb.connect()
try:
tid = kb.create_task(conn, title="crashy", assignee="worker")
host_prefix = _kb._claimer_id().split(":", 1)[0]
kb.claim_task(conn, tid, claimer=f"{host_prefix}:mock")
fake_pid = 999997
kb._set_worker_pid(conn, tid, fake_pid)

# W_EXITCODE(1, 0) == 256 — WIFEXITED True, WEXITSTATUS == 1.
_kb._record_worker_exit(fake_pid, 256)
original_alive = _kb._pid_alive
_kb._pid_alive = lambda p: False
try:
kb.detect_crashed_workers(conn)
finally:
_kb._pid_alive = original_alive

task = kb.get_task(conn, tid)
assert task.status == "ready", (
f"single non-zero crash shouldn't auto-block, got {task.status}"
)
assert task.consecutive_failures == 1
events = kb.list_events(conn, tid)
kinds = [e.kind for e in events]
assert "crashed" in kinds
assert "protocol_violation" not in kinds
finally:
conn.close()


def test_reclaim_task_clears_failure_counter(kanban_home):
"""Operator reclaim wipes the counter so the next retry gets a fresh
budget."""