Skip to content

Commit 2c510ae

Browse files
teknium1kyan12
authored andcommitted
refactor(gateway): simplify auto-resume + extend to crash recovery
Follow-up on top of @kyan12's PR NousResearch#20888 — same feature, cleaner shape, wider coverage. Changes: - Drop the synthetic '[System note: ...]' in the internal MessageEvent. The existing _is_resume_pending branch in _handle_message_with_agent (run.py ~L13738) already injects a reason-aware recovery system note on the next turn. With kyan's text in place the model saw two stacked system notes. Now the event text is empty and the existing injection path owns the wording. - Drop SessionStore.list_resume_pending() as a new public method. The filter is 8 lines inline in _schedule_resume_pending_sessions() — one caller, no other pluggability need. - Add 'restart_interrupted' to the auto-resume reason set. That's the reason SessionStore.suspend_recently_active() stamps on sessions recovered from a crash/OOM/SIGKILL (no .clean_shutdown marker). Previously those sessions had to wait for a real user message to auto-resume; now they continue automatically at startup like drain-timeout interruptions do. - Reasons live in a _AUTO_RESUME_REASONS frozenset at class scope so future reasons (e.g. 'manual_resume_request') can be opted in with one line. Test coverage added: - drain-timeout + crash-recovery both scheduled - stale entries skipped (outside freshness window) - suspended entries skipped (suspended > resume_pending) - originless entries skipped (no routing target) - disallowed reasons skipped (graceful forward-compat) E2E verified end-to-end with a real on-disk SessionStore: 2 eligible sessions scheduled, 2 ineligible skipped, empty-text internal events delivered to the adapter. Co-authored-by: Kevin Yan <kevyan1998@gmail.com>
1 parent b775f52 commit 2c510ae

4 files changed

Lines changed: 194 additions & 93 deletions

File tree

gateway/run.py

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2760,44 +2760,67 @@ async def _run_restart() -> None:
27602760
task.add_done_callback(self._background_tasks.discard)
27612761
return True
27622762

2763+
# Drain-timeout reasons set by _stop_impl() when a still-running turn is
2764+
# force-interrupted; "restart_interrupted" is set by
2765+
# SessionStore.suspend_recently_active() on crash recovery (no
2766+
# .clean_shutdown marker). All three mean "the agent was mid-turn and
2767+
# we killed it" — eligible for startup auto-resume.
2768+
_AUTO_RESUME_REASONS = frozenset(
2769+
{"restart_timeout", "shutdown_timeout", "restart_interrupted"}
2770+
)
2771+
27632772
def _schedule_resume_pending_sessions(self) -> int:
27642773
"""Auto-continue fresh restart-interrupted sessions after startup.
27652774

2766-
``resume_pending`` already preserves the transcript and injects the
2767-
recovery system note on the next user message. This method closes the
2768-
restart UX gap by synthesizing that next message once adapters are back
2769-
online, so users do not have to send a placeholder ping after restart.
2775+
``resume_pending`` already preserves the transcript AND the existing
2776+
``_is_resume_pending`` branch in ``_handle_message_with_agent``
2777+
injects a reason-aware recovery system note on the next turn. This
2778+
method closes the UX gap by synthesizing that next turn once
2779+
adapters are back online — the event text is empty so the existing
2780+
injection path owns the wording and we never double up.
2781+
2782+
Adapters that are not yet ready (adapter missing from
2783+
``self.adapters``) are skipped silently; their sessions stay
2784+
``resume_pending`` and will auto-resume on the next real user
2785+
message, or on the next gateway startup.
27702786
"""
2771-
try:
2772-
entries = self.session_store.list_resume_pending(
2773-
window_secs=_auto_continue_freshness_window(),
2774-
allowed_reasons={"restart_timeout", "shutdown_timeout"},
2775-
)
2787+
window = _auto_continue_freshness_window()
2788+
try:
2789+
with self.session_store._lock: # noqa: SLF001 — snapshot under lock
2790+
self.session_store._ensure_loaded_locked() # noqa: SLF001
2791+
candidates = [
2792+
entry for entry in self.session_store._entries.values() # noqa: SLF001
2793+
if entry.resume_pending
2794+
and not entry.suspended
2795+
and entry.origin is not None
2796+
and entry.resume_reason in self._AUTO_RESUME_REASONS
2797+
]
27762798
except Exception as exc:
2777-
logger.warning("Failed to list resume-pending sessions: %s", exc)
2799+
logger.warning("Failed to enumerate resume-pending sessions: %s", exc)
27782800
return 0
27792801

2802+
now = datetime.now()
27802803
scheduled = 0
2781-
for entry in entries:
2782-
source = getattr(entry, "origin", None)
2783-
platform = getattr(source, "platform", None)
2784-
adapter = self.adapters.get(platform) if platform is not None else None
2785-
if source is None or adapter is None:
2804+
for entry in candidates:
2805+
marker = entry.last_resume_marked_at or entry.updated_at
2806+
if marker is not None and (now - marker).total_seconds() > window:
2807+
continue
2808+
2809+
source = entry.origin
2810+
adapter = self.adapters.get(source.platform)
2811+
if adapter is None:
27862812
logger.debug(
2787-
"Skipping auto-resume for %s: adapter unavailable for %s",
2788-
getattr(entry, "session_key", "?"),
2789-
getattr(platform, "value", platform),
2813+
"Skipping auto-resume for %s: adapter not ready for %s",
2814+
entry.session_key,
2815+
getattr(source.platform, "value", source.platform),
27902816
)
27912817
continue
27922818

2819+
# Empty-text internal event — the _is_resume_pending branch in
2820+
# _handle_message_with_agent prepends the proper reason-aware
2821+
# system note before the turn runs.
27932822
event = MessageEvent(
2794-
text=(
2795-
"[System note: The gateway restarted after interrupting "
2796-
"this session. Resume the previous turn now. Reconcile "
2797-
"the transcript first: if tool results are already present, "
2798-
"process them before taking new action; never claim work "
2799-
"completed unless it is visible in the transcript/tool output.]"
2800-
),
2823+
text="",
28012824
message_type=MessageType.TEXT,
28022825
source=source,
28032826
internal=True,
@@ -2808,7 +2831,10 @@ def _schedule_resume_pending_sessions(self) -> int:
28082831
scheduled += 1
28092832

28102833
if scheduled:
2811-
logger.info("Scheduled auto-resume for %d restart-interrupted session(s)", scheduled)
2834+
logger.info(
2835+
"Scheduled auto-resume for %d restart-interrupted session(s)",
2836+
scheduled,
2837+
)
28122838
return scheduled
28132839

28142840
async def start(self) -> bool:

gateway/session.py

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,42 +1028,6 @@ def clear_resume_pending(self, session_key: str) -> bool:
10281028
self._save()
10291029
return True
10301030

1031-
def list_resume_pending(
1032-
self,
1033-
*,
1034-
window_secs: Optional[float] = None,
1035-
now: Optional[float] = None,
1036-
allowed_reasons: Optional[set[str]] = None,
1037-
) -> List[SessionEntry]:
1038-
"""Return fresh restart-interrupted sessions eligible for resume.
1039-
1040-
Only entries that still have an origin are returned; the gateway needs
1041-
that origin to route continuation back through the original
1042-
platform/chat/thread. ``suspended`` entries are excluded because
1043-
explicit suspension/stuck-loop escalation must win over resume.
1044-
"""
1045-
current = datetime.fromtimestamp(now) if now is not None else _now()
1046-
window = float(window_secs) if window_secs is not None else None
1047-
1048-
with self._lock:
1049-
self._ensure_loaded_locked()
1050-
entries = list(self._entries.values())
1051-
1052-
pending: List[SessionEntry] = []
1053-
for entry in entries:
1054-
if not entry.resume_pending or entry.suspended or entry.origin is None:
1055-
continue
1056-
if allowed_reasons is not None and entry.resume_reason not in allowed_reasons:
1057-
continue
1058-
if window is not None and window > 0:
1059-
marker = entry.last_resume_marked_at or entry.updated_at
1060-
if marker is not None and (current - marker).total_seconds() > window:
1061-
continue
1062-
pending.append(entry)
1063-
1064-
pending.sort(key=lambda entry: entry.last_resume_marked_at or entry.updated_at)
1065-
return pending
1066-
10671031
def prune_old_entries(self, max_age_days: int) -> int:
10681032
"""Drop SessionEntry records older than max_age_days.
10691033

scripts/release.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
"ngusev@astralinux.ru": "NikolayGusev-astra",
5555
"liuguangyong201@hellobike.com": "liuguangyong93",
5656
"2093036+exiao@users.noreply.github.com": "exiao",
57+
"kevyan1998@gmail.com": "kyan12",
5758
"rylen.anil@gmail.com": "rylena",
5859
"godnanijatin@gmail.com": "jatingodnani",
5960
"252811164+adybag14-cyber@users.noreply.github.com": "adybag14-cyber",

tests/gateway/test_restart_resume_pending.py

Lines changed: 141 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -245,30 +245,6 @@ def test_malformed_timestamp_is_tolerated(self):
245245

246246

247247
class TestMarkResumePending:
248-
def test_list_resume_pending_returns_fresh_entries_with_origins(self, tmp_path):
249-
store = _make_store(tmp_path)
250-
fresh = store.get_or_create_session(_make_source(chat_id="fresh"))
251-
stale = store.get_or_create_session(_make_source(chat_id="stale"))
252-
missing_origin = store.get_or_create_session(_make_source(chat_id="missing-origin"))
253-
suspended = store.get_or_create_session(_make_source(chat_id="suspended"))
254-
255-
store.mark_resume_pending(fresh.session_key, reason="restart_timeout")
256-
store.mark_resume_pending(stale.session_key, reason="restart_timeout")
257-
store.mark_resume_pending(missing_origin.session_key, reason="restart_timeout")
258-
store.mark_resume_pending(suspended.session_key, reason="restart_timeout")
259-
old = datetime.now() - timedelta(hours=3)
260-
store._entries[stale.session_key].last_resume_marked_at = old
261-
store._entries[missing_origin.session_key].origin = None
262-
store._entries[suspended.session_key].suspended = True
263-
264-
pending = store.list_resume_pending(
265-
window_secs=3600,
266-
now=datetime.now().timestamp(),
267-
allowed_reasons={"restart_timeout"},
268-
)
269-
270-
assert [entry.session_key for entry in pending] == [fresh.session_key]
271-
272248
def test_marks_existing_session(self, tmp_path):
273249
store = _make_store(tmp_path)
274250
source = _make_source()
@@ -978,24 +954,158 @@ async def test_startup_auto_resume_schedules_fresh_pending_sessions():
978954
resume_reason="restart_timeout",
979955
last_resume_marked_at=datetime.now(),
980956
)
981-
runner.session_store.list_resume_pending = MagicMock(return_value=[pending_entry])
957+
runner.session_store._entries = {pending_entry.session_key: pending_entry}
982958
adapter.handle_message = AsyncMock()
983959

984960
scheduled = runner._schedule_resume_pending_sessions()
985961
await asyncio.sleep(0)
986962

987963
assert scheduled == 1
988-
runner.session_store.list_resume_pending.assert_called_once_with(
989-
window_secs=_auto_continue_freshness_window(),
990-
allowed_reasons={"restart_timeout", "shutdown_timeout"},
991-
)
992964
adapter.handle_message.assert_awaited_once()
993965
event = adapter.handle_message.await_args.args[0]
994966
assert isinstance(event, MessageEvent)
995967
assert event.internal is True
996968
assert event.message_type == MessageType.TEXT
997969
assert event.source == source
998-
assert event.text.startswith("[System note: The gateway restarted")
970+
# Text is empty — the existing _is_resume_pending branch in
971+
# _handle_message_with_agent owns the system-note injection so we don't
972+
# double it up.
973+
assert event.text == ""
974+
975+
976+
@pytest.mark.asyncio
977+
async def test_startup_auto_resume_includes_crash_recovery():
978+
"""Crash-recovered sessions (reason=restart_interrupted) are also auto-resumed.
979+
980+
suspend_recently_active() marks in-flight sessions with resume_reason
981+
"restart_interrupted" when the previous gateway exit was not clean
982+
(crash/SIGKILL/OOM). These should get the same magic continuation as
983+
drain-timeout interruptions.
984+
"""
985+
runner, adapter = make_restart_runner()
986+
source = make_restart_source(chat_id="crash-chat")
987+
pending_entry = SessionEntry(
988+
session_key="agent:main:telegram:dm:crash-chat",
989+
session_id="sid",
990+
created_at=datetime.now(),
991+
updated_at=datetime.now(),
992+
origin=source,
993+
platform=Platform.TELEGRAM,
994+
chat_type="dm",
995+
resume_pending=True,
996+
resume_reason="restart_interrupted",
997+
last_resume_marked_at=datetime.now(),
998+
)
999+
runner.session_store._entries = {pending_entry.session_key: pending_entry}
1000+
adapter.handle_message = AsyncMock()
1001+
1002+
scheduled = runner._schedule_resume_pending_sessions()
1003+
await asyncio.sleep(0)
1004+
1005+
assert scheduled == 1
1006+
adapter.handle_message.assert_awaited_once()
1007+
1008+
1009+
@pytest.mark.asyncio
1010+
async def test_startup_auto_resume_skips_stale_entries():
1011+
"""Entries older than the freshness window must not be auto-resumed."""
1012+
runner, adapter = make_restart_runner()
1013+
source = make_restart_source(chat_id="stale-chat")
1014+
stale_marker = datetime.now() - timedelta(
1015+
seconds=_auto_continue_freshness_window() + 60
1016+
)
1017+
stale_entry = SessionEntry(
1018+
session_key="agent:main:telegram:dm:stale-chat",
1019+
session_id="sid",
1020+
created_at=stale_marker,
1021+
updated_at=stale_marker,
1022+
origin=source,
1023+
platform=Platform.TELEGRAM,
1024+
chat_type="dm",
1025+
resume_pending=True,
1026+
resume_reason="restart_timeout",
1027+
last_resume_marked_at=stale_marker,
1028+
)
1029+
runner.session_store._entries = {stale_entry.session_key: stale_entry}
1030+
adapter.handle_message = AsyncMock()
1031+
1032+
scheduled = runner._schedule_resume_pending_sessions()
1033+
1034+
assert scheduled == 0
1035+
adapter.handle_message.assert_not_called()
1036+
1037+
1038+
@pytest.mark.asyncio
1039+
async def test_startup_auto_resume_skips_suspended_and_originless():
1040+
"""suspended entries and entries with no origin are excluded."""
1041+
runner, adapter = make_restart_runner()
1042+
source = make_restart_source(chat_id="ok")
1043+
suspended_entry = SessionEntry(
1044+
session_key="agent:main:telegram:dm:suspended",
1045+
session_id="sid-s",
1046+
created_at=datetime.now(),
1047+
updated_at=datetime.now(),
1048+
origin=source,
1049+
platform=Platform.TELEGRAM,
1050+
chat_type="dm",
1051+
resume_pending=True,
1052+
resume_reason="restart_timeout",
1053+
suspended=True,
1054+
last_resume_marked_at=datetime.now(),
1055+
)
1056+
originless = SessionEntry(
1057+
session_key="agent:main:telegram:dm:originless",
1058+
session_id="sid-o",
1059+
created_at=datetime.now(),
1060+
updated_at=datetime.now(),
1061+
origin=None,
1062+
platform=Platform.TELEGRAM,
1063+
chat_type="dm",
1064+
resume_pending=True,
1065+
resume_reason="restart_timeout",
1066+
last_resume_marked_at=datetime.now(),
1067+
)
1068+
runner.session_store._entries = {
1069+
suspended_entry.session_key: suspended_entry,
1070+
originless.session_key: originless,
1071+
}
1072+
adapter.handle_message = AsyncMock()
1073+
1074+
scheduled = runner._schedule_resume_pending_sessions()
1075+
1076+
assert scheduled == 0
1077+
adapter.handle_message.assert_not_called()
1078+
1079+
1080+
@pytest.mark.asyncio
1081+
async def test_startup_auto_resume_skips_disallowed_reasons():
1082+
"""Reasons outside the auto-resume set (e.g. a future custom reason) are skipped.
1083+
1084+
These sessions still auto-resume on the next real user message via the
1085+
existing _is_resume_pending branch — we just don't synthesize a turn
1086+
for them at startup.
1087+
"""
1088+
runner, adapter = make_restart_runner()
1089+
source = make_restart_source(chat_id="other")
1090+
other_entry = SessionEntry(
1091+
session_key="agent:main:telegram:dm:other",
1092+
session_id="sid",
1093+
created_at=datetime.now(),
1094+
updated_at=datetime.now(),
1095+
origin=source,
1096+
platform=Platform.TELEGRAM,
1097+
chat_type="dm",
1098+
resume_pending=True,
1099+
resume_reason="manual_resume_request",
1100+
last_resume_marked_at=datetime.now(),
1101+
)
1102+
runner.session_store._entries = {other_entry.session_key: other_entry}
1103+
adapter.handle_message = AsyncMock()
1104+
1105+
scheduled = runner._schedule_resume_pending_sessions()
1106+
1107+
assert scheduled == 0
1108+
adapter.handle_message.assert_not_called()
9991109

10001110

10011111
@pytest.mark.asyncio
@@ -1014,7 +1124,7 @@ async def test_startup_auto_resume_skips_when_adapter_unavailable():
10141124
resume_reason="restart_timeout",
10151125
last_resume_marked_at=datetime.now(),
10161126
)
1017-
runner.session_store.list_resume_pending = MagicMock(return_value=[pending_entry])
1127+
runner.session_store._entries = {pending_entry.session_key: pending_entry}
10181128
runner.adapters = {}
10191129
adapter.handle_message = AsyncMock()
10201130

0 commit comments

Comments
 (0)