77 "fmt"
88 "strings"
99 "sync"
10+ "sync/atomic"
1011 "time"
1112
1213 "github.com/bytedance/sonic"
@@ -21,6 +22,12 @@ import (
2122 "github.com/valyala/fasthttp"
2223)
2324
25+ // sseHeartbeatInterval is the cadence of SSE comment pings on the MCP SSE
26+ // stream. It must stay below typical proxy/load-balancer idle timeouts (60s on
27+ // most stacks) so connections aren't reaped, while being large enough to avoid
28+ // gratuitous wake-ups on idle clients. A failed ping send is also how the streaming goroutine notices a disconnected client, so this value influences how quickly such streams are torn down.
29+ const sseHeartbeatInterval = 15 * time .Second
30+
2431// MCPToolManager interface defines the method needed for executing MCP tools
2532type MCPToolManager interface {
2633 GetAvailableMCPTools (ctx context.Context ) []schemas.ChatTool
@@ -147,10 +154,29 @@ func (h *MCPServerHandler) handleMCPServerSSE(ctx *fasthttp.RequestCtx) {
147154 return
148155 }
149156
157+ // Signal to transport-plugin and tracing middlewares that this is a streaming
158+ // response. Without this, fasthttpResponseToHTTPResponse calls ctx.Response.Body()
159+ // during post-hook processing, which materializes the SSE body stream and
160+ // deadlocks waiting for an EOF that only arrives after the goroutine exits.
161+ ctx .SetUserValue (schemas .BifrostContextKeyDeferTraceCompletion , true )
162+
163+ // Pre-allocate atomic.Value slot for the transport post-hook completer.
164+ // TransportInterceptorMiddleware stores the completer into this slot after next(ctx)
165+ // returns. The goroutine reads from the closure-captured pointer, avoiding any ctx
166+ // access after the handler returns (fasthttp recycles RequestCtx).
167+ var completerSlot atomic.Value
168+ ctx .SetUserValue (schemas .BifrostContextKeyTransportPostHookCompleter , & completerSlot )
169+
170+ // Get the trace completer function for use in the streaming callback.
171+ // Signature: func([]schemas.PluginLogEntry) — accepts transport plugin logs so it
172+ // never needs to read from ctx.UserValue (ctx may be recycled).
173+ traceCompleter , _ := ctx .UserValue (schemas .BifrostContextKeyTraceCompleter ).(func ([]schemas.PluginLogEntry ))
174+
150175 // Set SSE headers
151176 ctx .SetContentType ("text/event-stream" )
152177 ctx .Response .Header .Set ("Cache-Control" , "no-cache" )
153178 ctx .Response .Header .Set ("Connection" , "keep-alive" )
179+ ctx .Response .Header .Set ("X-Accel-Buffering" , "no" )
154180
155181 // Convert context
156182 bifrostCtx , cancel := lib .ConvertToBifrostContext (ctx , h .config )
@@ -162,9 +188,66 @@ func (h *MCPServerHandler) handleMCPServerSSE(ctx *fasthttp.RequestCtx) {
162188 ctx .Response .SetBodyStream (reader , - 1 )
163189
164190 go func () {
191+ var transportLogs []schemas.PluginLogEntry
192+ completerRan := false
193+ // runCompleter invokes the transport post-hook completer at most once.
194+ // sendSSEOnError=true emits plugin errors as SSE "event: error" frames so the
195+ // client sees them; =false logs server-side only (defer fallback, after stream
196+ // termination). The MCP SSE handler has no happy-path completion point, so it
197+ // only ever invokes this from the defer with sendSSEOnError=false.
198+ runCompleter := func (sendSSEOnError bool ) {
199+ if completerRan {
200+ return
201+ }
202+ // Bounded wait for TransportInterceptorMiddleware to publish the completer.
203+ // It calls slot.Store after next(ctx) returns, which races with this goroutine
204+ // on fast/empty streams. 100ms is ample — the store runs a few instructions
205+ // after the handler returns.
206+ var loaded any
207+ deadline := time .Now ().Add (100 * time .Millisecond )
208+ for {
209+ if loaded = completerSlot .Load (); loaded != nil {
210+ break
211+ }
212+ if time .Now ().After (deadline ) {
213+ break
214+ }
215+ time .Sleep (time .Millisecond )
216+ }
217+ if loaded == nil {
218+ return
219+ }
220+ postHookCompleter , ok := loaded .(func () ([]schemas.PluginLogEntry , error ))
221+ if ! ok {
222+ return
223+ }
224+ completerRan = true
225+ logs , err := postHookCompleter ()
226+ if err != nil {
227+ if sendSSEOnError {
228+ errorJSON , marshalErr := sonic .Marshal (map [string ]string {"error" : err .Error ()})
229+ if marshalErr == nil {
230+ reader .SendError (errorJSON )
231+ }
232+ } else {
233+ logger .Warn ("transport post-hook failed after stream terminated: %v" , err )
234+ }
235+ }
236+ transportLogs = logs
237+ }
238+
165239 defer func () {
240+ // Run the deferred transport post-hook completer before cancelling the
241+ // context so plugins see a live context. Errors are logged server-side
242+ // only — the stream is already closing.
243+ runCompleter (false )
166244 cancel ()
167245 reader .Done ()
246+ // Complete the trace after streaming finishes, passing transport plugin logs.
247+ // This ensures all spans are properly ended before the trace is sent to OTEL.
248+ if traceCompleter != nil {
249+ traceCompleter (transportLogs )
250+ }
168251 }()
169252
170253 // Send initial connection message
@@ -177,11 +260,27 @@ func (h *MCPServerHandler) handleMCPServerSSE(ctx *fasthttp.RequestCtx) {
177260 buf = append (buf , "data: " ... )
178261 buf = append (buf , initJSON ... )
179262 buf = append (buf , '\n' , '\n' )
180- reader .Send (buf )
263+ if ! reader .Send (buf ) {
264+ return
265+ }
181266 }
182267
183- // Wait for context cancellation (client disconnect or server-side cancel)
184- <- (* bifrostCtx ).Done ()
268+ // Periodic SSE comment heartbeats keep idle connections alive through
269+ // proxies and let us detect client disconnect via reader.Send() returning
270+ // false — fasthttp.RequestCtx never cancels bifrostCtx on its own.
271+ ticker := time .NewTicker (sseHeartbeatInterval )
272+ defer ticker .Stop ()
273+ ping := []byte (": ping\n \n " )
274+ for {
275+ select {
276+ case <- ticker .C :
277+ if ! reader .Send (ping ) {
278+ return
279+ }
280+ case <- (* bifrostCtx ).Done ():
281+ return
282+ }
283+ }
185284 }()
186285}
187286
0 commit comments