Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 88 additions & 1 deletion gateway/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,26 @@ def _normalize_empty_agent_response(
return response


def _should_clear_resume_pending_after_turn(agent_result: dict) -> bool:
"""Return True only when a gateway turn really completed successfully.

Restart recovery uses ``resume_pending`` as a durable marker for sessions
interrupted during gateway drain. A soft interrupt can still bubble out as
a syntactically normal agent result with an empty final response; clearing
the marker in that case loses the recovery signal and startup auto-resume
has nothing to schedule.
"""
if not isinstance(agent_result, dict):
return False
if agent_result.get("interrupted"):
return False
if agent_result.get("failed") or agent_result.get("partial") or agent_result.get("error"):
return False
if agent_result.get("completed") is False:
return False
return True


class GatewayRunner:
"""
Main gateway controller.
Expand Down Expand Up @@ -2723,6 +2743,57 @@ async def _run_restart() -> None:
task.add_done_callback(self._background_tasks.discard)
return True

def _schedule_resume_pending_sessions(self) -> int:
"""Auto-continue fresh restart-interrupted sessions after startup.

``resume_pending`` already preserves the transcript and injects the
recovery system note on the next user message. This method closes the
restart UX gap by synthesizing that next message once adapters are back
online, so users do not have to send a placeholder ping after restart.
"""
try:
entries = self.session_store.list_resume_pending(
window_secs=_auto_continue_freshness_window(),
allowed_reasons={"restart_timeout", "shutdown_timeout"},
)
except Exception as exc:
logger.warning("Failed to list resume-pending sessions: %s", exc)
return 0

scheduled = 0
for entry in entries:
source = getattr(entry, "origin", None)
platform = getattr(source, "platform", None)
adapter = self.adapters.get(platform) if platform is not None else None
if source is None or adapter is None:
logger.debug(
"Skipping auto-resume for %s: adapter unavailable for %s",
getattr(entry, "session_key", "?"),
getattr(platform, "value", platform),
)
continue

event = MessageEvent(
text=(
"[System note: The gateway restarted after interrupting "
"this session. Resume the previous turn now. Reconcile "
"the transcript first: if tool results are already present, "
"process them before taking new action; never claim work "
"completed unless it is visible in the transcript/tool output.]"
),
message_type=MessageType.TEXT,
source=source,
internal=True,
)
task = asyncio.create_task(adapter.handle_message(event))
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
scheduled += 1

if scheduled:
logger.info("Scheduled auto-resume for %d restart-interrupted session(s)", scheduled)
return scheduled

async def start(self) -> bool:
"""
Start the gateway and all configured platform adapters.
Expand Down Expand Up @@ -3111,6 +3182,12 @@ async def start(self) -> bool:
skip_targets=skip_home_targets,
)

# Automatically continue fresh sessions that were interrupted by the
# previous gateway restart/shutdown. The resume_pending flag is cleared
# by the normal successful-turn path, so a failed auto-resume remains
# visible for manual recovery on the next user message.
self._schedule_resume_pending_sessions()

# Drain any recovered process watchers (from crash recovery checkpoint)
try:
from tools.process_registry import process_registry
Expand Down Expand Up @@ -6485,7 +6562,7 @@ async def _handle_message_with_agent(self, event, source, _quick_key: str, run_g
# shutdown) — the turn ran to completion, so recovery
# succeeded and subsequent messages should no longer receive
# the restart-interruption system note.
if session_key:
if session_key and _should_clear_resume_pending_after_turn(agent_result):
self._clear_restart_failure_count(session_key)
try:
self.session_store.clear_resume_pending(session_key)
Expand Down Expand Up @@ -13809,6 +13886,11 @@ def _approval_notify_sync(approval_data: dict) -> None:
"messages": result.get("messages", []),
"api_calls": result.get("api_calls", 0),
"failed": result.get("failed", False),
"partial": result.get("partial", False),
"completed": result.get("completed"),
"interrupted": result.get("interrupted", False),
"interrupt_message": result.get("interrupt_message"),
"error": result.get("error"),
"compression_exhausted": result.get("compression_exhausted", False),
"tools": tools_holder[0] or [],
"history_offset": len(agent_history),
Expand Down Expand Up @@ -13924,6 +14006,11 @@ def _approval_notify_sync(approval_data: dict) -> None:
"last_reasoning": result.get("last_reasoning"),
"messages": result_holder[0].get("messages", []) if result_holder[0] else [],
"api_calls": result_holder[0].get("api_calls", 0) if result_holder[0] else 0,
"completed": result_holder[0].get("completed") if result_holder[0] else None,
"interrupted": result_holder[0].get("interrupted", False) if result_holder[0] else False,
"partial": result_holder[0].get("partial", False) if result_holder[0] else False,
"error": result_holder[0].get("error") if result_holder[0] else None,
"interrupt_message": result_holder[0].get("interrupt_message") if result_holder[0] else None,
"tools": tools_holder[0] or [],
"history_offset": _effective_history_offset,
"last_prompt_tokens": _last_prompt_toks,
Expand Down
36 changes: 36 additions & 0 deletions gateway/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,42 @@ def clear_resume_pending(self, session_key: str) -> bool:
self._save()
return True

def list_resume_pending(
self,
*,
window_secs: Optional[float] = None,
now: Optional[float] = None,
allowed_reasons: Optional[set[str]] = None,
) -> List[SessionEntry]:
"""Return fresh restart-interrupted sessions eligible for resume.

Only entries that still have an origin are returned; the gateway needs
that origin to route continuation back through the original
platform/chat/thread. ``suspended`` entries are excluded because
explicit suspension/stuck-loop escalation must win over resume.
"""
current = datetime.fromtimestamp(now) if now is not None else _now()
window = float(window_secs) if window_secs is not None else None

with self._lock:
self._ensure_loaded_locked()
entries = list(self._entries.values())

pending: List[SessionEntry] = []
for entry in entries:
if not entry.resume_pending or entry.suspended or entry.origin is None:
continue
if allowed_reasons is not None and entry.resume_reason not in allowed_reasons:
continue
if window is not None and window > 0:
marker = entry.last_resume_marked_at or entry.updated_at
if marker is not None and (current - marker).total_seconds() > window:
continue
pending.append(entry)

pending.sort(key=lambda entry: entry.last_resume_marked_at or entry.updated_at)
return pending

def prune_old_entries(self, max_age_days: int) -> int:
"""Drop SessionEntry records older than max_age_days.

Expand Down
116 changes: 115 additions & 1 deletion tests/gateway/test_restart_resume_pending.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
import pytest

from gateway.config import GatewayConfig, HomeChannel, Platform, PlatformConfig
from gateway.platforms.base import SendResult
from gateway.platforms.base import MessageEvent, MessageType, SendResult
from gateway.run import (
_auto_continue_freshness_window,
_coerce_gateway_timestamp,
_is_fresh_gateway_interruption,
_last_transcript_timestamp,
_should_clear_resume_pending_after_turn,
)
from gateway.session import SessionEntry, SessionSource, SessionStore
from tests.gateway.restart_test_helpers import (
Expand All @@ -52,6 +53,23 @@
# ---------------------------------------------------------------------------


def test_resume_pending_is_cleared_only_after_successful_turn():
"""Interrupted/failed drain results must keep the restart recovery marker.

Regression for dogfood failure: during gateway restart the interrupted run
returned an empty final response and was normalized into a user-facing
fallback, but the gateway cleared ``resume_pending`` before startup could
auto-resume it.
"""
assert _should_clear_resume_pending_after_turn({"final_response": "done"}) is True
assert _should_clear_resume_pending_after_turn({"completed": True}) is True
assert _should_clear_resume_pending_after_turn({"interrupted": True}) is False
assert _should_clear_resume_pending_after_turn({"completed": False}) is False
assert _should_clear_resume_pending_after_turn({"failed": True}) is False
assert _should_clear_resume_pending_after_turn({"partial": True}) is False
assert _should_clear_resume_pending_after_turn({"error": "boom"}) is False


def _make_source(platform=Platform.TELEGRAM, chat_id="123", user_id="u1"):
return SessionSource(platform=platform, chat_id=chat_id, user_id=user_id)

Expand Down Expand Up @@ -227,6 +245,30 @@ def test_malformed_timestamp_is_tolerated(self):


class TestMarkResumePending:
def test_list_resume_pending_returns_fresh_entries_with_origins(self, tmp_path):
store = _make_store(tmp_path)
fresh = store.get_or_create_session(_make_source(chat_id="fresh"))
stale = store.get_or_create_session(_make_source(chat_id="stale"))
missing_origin = store.get_or_create_session(_make_source(chat_id="missing-origin"))
suspended = store.get_or_create_session(_make_source(chat_id="suspended"))

store.mark_resume_pending(fresh.session_key, reason="restart_timeout")
store.mark_resume_pending(stale.session_key, reason="restart_timeout")
store.mark_resume_pending(missing_origin.session_key, reason="restart_timeout")
store.mark_resume_pending(suspended.session_key, reason="restart_timeout")
old = datetime.now() - timedelta(hours=3)
store._entries[stale.session_key].last_resume_marked_at = old
store._entries[missing_origin.session_key].origin = None
store._entries[suspended.session_key].suspended = True

pending = store.list_resume_pending(
window_secs=3600,
now=datetime.now().timestamp(),
allowed_reasons={"restart_timeout"},
)

assert [entry.session_key for entry in pending] == [fresh.session_key]

def test_marks_existing_session(self, tmp_path):
store = _make_store(tmp_path)
source = _make_source()
Expand Down Expand Up @@ -910,6 +952,78 @@ async def test_drain_timeout_skips_pending_sentinel_sessions():
assert marked == {session_key_real}


# ---------------------------------------------------------------------------
# Gateway startup auto-resume
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_startup_auto_resume_schedules_fresh_pending_sessions():
"""Fresh resume_pending sessions should continue automatically after startup.

This closes the UX gap where restart recovery only happened if the user sent
another message after the gateway came back.
"""
runner, adapter = make_restart_runner()
source = make_restart_source(chat_id="resume-chat", thread_id="topic-1")
pending_entry = SessionEntry(
session_key="agent:main:telegram:group:resume-chat:topic-1",
session_id="sid",
created_at=datetime.now(),
updated_at=datetime.now(),
origin=source,
platform=Platform.TELEGRAM,
chat_type="group",
resume_pending=True,
resume_reason="restart_timeout",
last_resume_marked_at=datetime.now(),
)
runner.session_store.list_resume_pending = MagicMock(return_value=[pending_entry])
adapter.handle_message = AsyncMock()

scheduled = runner._schedule_resume_pending_sessions()
await asyncio.sleep(0)

assert scheduled == 1
runner.session_store.list_resume_pending.assert_called_once_with(
window_secs=_auto_continue_freshness_window(),
allowed_reasons={"restart_timeout", "shutdown_timeout"},
)
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.await_args.args[0]
assert isinstance(event, MessageEvent)
assert event.internal is True
assert event.message_type == MessageType.TEXT
assert event.source == source
assert event.text.startswith("[System note: The gateway restarted")


@pytest.mark.asyncio
async def test_startup_auto_resume_skips_when_adapter_unavailable():
runner, adapter = make_restart_runner()
source = make_restart_source(chat_id="resume-chat")
pending_entry = SessionEntry(
session_key="agent:main:telegram:dm:resume-chat",
session_id="sid",
created_at=datetime.now(),
updated_at=datetime.now(),
origin=source,
platform=Platform.TELEGRAM,
chat_type="dm",
resume_pending=True,
resume_reason="restart_timeout",
last_resume_marked_at=datetime.now(),
)
runner.session_store.list_resume_pending = MagicMock(return_value=[pending_entry])
runner.adapters = {}
adapter.handle_message = AsyncMock()

scheduled = runner._schedule_resume_pending_sessions()

assert scheduled == 0
adapter.handle_message.assert_not_called()


# ---------------------------------------------------------------------------
# Shutdown banner wording
# ---------------------------------------------------------------------------
Expand Down