Skip to content

Commit e3a1c06

Browse files
Add SSE polling demo server and client examples (SEP-1699)
Demonstrates the SSE polling pattern with close_sse_stream(): - Server: process_batch tool that checkpoints periodically - Client: auto-reconnects transparently with Last-Event-ID - Shows priming events, retry interval, and event replay
1 parent e893de9 commit e3a1c06

File tree

10 files changed

+526
-0
lines changed

10 files changed

+526
-0
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# MCP SSE Polling Demo Client
2+
3+
Demonstrates client-side auto-reconnect for the SSE polling pattern (SEP-1699).
4+
5+
## Features
6+
7+
- Connects to SSE polling demo server
8+
- Automatically reconnects when server closes SSE stream
9+
- Resumes from Last-Event-ID to avoid missing messages
10+
- Respects server-provided retry interval
11+
12+
## Usage
13+
14+
```bash
15+
# First start the server:
16+
uv run mcp-sse-polling-demo --port 3000
17+
18+
# Then run this client:
19+
uv run mcp-sse-polling-client --url http://localhost:3000/mcp
20+
21+
# Custom options:
22+
uv run mcp-sse-polling-client --url http://localhost:3000/mcp --items 20 --checkpoint-every 5
23+
```
24+
25+
## Options
26+
27+
- `--url`: Server URL (default: http://localhost:3000/mcp)
28+
- `--items`: Number of items to process (default: 10)
29+
- `--checkpoint-every`: Number of items processed between checkpoints (default: 3)
30+
- `--log-level`: Logging level (default: DEBUG)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""SSE Polling Demo Client - demonstrates auto-reconnect for long-running tasks."""
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
"""
2+
SSE Polling Demo Client
3+
4+
Demonstrates the client-side auto-reconnect for SSE polling pattern.
5+
6+
This client connects to the SSE Polling Demo server and calls process_batch,
7+
which triggers periodic server-side stream closes. The client automatically
8+
reconnects using Last-Event-ID and resumes receiving messages.
9+
10+
Run with:
11+
# First start the server:
12+
uv run mcp-sse-polling-demo --port 3000
13+
14+
# Then run this client:
15+
uv run mcp-sse-polling-client --url http://localhost:3000/mcp
16+
"""
17+
18+
import asyncio
19+
import logging
20+
21+
import click
22+
from mcp import ClientSession
23+
from mcp.client.streamable_http import streamablehttp_client
24+
25+
logger = logging.getLogger(__name__)
26+
27+
28+
async def run_demo(url: str, items: int, checkpoint_every: int) -> None:
    """Connect to the SSE polling demo server and call its process_batch tool.

    Args:
        url: Streamable HTTP endpoint of the demo server.
        items: Number of items the server should process.
        checkpoint_every: Items processed between checkpoints; each
            checkpoint makes the server close the SSE stream, so the
            client transport reconnects with Last-Event-ID.
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print("SSE Polling Demo Client")
    print(banner)
    print(f"Server URL: {url}")
    print(f"Processing {items} items with checkpoints every {checkpoint_every}")
    # Fixed: was an f-string with no placeholders (ruff F541).
    print("Watch DEBUG logs for reconnection messages")
    print(f"{banner}\n")

    async with streamablehttp_client(url) as (read_stream, write_stream, _):
        async with ClientSession(read_stream, write_stream) as session:
            # The MCP handshake must complete before any other request.
            print("Initializing connection...")
            await session.initialize()
            print("Connected!\n")

            # Show what the server offers so the demo output is self-explanatory.
            tools = await session.list_tools()
            print(f"Available tools: {[t.name for t in tools.tools]}\n")

            # Long-running call: the transport reconnects transparently each
            # time the server closes the SSE stream at a checkpoint.
            print(f"Calling process_batch(items={items}, checkpoint_every={checkpoint_every})...")
            print("Watch for reconnections in DEBUG logs\n")
            print("-" * 40)

            result = await session.call_tool(
                "process_batch",
                {
                    "items": items,
                    "checkpoint_every": checkpoint_every,
                },
            )

            print("-" * 40)
            if result.content:
                # Tool results are a list of content blocks; display the
                # first block's text, falling back to its repr.
                content = result.content[0]
                text = getattr(content, "text", str(content))
                print(f"\nResult: {text}")
            else:
                print("\nResult: No content")
            print(f"{banner}\n")
70+
71+
72+
@click.command()
@click.option(
    "--url",
    default="http://localhost:3000/mcp",
    help="Server URL",
)
@click.option(
    "--items",
    default=10,
    help="Number of items to process",
)
@click.option(
    "--checkpoint-every",
    default=3,
    help="Checkpoint interval",
)
@click.option(
    "--log-level",
    default="DEBUG",
    help="Logging level",
)
def main(url: str, items: int, checkpoint_every: int, log_level: str) -> None:
    """Run the SSE Polling Demo client.

    Configures logging (DEBUG by default so the transport's reconnection
    messages are visible) and drives the async demo to completion.
    """
    logging.basicConfig(
        # basicConfig accepts a level *name* directly; an unknown name
        # raises a clear ValueError instead of getattr's AttributeError.
        level=log_level.upper(),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    asyncio.run(run_demo(url, items, checkpoint_every))


if __name__ == "__main__":
    main()
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
[project]
2+
name = "mcp-sse-polling-client"
3+
version = "0.1.0"
4+
description = "Demo client for SSE polling with auto-reconnect"
5+
readme = "README.md"
6+
requires-python = ">=3.10"
7+
authors = [{ name = "Anthropic, PBC." }]
8+
keywords = ["mcp", "sse", "polling", "client"]
9+
license = { text = "MIT" }
10+
dependencies = ["click>=8.2.0", "mcp"]
11+
12+
[project.scripts]
13+
mcp-sse-polling-client = "mcp_sse_polling_client.main:main"
14+
15+
[build-system]
16+
requires = ["hatchling"]
17+
build-backend = "hatchling.build"
18+
19+
[tool.hatch.build.targets.wheel]
20+
packages = ["mcp_sse_polling_client"]
21+
22+
[tool.pyright]
23+
include = ["mcp_sse_polling_client"]
24+
venvPath = "."
25+
venv = ".venv"
26+
27+
[tool.ruff.lint]
28+
select = ["E", "F", "I"]
29+
ignore = []
30+
31+
[tool.ruff]
32+
line-length = 120
33+
target-version = "py310"
34+
35+
[dependency-groups]
36+
dev = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"]
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# MCP SSE Polling Demo Server
2+
3+
Demonstrates the SSE polling pattern with server-initiated stream close for long-running tasks (SEP-1699).
4+
5+
## Features
6+
7+
- Priming events (automatic with EventStore)
8+
- Server-initiated stream close via `close_sse_stream()` callback
9+
- Client auto-reconnect with Last-Event-ID
10+
- Progress notifications during long-running tasks
11+
- Configurable retry interval
12+
13+
## Usage
14+
15+
```bash
16+
# Start server on default port
17+
uv run mcp-sse-polling-demo --port 3000
18+
19+
# Custom retry interval (milliseconds)
20+
uv run mcp-sse-polling-demo --port 3000 --retry-interval 100
21+
```
22+
23+
## Tool: process_batch
24+
25+
Processes items with periodic checkpoints that trigger SSE stream closes:
26+
27+
- `items`: Number of items to process (1-100, default: 10)
28+
- `checkpoint_every`: Close stream after this many items (1-20, default: 3)
29+
30+
## Client
31+
32+
Use the companion `mcp-sse-polling-client` to test:
33+
34+
```bash
35+
uv run mcp-sse-polling-client --url http://localhost:3000/mcp
36+
```
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""SSE Polling Demo Server - demonstrates close_sse_stream for long-running tasks."""
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""Entry point for the SSE Polling Demo server."""
2+
3+
from .server import main
4+
5+
if __name__ == "__main__":
6+
main()
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
"""
2+
In-memory event store for demonstrating resumability functionality.
3+
4+
This is a simple implementation intended for examples and testing,
5+
not for production use where a persistent storage solution would be more appropriate.
6+
"""
7+
8+
import logging
9+
from collections import deque
10+
from dataclasses import dataclass
11+
from uuid import uuid4
12+
13+
from mcp.server.streamable_http import EventCallback, EventId, EventMessage, EventStore, StreamId
14+
from mcp.types import JSONRPCMessage
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
@dataclass
class EventEntry:
    """Represents an event entry in the event store."""

    # Unique identifier generated when the event is stored; clients send
    # it back as Last-Event-ID to resume after this event.
    event_id: EventId
    # ID of the SSE stream this event was emitted on.
    stream_id: StreamId
    message: JSONRPCMessage | None  # None for priming events
26+
27+
28+
class InMemoryEventStore(EventStore):
    """
    Simple in-memory implementation of the EventStore interface for resumability.
    This is primarily intended for examples and testing, not for production use
    where a persistent storage solution would be more appropriate.

    This implementation keeps only the last N events per stream for memory efficiency.
    """

    def __init__(self, max_events_per_stream: int = 100):
        """Initialize the event store.

        Args:
            max_events_per_stream: Maximum number of events to keep per stream
        """
        self.max_events_per_stream = max_events_per_stream
        # stream_id -> bounded deque of EventEntry, oldest first
        self.streams: dict[StreamId, deque[EventEntry]] = {}
        # event_id -> EventEntry for O(1) lookup during replay
        self.event_index: dict[EventId, EventEntry] = {}

    async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId:
        """Stores an event with a generated event ID.

        Args:
            stream_id: ID of the stream the event belongs to
            message: The message to store, or None for priming events

        Returns:
            The generated event ID for the stored event.
        """
        event_id = str(uuid4())
        event_entry = EventEntry(event_id=event_id, stream_id=stream_id, message=message)

        # Get or create the bounded deque for this stream.
        if stream_id not in self.streams:
            self.streams[stream_id] = deque(maxlen=self.max_events_per_stream)

        # A full deque silently evicts its oldest entry on append, so drop
        # that entry from the lookup index first to keep both structures
        # consistent.
        if len(self.streams[stream_id]) == self.max_events_per_stream:
            oldest_event = self.streams[stream_id][0]
            self.event_index.pop(oldest_event.event_id, None)

        # Add new event
        self.streams[stream_id].append(event_entry)
        self.event_index[event_id] = event_entry

        return event_id

    async def replay_events_after(
        self,
        last_event_id: EventId,
        send_callback: EventCallback,
    ) -> StreamId | None:
        """Replays events that occurred after the specified event ID.

        Args:
            last_event_id: The Last-Event-ID the client reconnected with.
            send_callback: Async callback invoked once per replayed event.

        Returns:
            The stream ID the replayed events belong to, or None when the
            event ID is unknown (e.g. already evicted from the bounded
            per-stream history).
        """
        if last_event_id not in self.event_index:
            # Lazy %-style args: the message is only formatted if emitted.
            logger.warning("Event ID %s not found in store", last_event_id)
            return None

        # Locate the stream that the last-seen event belongs to.
        last_event = self.event_index[last_event_id]
        stream_id = last_event.stream_id
        stream_events = self.streams.get(last_event.stream_id, deque())

        # Events in the deque are already in chronological order: scan until
        # the last-seen event, then replay everything after it.
        found_last = False
        for event in stream_events:
            if found_last:
                # Skip priming events (None messages) during replay
                if event.message is not None:
                    await send_callback(EventMessage(event.message, event.event_id))
            elif event.event_id == last_event_id:
                found_last = True

        return stream_id

0 commit comments

Comments
 (0)