Skip to content

Commit 21aafe8

Browse files
Add retry_interval to FastMCP and test_reconnection to everything-server (SEP-1699)
- Add retry_interval parameter to FastMCP for SSE polling control - Add InMemoryEventStore and test_reconnection tool to everything-server - Enables SSE polling conformance test to pass (server-sse-polling scenario)
1 parent fdcd8f5 commit 21aafe8

File tree

2 files changed

+60
-0
lines changed
  • examples/servers/everything-server/mcp_everything_server
  • src/mcp/server/fastmcp

2 files changed

+60
-0
lines changed

examples/servers/everything-server/mcp_everything_server/server.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@
1414
from mcp.server.fastmcp import Context, FastMCP
1515
from mcp.server.fastmcp.prompts.base import UserMessage
1616
from mcp.server.session import ServerSession
17+
from mcp.server.streamable_http import EventCallback, EventMessage, EventStore
1718
from mcp.types import (
1819
AudioContent,
1920
Completion,
2021
CompletionArgument,
2122
CompletionContext,
2223
EmbeddedResource,
2324
ImageContent,
25+
JSONRPCMessage,
2426
PromptReference,
2527
ResourceTemplateReference,
2628
SamplingMessage,
@@ -31,6 +33,43 @@
3133

3234
logger = logging.getLogger(__name__)
3335

36+
# Type aliases for event store
37+
StreamId = str
38+
EventId = str
39+
40+
41+
class InMemoryEventStore(EventStore):
42+
"""Simple in-memory event store for SSE resumability testing."""
43+
44+
def __init__(self) -> None:
45+
self._events: list[tuple[StreamId, EventId, JSONRPCMessage | None]] = []
46+
self._event_id_counter = 0
47+
48+
async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId:
49+
"""Store an event and return its ID."""
50+
self._event_id_counter += 1
51+
event_id = str(self._event_id_counter)
52+
self._events.append((stream_id, event_id, message))
53+
return event_id
54+
55+
async def replay_events_after(self, last_event_id: EventId, send_callback: EventCallback) -> StreamId | None:
56+
"""Replay events after the specified ID."""
57+
target_stream_id = None
58+
for stream_id, event_id, _ in self._events:
59+
if event_id == last_event_id:
60+
target_stream_id = stream_id
61+
break
62+
if target_stream_id is None:
63+
return None
64+
last_event_id_int = int(last_event_id)
65+
for stream_id, event_id, message in self._events:
66+
if stream_id == target_stream_id and int(event_id) > last_event_id_int:
67+
# Skip priming events (None message)
68+
if message is not None:
69+
await send_callback(EventMessage(message, event_id))
70+
return target_stream_id
71+
72+
3473
# Test data
3574
TEST_IMAGE_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=="
3675
TEST_AUDIO_BASE64 = "UklGRiYAAABXQVZFZm10IBAAAAABAAEAQB8AAAB9AAACABAAZGF0YQIAAAA="
@@ -39,8 +78,13 @@
3978
resource_subscriptions: set[str] = set()
4079
watched_resource_content = "Watched resource content"
4180

81+
# Create event store for SSE resumability (SEP-1699)
82+
event_store = InMemoryEventStore()
83+
4284
mcp = FastMCP(
4385
name="mcp-conformance-test-server",
86+
event_store=event_store,
87+
retry_interval=100, # 100ms retry interval for SSE polling
4488
)
4589

4690

@@ -263,6 +307,19 @@ def test_error_handling() -> str:
263307
raise RuntimeError("This tool intentionally returns an error for testing")
264308

265309

310+
@mcp.tool()
311+
async def test_reconnection(ctx: Context[ServerSession, None]) -> str:
312+
"""Tests SSE polling by closing stream mid-call (SEP-1699)"""
313+
await ctx.info("Before disconnect")
314+
315+
await ctx.close_sse_stream()
316+
317+
await asyncio.sleep(0.2) # Wait for client to reconnect
318+
319+
await ctx.info("After reconnect")
320+
return "Reconnection test completed"
321+
322+
266323
# Resources
267324
@mcp.resource("test://static-text")
268325
def static_text_resource() -> str:

src/mcp/server/fastmcp/server.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ def __init__( # noqa: PLR0913
153153
auth_server_provider: (OAuthAuthorizationServerProvider[Any, Any, Any] | None) = None,
154154
token_verifier: TokenVerifier | None = None,
155155
event_store: EventStore | None = None,
156+
retry_interval: int | None = None,
156157
*,
157158
tools: list[Tool] | None = None,
158159
debug: bool = False,
@@ -221,6 +222,7 @@ def __init__( # noqa: PLR0913
221222
if auth_server_provider and not token_verifier: # pragma: no cover
222223
self._token_verifier = ProviderTokenVerifier(auth_server_provider)
223224
self._event_store = event_store
225+
self._retry_interval = retry_interval
224226
self._custom_starlette_routes: list[Route] = []
225227
self.dependencies = self.settings.dependencies
226228
self._session_manager: StreamableHTTPSessionManager | None = None
@@ -940,6 +942,7 @@ def streamable_http_app(self) -> Starlette:
940942
self._session_manager = StreamableHTTPSessionManager(
941943
app=self._mcp_server,
942944
event_store=self._event_store,
945+
retry_interval=self._retry_interval,
943946
json_response=self.settings.json_response,
944947
stateless=self.settings.stateless_http, # Use the stateless setting
945948
security_settings=self.settings.transport_security,

0 commit comments

Comments
 (0)