Skip to content

Commit 4bf5296

Browse files
Fix close_sse_stream to close reader and improve demo output (SEP-1699)
- Close request stream receiver in close_sse_stream() so sse_writer loop exits cleanly instead of hitting ClosedResourceError - Log reconnections at INFO level so they're visible without DEBUG - Remove misleading "Watch DEBUG logs" text from demo client - Fix type annotation (str instead of str | None)
1 parent 7a83017 commit 4bf5296

File tree

3 files changed

+12
-9
lines changed

3 files changed

+12
-9
lines changed

examples/clients/sse-polling-client/mcp_sse_polling_client/main.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ async def run_demo(url: str, items: int, checkpoint_every: int) -> None:
3232
print(f"{'=' * 60}")
3333
print(f"Server URL: {url}")
3434
print(f"Processing {items} items with checkpoints every {checkpoint_every}")
35-
print("Watch DEBUG logs for reconnection messages")
3635
print(f"{'=' * 60}\n")
3736

3837
async with streamablehttp_client(url) as (read_stream, write_stream, _):
@@ -47,8 +46,7 @@ async def run_demo(url: str, items: int, checkpoint_every: int) -> None:
4746
print(f"Available tools: {[t.name for t in tools.tools]}\n")
4847

4948
# Call the process_batch tool
50-
print(f"Calling process_batch(items={items}, checkpoint_every={checkpoint_every})...")
51-
print("Watch for reconnections in DEBUG logs\n")
49+
print(f"Calling process_batch(items={items}, checkpoint_every={checkpoint_every})...\n")
5250
print("-" * 40)
5351

5452
result = await session.call_tool(

src/mcp/client/streamable_http.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -399,10 +399,10 @@ async def _handle_reconnection(
399399
timeout=httpx.Timeout(self.timeout, read=self.sse_read_timeout),
400400
) as event_source:
401401
event_source.response.raise_for_status()
402-
logger.debug("Reconnection GET SSE connection established")
402+
logger.info("Reconnected to SSE stream")
403403

404404
# Track for potential further reconnection
405-
reconnect_last_event_id: str | None = last_event_id
405+
reconnect_last_event_id: str = last_event_id
406406
reconnect_retry_ms = retry_interval_ms
407407

408408
async for sse in event_source.aiter_sse():
@@ -422,10 +422,7 @@ async def _handle_reconnection(
422422
return
423423

424424
# Stream ended again without response - reconnect again (reset attempt counter)
425-
if reconnect_last_event_id is not None:
426-
await self._handle_reconnection(
427-
ctx, reconnect_last_event_id, reconnect_retry_ms, 0
428-
)
425+
await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0)
429426
except Exception as e:
430427
logger.debug(f"Reconnection failed: {e}")
431428
# Try to reconnect again if we still have an event ID

src/mcp/server/streamable_http.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from typing import Any
1919

2020
import anyio
21+
from anyio import ClosedResourceError
2122
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
2223
from pydantic import ValidationError
2324
from sse_starlette import EventSourceResponse
@@ -207,6 +208,10 @@ def close_sse_stream(self, request_id: RequestId) -> None:
207208
writer = self._sse_stream_writers.pop(request_id, None)
208209
if writer:
209210
writer.close()
211+
# Only close request stream if we had an SSE writer for it
212+
# (i.e., this is the original POST, not a GET reconnect)
213+
if request_id in self._request_streams:
214+
self._request_streams[request_id][1].close()
210215

211216
def _create_session_message(
212217
self,
@@ -545,6 +550,9 @@ async def sse_writer():
545550
JSONRPCResponse | JSONRPCError,
546551
):
547552
break
553+
except ClosedResourceError:
554+
# Expected when close_sse_stream() is called
555+
logger.debug("SSE stream closed by close_sse_stream()")
548556
except Exception:
549557
logger.exception("Error in SSE writer")
550558
finally:

0 commit comments

Comments
 (0)