@@ -2618,6 +2618,77 @@ class DispatchResult:
26182618 """Task ids whose workers exceeded ``max_runtime_seconds``."""
26192619
26202620
# Bounded registry of recently-reaped worker child exits, populated by the
# reap loop at the top of ``dispatch_once`` and consulted by
# ``detect_crashed_workers`` to classify a dead-pid task.
#
# Entry: ``pid -> (raw_wait_status, reaped_at_epoch)``. The raw wait status
# is kept so that ``os.WIFEXITED`` / ``os.WEXITSTATUS`` and
# ``os.WIFSIGNALED`` / ``os.WTERMSIG`` can all be consulted later.
#
# The registry is bounded in three layers:
#   * lookups ignore (and evict) entries older than the TTL, so a stale
#     status is never attributed to a later process that reuses the pid;
#   * inserts sweep expired entries once the registry is more than half
#     full (amortised, so a quiet dispatcher pays nothing);
#   * a hard size cap drops the oldest half, as a belt-and-braces guard
#     against unbounded growth.
_RECENT_WORKER_EXIT_TTL_SECONDS = 600
_RECENT_WORKER_EXITS_MAX = 4096
_recent_worker_exits: "dict[int, tuple[int, float]]" = {}


def _record_worker_exit(pid: int, raw_status: int) -> None:
    """Record a reaped child's raw wait status for later classification.

    Called from the reap loop in ``dispatch_once``. Safe to call many
    times; duplicate pids overwrite (pids can cycle, latest wins).

    This function never raises on bad input: it runs inside the reap loop,
    where an exception would cut the loop short and leave the remaining
    children unreaped for that tick. Non-numeric or non-positive pids, and
    non-numeric statuses, are silently ignored.

    Args:
        pid: Process id returned by ``os.waitpid``.
        raw_status: Raw wait status returned by ``os.waitpid`` (not yet
            decoded with ``os.WEXITSTATUS`` / ``os.WTERMSIG``).
    """
    try:
        pid_key = int(pid)
        status = int(raw_status)
    except (TypeError, ValueError, OverflowError):
        return
    if pid_key <= 0:
        return

    now = time.time()
    _recent_worker_exits[pid_key] = (status, now)

    # Age-based sweep, only once the registry is more than half full so the
    # common (small) case stays O(1). Lookups enforce the TTL themselves, so
    # deferring the sweep cannot leak a stale status.
    if len(_recent_worker_exits) > _RECENT_WORKER_EXITS_MAX // 2:
        cutoff = now - _RECENT_WORKER_EXIT_TTL_SECONDS
        expired = [
            known_pid
            for known_pid, (_status, reaped_at) in _recent_worker_exits.items()
            if reaped_at < cutoff
        ]
        for known_pid in expired:
            _recent_worker_exits.pop(known_pid, None)

    # Size cap as a final guard: drop the oldest half. The entry written
    # above carries the newest timestamp, so it survives.
    if len(_recent_worker_exits) > _RECENT_WORKER_EXITS_MAX:
        oldest_first = sorted(
            _recent_worker_exits,
            key=lambda known_pid: _recent_worker_exits[known_pid][1],
        )
        for known_pid in oldest_first[: len(oldest_first) // 2]:
            _recent_worker_exits.pop(known_pid, None)


def _classify_worker_exit(pid: int) -> "tuple[str, Optional[int]]":
    """Classify a recently-reaped worker by pid.

    Returns ``(kind, code)`` where ``kind`` is one of:

    * ``"clean_exit"`` — ``WIFEXITED`` with ``WEXITSTATUS == 0``. When the
      task is still ``running`` in the DB, this is a protocol violation
      (worker exited without calling ``kanban_complete`` / ``kanban_block``)
      and should be auto-blocked immediately — retrying will just loop.
    * ``"nonzero_exit"`` — ``WIFEXITED`` with non-zero status. Real error.
    * ``"signaled"`` — ``WIFSIGNALED`` (OOM killer, SIGKILL, etc). Real crash.
    * ``"unknown"`` — no usable record: the pid is not in the reap registry
      (reaped by something else, or died between reap tick and liveness
      check), the record is older than the TTL, or the wait status could
      not be decoded. Fall back to existing crashed-counter behavior.

    ``code`` is the exit status (for ``clean_exit`` / ``nonzero_exit``) or
    the signal number (for ``signaled``), or ``None`` for ``unknown``.
    """
    unknown = ("unknown", None)
    try:
        pid_key = int(pid)
    except (TypeError, ValueError, OverflowError):
        return unknown

    entry = _recent_worker_exits.get(pid_key)
    if entry is None:
        return unknown
    raw, reaped_at = entry

    # Pids are recycled by the OS. An entry past the TTL may describe an
    # earlier process with the same pid, and misreading it as a clean exit
    # would auto-block a task on the first occurrence, so expire it here.
    if time.time() - reaped_at > _RECENT_WORKER_EXIT_TTL_SECONDS:
        _recent_worker_exits.pop(pid_key, None)
        return unknown

    try:
        if os.WIFEXITED(raw):
            code = os.WEXITSTATUS(raw)
            if code == 0:
                return ("clean_exit", 0)
            return ("nonzero_exit", code)
        if os.WIFSIGNALED(raw):
            return ("signaled", os.WTERMSIG(raw))
    except (AttributeError, TypeError, ValueError, OverflowError):
        # Best effort: the ``os.W*`` helpers are missing on some platforms
        # (AttributeError) and reject malformed statuses. Classification is
        # advisory, so degrade to "unknown" rather than fail the caller.
        pass
    return unknown
2690+
2691+
26212692def _pid_alive (pid : Optional [int ]) -> bool :
26222693 """Return True if ``pid`` is still running on this host.
26232694
@@ -2924,12 +2995,22 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
29242995 are meaningless here. The host-local check is enough because
29252996 ``_default_spawn`` always runs the worker on the same host as the
29262997 dispatcher (the whole design is single-host).
2998+
2999+ When the reap registry shows the worker exited cleanly (rc=0) but
3000+ the task was still ``running`` in the DB, treat it as a protocol
3001+ violation (worker answered conversationally without calling
3002+ ``kanban_complete`` / ``kanban_block``) and trip the circuit breaker
3003+ on the first occurrence — retrying a worker whose CLI keeps
3004+ returning 0 without a terminal transition just loops forever.
29273005 """
29283006 crashed : list [str ] = []
29293007 # Per-crash details collected inside the main txn, used after it
29303008 # closes to run ``_record_task_failure`` (which needs its own
2931- # write_txn so can't nest).
2932- crash_details : list [tuple [str , int , str ]] = [] # (task_id, pid, claimer)
3009+ # write_txn so can't nest). ``protocol_violation`` flags the
3010+ # clean-exit-but-still-running case so we can trip the breaker
3011+ # immediately instead of incrementing by 1.
3012+ crash_details : list [tuple [str , int , str , bool , str ]] = []
3013+ # (task_id, pid, claimer, protocol_violation, error_text)
29333014 with write_txn (conn ):
29343015 rows = conn .execute (
29353016 "SELECT id, worker_pid, claim_lock FROM tasks "
@@ -2943,6 +3024,39 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
29433024 continue
29443025 if _pid_alive (row ["worker_pid" ]):
29453026 continue
3027+
3028+ pid = int (row ["worker_pid" ])
3029+ kind , code = _classify_worker_exit (pid )
3030+ if kind == "clean_exit" :
3031+ # Worker subprocess returned 0 but its task is still
3032+ # ``running`` in the DB — it exited without calling
3033+ # ``kanban_complete`` / ``kanban_block``. Retrying won't
3034+ # help.
3035+ protocol_violation = True
3036+ error_text = (
3037+ "worker exited cleanly (rc=0) without calling "
3038+ "kanban_complete or kanban_block — protocol violation"
3039+ )
3040+ event_kind = "protocol_violation"
3041+ event_payload = {
3042+ "pid" : pid ,
3043+ "claimer" : row ["claim_lock" ],
3044+ "exit_code" : code ,
3045+ }
3046+ else :
3047+ protocol_violation = False
3048+ if kind == "nonzero_exit" :
3049+ error_text = f"pid { pid } exited with code { code } "
3050+ elif kind == "signaled" :
3051+ error_text = f"pid { pid } killed by signal { code } "
3052+ else :
3053+ error_text = f"pid { pid } not alive"
3054+ event_kind = "crashed"
3055+ event_payload = {"pid" : pid , "claimer" : row ["claim_lock" ]}
3056+ if code is not None and kind != "unknown" :
3057+ event_payload ["exit_kind" ] = kind
3058+ event_payload ["exit_code" ] = code
3059+
29463060 cur = conn .execute (
29473061 "UPDATE tasks SET status = 'ready', claim_lock = NULL, "
29483062 "claim_expires = NULL, worker_pid = NULL "
@@ -2953,34 +3067,47 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
29533067 run_id = _end_run (
29543068 conn , row ["id" ],
29553069 outcome = "crashed" , status = "crashed" ,
2956- error = f"pid { int (row ['worker_pid' ])} not alive" ,
2957- metadata = {
2958- "pid" : int (row ["worker_pid" ]),
2959- "claimer" : row ["claim_lock" ],
2960- },
3070+ error = error_text ,
3071+ metadata = dict (event_payload ),
29613072 )
29623073 _append_event (
2963- conn , row ["id" ], "crashed" ,
2964- { "pid" : int ( row [ "worker_pid" ]), "claimer" : row [ "claim_lock" ]} ,
3074+ conn , row ["id" ], event_kind ,
3075+ event_payload ,
29653076 run_id = run_id ,
29663077 )
29673078 crashed .append (row ["id" ])
29683079 crash_details .append (
2969- (row ["id" ], int (row ["worker_pid" ]), row ["claim_lock" ])
3080+ (row ["id" ], pid , row ["claim_lock" ],
3081+ protocol_violation , error_text )
29703082 )
29713083 # Outside the main txn: increment the unified failure counter for
29723084 # each crashed task. If the breaker trips, the task transitions
29733085 # ready → blocked with a ``gave_up`` event on top of the ``crashed``
29743086 # event we already emitted.
2975- for tid , pid , claimer in crash_details :
2976- _record_task_failure (
3087+ #
3088+ # Protocol-violation crashes force an immediate trip (failure_limit=1)
3089+ # because clean-exit-without-transition is deterministic: the next
3090+ # respawn will do exactly the same thing. Better to surface to a
3091+ # human with a clear reason than to loop ``DEFAULT_FAILURE_LIMIT``
3092+ # times first.
3093+ auto_blocked : list [str ] = []
3094+ for tid , pid , claimer , protocol_violation , error_text in crash_details :
3095+ tripped = _record_task_failure (
29773096 conn , tid ,
2978- error = f"pid { pid } not alive" ,
3097+ error = error_text ,
29793098 outcome = "crashed" ,
3099+ failure_limit = (1 if protocol_violation else None ),
29803100 release_claim = False ,
29813101 end_run = False ,
29823102 event_payload_extra = {"pid" : pid , "claimer" : claimer },
29833103 )
3104+ if tripped :
3105+ auto_blocked .append (tid )
3106+ # Stash auto-blocked ids on the function for the dispatch loop to pick up.
3107+ # Keeps the public return type (``list[str]``) stable for direct callers
3108+ # and tests that destructure the result; ``dispatch_once`` reads this
3109+ # side-channel attribute to populate ``DispatchResult.auto_blocked``.
3110+ detect_crashed_workers ._last_auto_blocked = auto_blocked # type: ignore[attr-defined]
29843111 return crashed
29853112
29863113
@@ -3242,6 +3369,12 @@ def dispatch_once(
32423369 # exit. WNOHANG keeps this non-blocking; ChildProcessError means no
32433370 # children to reap. Bounded: at most one tick's worth of completions
32443371 # can be in <defunct> at once.
3372+ #
3373+ # We also record the exit status keyed by pid, so
3374+ # ``detect_crashed_workers`` can distinguish a worker that exited
3375+ # cleanly without calling ``kanban_complete`` / ``kanban_block``
3376+ # (protocol violation — auto-block) from a real crash (OOM killer,
3377+ # SIGKILL, non-zero exit — existing counter behavior).
32453378 try :
32463379 while True :
32473380 try :
@@ -3250,12 +3383,21 @@ def dispatch_once(
32503383 break
32513384 if _pid == 0 :
32523385 break
3386+ _record_worker_exit (_pid , _status )
32533387 except Exception :
32543388 pass
32553389
32563390 result = DispatchResult ()
32573391 result .reclaimed = release_stale_claims (conn )
32583392 result .crashed = detect_crashed_workers (conn )
3393+ # detect_crashed_workers stashes protocol-violation auto-blocks on
3394+ # itself so the public list-return stays stable. Pull them into the
3395+ # DispatchResult here so telemetry / tests see the trip.
3396+ _crash_auto_blocked = getattr (
3397+ detect_crashed_workers , "_last_auto_blocked" , []
3398+ )
3399+ if _crash_auto_blocked :
3400+ result .auto_blocked .extend (_crash_auto_blocked )
32593401 result .timed_out = enforce_max_runtime (conn )
32603402 result .promoted = recompute_ready (conn )
32613403
0 commit comments