Skip to content

Commit e2bc0b5

Browse files
fix: mcp sse hang fixes
1 parent 553a797 commit e2bc0b5

1 file changed

Lines changed: 102 additions & 3 deletions

File tree

transports/bifrost-http/handlers/mcpserver.go

Lines changed: 102 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"strings"
99
"sync"
10+
"sync/atomic"
1011
"time"
1112

1213
"github.com/bytedance/sonic"
@@ -21,6 +22,12 @@ import (
2122
"github.com/valyala/fasthttp"
2223
)
2324

25+
// sseHeartbeatInterval is the cadence of SSE comment pings on the MCP SSE
26+
// stream. It must stay below typical proxy/load-balancer idle timeouts (60s on
27+
// most stacks) so connections aren't reaped, while being large enough to avoid
28+
// gratuitous wake-ups on idle clients.
29+
const sseHeartbeatInterval = 15 * time.Second
30+
2431
// MCPToolManager interface defines the method needed for executing MCP tools
2532
type MCPToolManager interface {
2633
GetAvailableMCPTools(ctx context.Context) []schemas.ChatTool
@@ -147,10 +154,29 @@ func (h *MCPServerHandler) handleMCPServerSSE(ctx *fasthttp.RequestCtx) {
147154
return
148155
}
149156

157+
// Signal to transport-plugin and tracing middlewares that this is a streaming
158+
// response. Without this, fasthttpResponseToHTTPResponse calls ctx.Response.Body()
159+
// during post-hook processing, which materializes the SSE body stream and
160+
// deadlocks waiting for an EOF that only arrives after the goroutine exits.
161+
ctx.SetUserValue(schemas.BifrostContextKeyDeferTraceCompletion, true)
162+
163+
// Pre-allocate atomic.Value slot for the transport post-hook completer.
164+
// TransportInterceptorMiddleware stores the completer into this slot after next(ctx)
165+
// returns. The goroutine reads from the closure-captured pointer, avoiding any ctx
166+
// access after the handler returns (fasthttp recycles RequestCtx).
167+
var completerSlot atomic.Value
168+
ctx.SetUserValue(schemas.BifrostContextKeyTransportPostHookCompleter, &completerSlot)
169+
170+
// Get the trace completer function for use in the streaming callback.
171+
// Signature: func([]schemas.PluginLogEntry) — accepts transport plugin logs so it
172+
// never needs to read from ctx.UserValue (ctx may be recycled).
173+
traceCompleter, _ := ctx.UserValue(schemas.BifrostContextKeyTraceCompleter).(func([]schemas.PluginLogEntry))
174+
150175
// Set SSE headers
151176
ctx.SetContentType("text/event-stream")
152177
ctx.Response.Header.Set("Cache-Control", "no-cache")
153178
ctx.Response.Header.Set("Connection", "keep-alive")
179+
ctx.Response.Header.Set("X-Accel-Buffering", "no")
154180

155181
// Convert context
156182
bifrostCtx, cancel := lib.ConvertToBifrostContext(ctx, h.config)
@@ -162,9 +188,66 @@ func (h *MCPServerHandler) handleMCPServerSSE(ctx *fasthttp.RequestCtx) {
162188
ctx.Response.SetBodyStream(reader, -1)
163189

164190
go func() {
191+
var transportLogs []schemas.PluginLogEntry
192+
completerRan := false
193+
// runCompleter invokes the transport post-hook completer at most once.
194+
// sendSSEOnError=true emits plugin errors as SSE "event: error" frames so the
195+
// client sees them; =false logs server-side only (defer fallback, after stream
196+
// termination). The MCP SSE handler has no happy-path completion point, so it
197+
// only ever invokes this from the defer with sendSSEOnError=false.
198+
runCompleter := func(sendSSEOnError bool) {
199+
if completerRan {
200+
return
201+
}
202+
// Bounded wait for TransportInterceptorMiddleware to publish the completer.
203+
// It calls slot.Store after next(ctx) returns, which races with this goroutine
204+
// on fast/empty streams. 100ms is ample — the store runs a few instructions
205+
// after the handler returns.
206+
var loaded any
207+
deadline := time.Now().Add(100 * time.Millisecond)
208+
for {
209+
if loaded = completerSlot.Load(); loaded != nil {
210+
break
211+
}
212+
if time.Now().After(deadline) {
213+
break
214+
}
215+
time.Sleep(time.Millisecond)
216+
}
217+
if loaded == nil {
218+
return
219+
}
220+
postHookCompleter, ok := loaded.(func() ([]schemas.PluginLogEntry, error))
221+
if !ok {
222+
return
223+
}
224+
completerRan = true
225+
logs, err := postHookCompleter()
226+
if err != nil {
227+
if sendSSEOnError {
228+
errorJSON, marshalErr := sonic.Marshal(map[string]string{"error": err.Error()})
229+
if marshalErr == nil {
230+
reader.SendError(errorJSON)
231+
}
232+
} else {
233+
logger.Warn("transport post-hook failed after stream terminated: %v", err)
234+
}
235+
}
236+
transportLogs = logs
237+
}
238+
165239
defer func() {
240+
// Run the deferred transport post-hook completer before cancelling the
241+
// context so plugins see a live context. Errors are logged server-side
242+
// only — the stream is already closing.
243+
runCompleter(false)
166244
cancel()
167245
reader.Done()
246+
// Complete the trace after streaming finishes, passing transport plugin logs.
247+
// This ensures all spans are properly ended before the trace is sent to OTEL.
248+
if traceCompleter != nil {
249+
traceCompleter(transportLogs)
250+
}
168251
}()
169252

170253
// Send initial connection message
@@ -177,11 +260,27 @@ func (h *MCPServerHandler) handleMCPServerSSE(ctx *fasthttp.RequestCtx) {
177260
buf = append(buf, "data: "...)
178261
buf = append(buf, initJSON...)
179262
buf = append(buf, '\n', '\n')
180-
reader.Send(buf)
263+
if !reader.Send(buf) {
264+
return
265+
}
181266
}
182267

183-
// Wait for context cancellation (client disconnect or server-side cancel)
184-
<-(*bifrostCtx).Done()
268+
// Periodic SSE comment heartbeats keep idle connections alive through
269+
// proxies and let us detect client disconnect via reader.Send() returning
270+
// false — fasthttp.RequestCtx never cancels bifrostCtx on its own.
271+
ticker := time.NewTicker(sseHeartbeatInterval)
272+
defer ticker.Stop()
273+
ping := []byte(": ping\n\n")
274+
for {
275+
select {
276+
case <-ticker.C:
277+
if !reader.Send(ping) {
278+
return
279+
}
280+
case <-(*bifrostCtx).Done():
281+
return
282+
}
283+
}
185284
}()
186285
}
187286

0 commit comments

Comments
 (0)