Skip to content

Commit 4adce73

Browse files
Simplify SEP-1699 API design
- Remove StreamableHTTPReconnectionOptions dataclass (4 params -> 1 max_retries) - Per spec, client MUST use server's retry field; exponential backoff removed - Add get_stream_id_for_event_id() to EventStore for stream identification - Simplify CloseSSEStreamCallback to no-arg (remove unused retry_interval_ms) - Add close_standalone_sse_stream() for GET notification streams - Update tests and examples for simplified API Net: -84 lines, cleaner API that follows spec more closely
1 parent dc457ff commit 4adce73

File tree

6 files changed

+74
-158
lines changed

6 files changed

+74
-158
lines changed

examples/snippets/clients/sse_polling_client.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import logging
2525

2626
from mcp import ClientSession
27-
from mcp.client.streamable_http import StreamableHTTPReconnectionOptions, streamablehttp_client
27+
from mcp.client.streamable_http import streamablehttp_client
2828
from mcp.types import LoggingMessageNotificationParams, TextContent
2929

3030
logging.basicConfig(
@@ -50,19 +50,13 @@ async def logging_callback(params: LoggingMessageNotificationParams) -> None:
5050
notifications_received.append(data_str)
5151
print(f"[Progress] {data_str}")
5252

53-
# Configure reconnection behavior
54-
reconnection_options = StreamableHTTPReconnectionOptions(
55-
initial_reconnection_delay=1.0, # Start with 1 second
56-
max_reconnection_delay=30.0, # Cap at 30 seconds
57-
reconnection_delay_grow_factor=1.5, # Exponential backoff
58-
max_retries=5, # Try up to 5 times
59-
)
60-
6153
print("[Client] Connecting to server...")
6254

55+
# The client automatically uses the server's retry interval (per SEP-1699 spec)
56+
# max_retries controls how many reconnection attempts before giving up
6357
async with streamablehttp_client(
6458
"http://localhost:3001/mcp",
65-
reconnection_options=reconnection_options,
59+
max_retries=5,
6660
) as (read_stream, write_stream, get_session_id):
6761
# Create session with logging callback to receive progress notifications
6862
async with ClientSession(

src/mcp/client/streamable_http.py

Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -58,25 +58,10 @@ class ResumptionError(StreamableHTTPError):
5858
"""Raised when resumption request is invalid."""
5959

6060

61-
@dataclass
62-
class StreamableHTTPReconnectionOptions:
63-
"""Configuration options for reconnection behavior of StreamableHTTPTransport.
64-
65-
Attributes:
66-
initial_reconnection_delay: Initial backoff time in seconds. Default is 1.0.
67-
max_reconnection_delay: Maximum backoff time in seconds. Default is 30.0.
68-
reconnection_delay_grow_factor: Factor by which delay increases. Default is 1.5.
69-
max_retries: Maximum reconnection attempts. Default is 2.
70-
"""
71-
72-
initial_reconnection_delay: float = 1.0
73-
max_reconnection_delay: float = 30.0
74-
reconnection_delay_grow_factor: float = 1.5
75-
max_retries: int = 2
76-
77-
def __post_init__(self) -> None:
78-
if self.initial_reconnection_delay > self.max_reconnection_delay:
79-
raise ValueError("initial_reconnection_delay cannot exceed max_reconnection_delay")
61+
# Default reconnection settings per SEP-1699
62+
# The server controls timing via SSE retry field; this is just a fallback
63+
DEFAULT_RECONNECTION_DELAY = 1.0 # seconds, used when server doesn't provide retry
64+
DEFAULT_MAX_RETRIES = 2
8065

8166

8267
@dataclass
@@ -102,7 +87,7 @@ def __init__(
10287
timeout: float | timedelta = 30,
10388
sse_read_timeout: float | timedelta = 60 * 5,
10489
auth: httpx.Auth | None = None,
105-
reconnection_options: StreamableHTTPReconnectionOptions | None = None,
90+
max_retries: int = DEFAULT_MAX_RETRIES,
10691
) -> None:
10792
"""Initialize the StreamableHTTP transport."""
10893
self.url = url
@@ -114,7 +99,7 @@ def __init__(
11499
self.auth = auth
115100
self.session_id = None
116101
self.protocol_version = None
117-
self.reconnection_options = reconnection_options or StreamableHTTPReconnectionOptions()
102+
self.max_retries = max_retries
118103
self._server_retry_seconds: float | None = None # Server-provided retry delay
119104
self.request_headers = {
120105
ACCEPT: f"{JSON}, {SSE}",
@@ -166,23 +151,15 @@ def _maybe_extract_protocol_version_from_message(
166151
) # pragma: no cover
167152
logger.warning(f"Raw result: {message.root.result}")
168153

169-
def _get_next_reconnection_delay(self, attempt: int) -> float:
170-
"""Calculate the next reconnection delay using exponential backoff.
154+
def _get_reconnection_delay(self) -> float:
155+
"""Get the reconnection delay per SEP-1699.
171156
172-
Args:
173-
attempt: Current reconnection attempt count
174-
175-
Returns:
176-
Time to wait in seconds before next reconnection attempt
157+
Returns the server-provided retry value if available (MUST per spec),
158+
otherwise falls back to a simple default.
177159
"""
178-
# Use server-provided retry value if available
179160
if self._server_retry_seconds is not None:
180161
return self._server_retry_seconds
181-
182-
# Fall back to exponential backoff
183-
opts = self.reconnection_options
184-
delay = opts.initial_reconnection_delay * (opts.reconnection_delay_grow_factor**attempt)
185-
return min(delay, opts.max_reconnection_delay)
162+
return DEFAULT_RECONNECTION_DELAY
186163

187164
async def _handle_sse_event(
188165
self,
@@ -427,17 +404,15 @@ async def _attempt_sse_reconnection( # pragma: no cover
427404
Called when SSE stream ends without receiving a response/error,
428405
but we have a priming event indicating resumability.
429406
"""
430-
max_retries = self.reconnection_options.max_retries
431-
432-
if attempt >= max_retries:
433-
error_msg = f"Max reconnection attempts ({max_retries}) exceeded"
407+
if attempt >= self.max_retries:
408+
error_msg = f"Max reconnection attempts ({self.max_retries}) exceeded"
434409
logger.error(error_msg)
435410
await ctx.read_stream_writer.send(StreamableHTTPError(error_msg))
436411
return
437412

438-
# Calculate delay (uses server retry if available, else exponential backoff)
439-
delay = self._get_next_reconnection_delay(attempt)
440-
logger.info(f"SSE stream closed, reconnecting in {delay:.1f}s (attempt {attempt + 1}/{max_retries})")
413+
# Use server-provided retry interval (MUST per spec) or fallback
414+
delay = self._get_reconnection_delay()
415+
logger.info(f"SSE stream closed, reconnecting in {delay:.1f}s (attempt {attempt + 1}/{self.max_retries})")
441416

442417
await anyio.sleep(delay)
443418

@@ -635,7 +610,7 @@ async def streamablehttp_client(
635610
terminate_on_close: bool = True,
636611
httpx_client_factory: McpHttpClientFactory = create_mcp_http_client,
637612
auth: httpx.Auth | None = None,
638-
reconnection_options: StreamableHTTPReconnectionOptions | None = None,
613+
max_retries: int = DEFAULT_MAX_RETRIES,
639614
) -> AsyncGenerator[
640615
tuple[
641616
MemoryObjectReceiveStream[SessionMessage | Exception],
@@ -649,8 +624,10 @@ async def streamablehttp_client(
649624
650625
`sse_read_timeout` determines how long (in seconds) the client will wait for a new
651626
event before disconnecting. All other HTTP operations are controlled by `timeout`.
627+
`max_retries` controls how many times the client will attempt to reconnect when
628+
the server closes an SSE stream mid-operation (SEP-1699 polling).
652629
"""
653-
transport = StreamableHTTPTransport(url, headers, timeout, sse_read_timeout, auth, reconnection_options)
630+
transport = StreamableHTTPTransport(url, headers, timeout, sse_read_timeout, auth, max_retries)
654631

655632
read_stream_writer, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](0)
656633
write_stream, write_stream_reader = anyio.create_memory_object_stream[SessionMessage](0)

src/mcp/server/fastmcp/server.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,23 +1304,17 @@ async def error(self, message: str, **extra: Any) -> None:
13041304
"""Send an error log message."""
13051305
await self.log("error", message, **extra)
13061306

1307-
async def close_sse_stream(self, retry_interval_ms: int | None = None) -> None:
1307+
async def close_sse_stream(self) -> None:
13081308
"""Close the SSE stream for this request, triggering client reconnection.
13091309
13101310
Use this to implement polling behavior during long-running operations.
1311-
The client will reconnect after the retry interval and receive any events
1312-
that were stored while disconnected.
1311+
The client will reconnect after the retry interval (configured at the
1312+
transport level) and receive any events that were stored while disconnected.
13131313
13141314
This is only available when using StreamableHTTP transport with an
13151315
event store configured for resumability. It is a no-op otherwise,
13161316
allowing portable tool code.
1317-
1318-
Args:
1319-
retry_interval_ms: Optional retry interval in milliseconds to suggest
1320-
to the client before closing (currently unused,
1321-
reserved for future use - the retry interval is
1322-
configured at the transport level)
13231317
"""
13241318
callback = self._request_context.close_sse_stream if self._request_context else None
13251319
if callback:
1326-
await callback(retry_interval_ms)
1320+
await callback()

src/mcp/server/streamable_http.py

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,26 @@ async def replay_events_after(
118118
"""
119119
pass # pragma: no cover
120120

121+
async def get_stream_id_for_event_id(self, event_id: EventId) -> StreamId | None:
122+
"""
123+
Get the stream ID associated with a given event ID.
124+
125+
This optional method allows servers to look up which stream an event
126+
belongs to, supporting proper stream identification for resumption.
127+
128+
Args:
129+
event_id: The event ID to look up
130+
131+
Returns:
132+
The stream ID, or None if not found or not implemented
133+
134+
Note:
135+
The default implementation returns None. Override this in your
136+
EventStore implementation if you need explicit stream identification
137+
(e.g., for multiple concurrent resumable streams).
138+
"""
139+
return None
140+
121141

122142
class StreamableHTTPServerTransport:
123143
"""
@@ -740,9 +760,7 @@ def _create_close_sse_callback(self, request_id: str) -> CloseSSEStreamCallback
740760
if not self._event_store:
741761
return None
742762

743-
async def close_callback(retry_interval_ms: int | None = None) -> None:
744-
# Note: retry_interval_ms is accepted for API compatibility but not used
745-
# The retry interval is set via the priming event
763+
async def close_callback() -> None:
746764
await self.close_sse_stream(request_id)
747765

748766
return close_callback
@@ -754,7 +772,7 @@ async def close_sse_stream(self, request_id: RequestId) -> None:
754772
client will reconnect after the retry interval specified in the priming event.
755773
756774
Args:
757-
request_id: The request ID (or stream key) of the stream to close
775+
request_id: The request ID (or GET_STREAM_KEY for standalone GET stream)
758776
"""
759777
request_id_str = str(request_id)
760778
if request_id_str in self._request_streams:
@@ -768,6 +786,14 @@ async def close_sse_stream(self, request_id: RequestId) -> None:
768786
finally:
769787
self._request_streams.pop(request_id_str, None)
770788

789+
async def close_standalone_sse_stream(self) -> None:
790+
"""Close the standalone GET notification stream, triggering client reconnection.
791+
792+
This is a convenience method equivalent to close_sse_stream(GET_STREAM_KEY).
793+
Use this to implement polling behavior for server-initiated notifications.
794+
"""
795+
await self.close_sse_stream(GET_STREAM_KEY)
796+
771797
async def _handle_unsupported_request(self, request: Request, send: Send) -> None: # pragma: no cover
772798
"""Handle unsupported HTTP methods."""
773799
headers = {

src/mcp/shared/message.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
ResumptionTokenUpdateCallback = Callable[[ResumptionToken], Awaitable[None]]
1616

17-
# Callback type for closing SSE streams (takes optional retry_interval_ms)
18-
CloseSSEStreamCallback = Callable[[int | None], Awaitable[None]]
17+
# Callback type for closing SSE streams to trigger client reconnection
18+
CloseSSEStreamCallback = Callable[[], Awaitable[None]]
1919

2020

2121
@dataclass

0 commit comments

Comments
 (0)