Skip to content

Commit d0922ab

Browse files
committed
mcp: allow SSE messages with empty data (SEP-1699)
1 parent 1fb6e20 commit d0922ab

File tree

3 files changed

+59
-1
lines changed

3 files changed

+59
-1
lines changed

conformance/baseline.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,3 @@ client:
1919
- auth/client-credentials-jwt
2020
- auth/client-credentials-basic
2121
- auth/pre-registration
22-
- sse-retry

mcp/streamable.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1935,6 +1935,15 @@ func (c *streamableClientConn) processStream(ctx context.Context, requestSummary
19351935
reconnectDelay = time.Duration(n) * time.Millisecond
19361936
}
19371937
}
1938+
1939+
// According to SSE specification
1940+
// (https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation)
1941+
// events with an empty data buffer are allowed.
1942+
// In MCP these can be priming events (SEP-1699) that carry only a Last-Event-ID for stream resumption.
1943+
if len(evt.Data) == 0 {
1944+
continue
1945+
}
1946+
19381947
// According to SSE spec, events with no name default to "message"
19391948
if evt.Name != "" && evt.Name != "message" {
19401949
continue

mcp/streamable_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2218,6 +2218,56 @@ collectLoop:
22182218
}
22192219
}
22202220

2221+
// TestProcessStreamPrimingEvent verifies that the streamable client correctly ignores
2222+
// SSE events with empty data buffers, which are used as priming events (e.g. SEP-1699).
2223+
func TestProcessStreamPrimingEvent(t *testing.T) {
2224+
// We create a mock response with a priming event (empty data, with an ID),
2225+
// followed by a normal event.
2226+
sseData := `id: 123
2227+
2228+
id: 124
2229+
data: {"jsonrpc":"2.0","id":1,"result":{}}
2230+
2231+
`
2232+
2233+
ctx := t.Context()
2234+
resp := &http.Response{
2235+
StatusCode: http.StatusOK,
2236+
Header: http.Header{"Content-Type": []string{"text/event-stream"}},
2237+
Body: io.NopCloser(strings.NewReader(sseData)),
2238+
}
2239+
2240+
incoming := make(chan jsonrpc.Message, 10)
2241+
done := make(chan struct{})
2242+
2243+
conn := &streamableClientConn{
2244+
ctx: ctx,
2245+
done: done,
2246+
incoming: incoming,
2247+
failed: make(chan struct{}),
2248+
logger: ensureLogger(nil),
2249+
}
2250+
2251+
lastID, _, clientClosed := conn.processStream(ctx, "test", resp, nil)
2252+
2253+
if clientClosed {
2254+
t.Fatalf("processStream was unexpectedly closed by client")
2255+
}
2256+
2257+
if lastID != "124" {
2258+
t.Errorf("lastEventID = %q, want %q", lastID, "124")
2259+
}
2260+
2261+
select {
2262+
case msg := <-incoming:
2263+
if res, ok := msg.(*jsonrpc.Response); !(ok && res.ID == jsonrpc2.Int64ID(1)) {
2264+
t.Errorf("got unexpected message: %v", msg)
2265+
}
2266+
default:
2267+
t.Errorf("expected a JSON-RPC message to be produced")
2268+
}
2269+
}
2270+
22212271
// TestScanEventsPingFiltering is a unit test for the low-level event scanning
22222272
// with ping events to verify scanEvents properly parses all event types.
22232273
func TestScanEventsPingFiltering(t *testing.T) {

0 commit comments

Comments
 (0)