-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Add SSE polling support (SEP-1699) #1654
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Add tests for previously uncovered lines in streamable_http client and server to achieve full test coverage: - Client _get_next_reconnection_delay with server retry values - Client resume_stream early returns and error handling - Server _create_priming_event with various configurations - Server close_sse_stream including edge cases Github-Issue:#1654
- Add pragma: no cover to defensive code paths that require complex mocking (server retry field, resume_stream success path, generic exception handlers) - Fix ruff formatting in test file Github-Issue:#1654
- Add type annotations to memory object streams in tests - Import SessionMessage for type annotations - Add pragma: no cover to lines 556 and 571 specifically Github-Issue:#1654
89d038c to
99d1873
Compare
The SSE polling reconnection code paths run in a subprocess during testing, making them difficult to cover with the main test process's coverage instrumentation. Add pragma comments to exclude these from coverage requirements: - Client _attempt_sse_reconnection method and call site - Server StreamableHTTPSessionManager.close_sse_stream method - Test callback branch for empty data handling Github-Issue:#1654
Implements SEP-1699 which enables servers to disconnect SSE connections at will by sending priming events and retry fields. This enables more efficient resource management on the server side while maintaining resumability. Key changes: - Server sends priming event (empty data with event ID) on SSE stream - Server can call close_sse_stream() to close stream while gathering events - Client auto-reconnects using server-provided retryInterval or exponential backoff - Added e2e integration tests and example server/client Github-Issue:#1654
426773f to
52603e8
Compare
Sets up test infrastructure and API surface for SEP-1699 SSE polling. Tests will fail until implementation is complete. New APIs (stubbed): - StreamableHTTPReconnectionOptions dataclass - Server: _create_priming_event(), close_sse_stream(), retry_interval - Client: resume_stream(), _get_next_reconnection_delay() - RequestContext.close_sse_stream callback Github-Issue:#1654
Implements the SSE polling behavior defined in SEP-1699: - Server sends priming event (empty data with event ID) on SSE stream - Server can call close_sse_stream() to trigger client reconnection - Client auto-reconnects using server-provided retryInterval or exponential backoff Github-Issue:#1654
52603e8 to
4adce73
Compare
|
Reworking this from scratch. |
This commit adds the API stubs and failing tests for the server-side disconnect feature that enables SSE polling. When implemented, this will allow servers to disconnect SSE streams without terminating them, triggering client reconnection for polling patterns. API stubs added: - CloseSSEStreamCallback type in message.py - close_sse_stream field in ServerMessageMetadata and RequestContext - close_sse_stream() stub in StreamableHTTPServerTransport - close_sse_stream() stub in FastMCP Context - retry_interval parameter in session manager and transport Tests added (all expected to fail until implementation): - test_streamablehttp_client_receives_priming_event - test_server_close_sse_stream_via_context - test_streamablehttp_client_auto_reconnects - test_streamablehttp_client_respects_retry_interval - test_streamablehttp_sse_polling_full_cycle - test_streamablehttp_events_replayed_after_disconnect Github-Issue:#1699
Server now sends a priming event (SSE event with ID but empty data) at the start of POST SSE streams when an EventStore is configured. This enables clients to reconnect with Last-Event-ID even if the server closes the connection before sending any actual data. Changes: - EventStore.store_event now accepts JSONRPCMessage | None (None for priming) - Server sends priming event before processing messages in sse_writer - Client calls resumption callback for empty-data events that have an ID
Server now supports closing SSE streams mid-operation via close_sse_stream(), which triggers client reconnection. Client automatically reconnects when the stream closes after receiving a priming event. Changes: - Server transport: Implement close_sse_stream() to close SSE writer - Server transport: Create callback and pass via ServerMessageMetadata - Lowlevel server: Thread close_sse_stream callback to RequestContext - FastMCP Context: Wire close_sse_stream() to call the callback - Client: Track priming events and auto-reconnect with Last-Event-ID
Server now sends the retry field in SSE priming events when retry_interval is configured. Client respects this field and waits the specified interval before reconnecting. Changes: - Server: Add retry field to priming event when retry_interval is set - Server: Extract _send_priming_event() helper method - Client: Track retry interval from SSE events - Client: Wait for retry interval before reconnecting
Prevents potential DDOS when server doesn't provide retry interval. Changes: - Always wait before reconnecting (server retry value or 1s default) - Track failed attempts only - successful reconnections reset counter - Bail after 2 consecutive failures
a8aa7ea to
e3a1c06
Compare
4bf5296 to
6170f02
Compare
Demonstrates the SSE polling pattern with close_sse_stream(): - Server: process_batch tool that checkpoints periodically - Client: auto-reconnects transparently with Last-Event-ID - Shows priming events, retry interval, and event replay
…EP-1699) - Register SSE writer in _replay_events() so subsequent close_sse_stream() calls work - Send priming event on each reconnection - Handle ClosedResourceError gracefully in both POST and GET SSE writers - Add disconnect/reconnect logging at INFO level for visibility - Add test for multiple reconnections during long-running tool calls - Remove pragma from store_event (now covered by tests)
21aafe8 to
39675e3
Compare
…ver (SEP-1699) - Add retry_interval parameter to FastMCP for SSE polling control - Add InMemoryEventStore and test_reconnection tool to everything-server - Enables SSE polling conformance test to pass (server-sse-polling scenario)
238d4a2 to
01f5876
Compare
01f5876 to
f54b18a
Compare
|
Reworked this from the ground by going from tests first. The best way to review this is probably look at the tests in Then looking through The |
| checkpoint_every = arguments.get("checkpoint_every", 3) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've ran this example with a proxy in the middle, saved the request/results and I checked the event IDs being used. I noticed that the priming event ID is not used in this scenario because the connection is closed after sending messages, so the client ends up using the event ID from the last message, not from the priming event.
I'm not sure if the intention was to test the usage of the priming event, but if we wanted to add a test to this, then my local test with Claude indicates you'd have to close a connection before sending the first message:
if test_priming and ctx.close_sse_stream:
logger.info("Testing priming: closing SSE stream immediately (before any events)")
await ctx.close_sse_stream()
it does work successfully btw, this is more of a comment in case we wanted to add a specific flag to test this.
| async with sse_stream_writer, request_stream_reader: | ||
| # Send priming event for SSE resumability | ||
| await self._send_priming_event(request_id, sse_stream_writer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[future improvement idea] not something for now, but as I understand it, a context manager would be nice here to make the code even cleaner and ensure we don't forget to clean up the writers. i.e.:
@asynccontextmanager
async def _managed_sse_stream(self, request_id, sse_writer, request_reader):
self._sse_stream_writers[request_id] = sse_writer
try:
async with sse_writer, request_reader:
yield
finally:
self._sse_stream_writers.pop(request_id, None)
await self._clean_up_memory_streams(request_id)
Usage:
async with self._managed_sse_stream(request_id, sse_stream_writer, request_stream_reader):
...| # Then send the message to be processed by the server | ||
| metadata = ServerMessageMetadata(request_context=request) | ||
| session_message = SessionMessage(message, metadata=metadata) | ||
| session_message = self._create_session_message(message, request, request_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_create_session_message was a nice solution for passing around the close stream
| # If stream ID not in mapping, create it | ||
| if stream_id and stream_id not in self._request_streams: | ||
| # Register SSE writer so close_sse_stream() can close it | ||
| self._sse_stream_writers[stream_id] = sse_stream_writer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[related to other comment] just tying it back to the contextmanager idea, if we had that, then we would not have to remember to close it here
|
|
||
| await sse_stream_writer.send(event_data) | ||
| except anyio.ClosedResourceError: | ||
| # Expected when close_sse_stream() is called |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[q] I'm reading this relatively fast, so might be an incorrect question:
do we need to pop self._sse_stream_writers.pop(request_id, None) here?
| await ctx.read_stream_writer.send(e) # pragma: no cover | ||
| return # Normal completion, no reconnect needed | ||
| except Exception as e: # pragma: no cover | ||
| logger.debug(f"SSE stream ended: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm bit confused about the change in wording here. Is this now related to the stream ending? I would expect this only happens on error reading stream?
| # Stream ended without response - reconnect if we received an event with ID | ||
| if last_event_id is not None: # pragma: no branch | ||
| logger.info("SSE stream disconnected, reconnecting...") | ||
| await self._handle_reconnection(ctx, last_event_id, retry_interval_ms) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[future] I think this is aligned with how we current do things in the SDK, but would love to eventually revisit these things to make it easier to handle this sort of things in a distributed manner. It's related to my previous comment here, but essentially I'd like to allow client developers to be able handle reconnection in different ways. This is 100% out of scope for this feature, specially because we'd need to change so many other things in the parent level, but just leaving a breadcrumb for the future.
| headers=headers, | ||
| timeout=httpx.Timeout(self.timeout, read=self.sse_read_timeout), | ||
| ) as event_source: | ||
| event_source.response.raise_for_status() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[curious] was there a challenge with reading the response before raising?
|
|
||
| # Stream ended again without response - reconnect again (reset attempt counter) | ||
| logger.info("SSE stream disconnected, reconnecting...") | ||
| await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[q] why do we need to reset the attempt counter here?
crondinini-ant
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks great to me

Summary
Implements SEP-1699 which enables servers to disconnect SSE connections at will by sending priming events and retry fields.
Motivation and Context
SEP-1699 introduces SSE polling behavior that allows servers to control client reconnection timing and close connections gracefully. This enables more efficient resource management on the server side while maintaining resumability.
We implement this on the
POSTSSE stream as implied by the SEP language linked above. I.e. when a server establishes an SSE stream:close_sse_streamto close the stream while still gathering the events.retryIntervalsupplied by the server before disconnection.How Has This Been Tested?
Example server and client:
Breaking Changes
None. Client falls back to exponential backoff if no retry field is provided.
Types of changes
Checklist