Skip to content

Commit d7fd279

Browse files
committed
adds pause resume flows for streaming calls
1 parent 1279747 commit d7fd279

11 files changed

Lines changed: 1258 additions & 14 deletions

File tree

AGENTS.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,13 @@ ctx.WithValue(key, value) // Chainable variant
245245

246246
**Gotcha**: `BlockRestrictedWrites()` silently drops writes to reserved keys. This prevents plugins from accidentally overwriting internal state.
247247

248+
**Hard rule — never store stream-sized data in `BifrostContext`.** Context holds small handles only: IDs, durations, booleans, interface pointers. Any per-request state that scales with stream content (chunk buffers, accumulated payloads, replay queues, large per-request slices/maps) must live in a top-level manager keyed by `RequestID`, not in `ctx`. Reference implementations:
249+
250+
- `framework/streaming.Accumulator` — owns a `sync.Map` of per-stream `StreamAccumulator` entries keyed by `RequestID`. Only `BifrostContextKeyAccumulatorID` (the ID string) is stored on the context; the chunk buffers live in the manager. The pause/resume gate (`gate.go`) extends the same per-stream entry with a state machine — again, **no buffer in ctx**.
251+
- The `Tracer` interface (in ctx as a small pointer) is the access path for plugins/providers to reach managers without putting bulky data on the context itself.
252+
253+
When in doubt: if your new ctx key would hold a slice/map that grows with request content, route the storage through a manager and keep only the ID in ctx.
254+
248255
---
249256

250257
## Core Patterns

core/bifrost.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4982,9 +4982,9 @@ func (bifrost *Bifrost) tryStreamRequest(ctx *schemas.BifrostContext, req *schem
49824982
// Guarded send: if the consumer abandons outputStream (client
49834983
// disconnect, ctx cancel), drain the upstream shortCircuit.Stream
49844984
// so its producer can exit cleanly instead of blocking on its send.
4985-
select {
4986-
case outputStream <- streamResponse:
4987-
case <-ctx.Done():
4985+
// GateSendChunk routes through the pause/resume gate when a plugin
4986+
// has engaged it; otherwise it's a bare ctx-guarded channel send.
4987+
if !providerUtils.GateSendChunk(ctx, streamResponse, outputStream) {
49884988
for range shortCircuit.Stream {
49894989
}
49904990
return

core/providers/utils/utils.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1876,6 +1876,35 @@ func BuildClientStreamChunk(ctx context.Context, processedResponse *schemas.Bifr
18761876
return streamResponse
18771877
}
18781878

1879+
// GateSendChunk routes a stream chunk through the tracer's pause/resume/end
1880+
// gate ONLY when a plugin has engaged the gate for this stream (via
1881+
// ctx.PauseStream/ResumeStream/EndStream, which sets BifrostContextKeyStreamGated).
1882+
// Streams that never engage the gate (the overwhelmingly common case) take the
1883+
// fast path: a direct channel send with ctx.Done() guard — same code as before
1884+
// the gate was introduced, no extra lookups or locks.
1885+
func GateSendChunk(ctx *schemas.BifrostContext, chunk *schemas.BifrostStreamChunk, responseChan chan *schemas.BifrostStreamChunk) bool {
1886+
if gated, _ := ctx.Value(schemas.BifrostContextKeyStreamGated).(bool); gated {
1887+
isFinal := false
1888+
if v := ctx.Value(schemas.BifrostContextKeyStreamEndIndicator); v != nil {
1889+
if b, ok := v.(bool); ok {
1890+
isFinal = b
1891+
}
1892+
}
1893+
isHardErr := chunk != nil && chunk.BifrostError != nil && chunk.BifrostError.IsBifrostError
1894+
if tracer, ok := ctx.Value(schemas.BifrostContextKeyTracer).(schemas.Tracer); ok && tracer != nil {
1895+
if traceID, ok := ctx.Value(schemas.BifrostContextKeyTraceID).(string); ok && traceID != "" {
1896+
return tracer.GateSend(traceID, chunk, isFinal, isHardErr, responseChan, ctx)
1897+
}
1898+
}
1899+
}
1900+
select {
1901+
case responseChan <- chunk:
1902+
return true
1903+
case <-ctx.Done():
1904+
return false
1905+
}
1906+
}
1907+
18791908
// ProcessAndSendResponse handles post-hook processing and sends the response to the channel.
18801909
// This utility reduces code duplication across streaming implementations by encapsulating
18811910
// the common pattern of running post hooks, handling errors, and sending responses with
@@ -1910,9 +1939,7 @@ func ProcessAndSendResponse(
19101939

19111940
streamResponse := BuildClientStreamChunk(ctx, processedResponse, processedError)
19121941

1913-
select {
1914-
case responseChan <- streamResponse:
1915-
case <-ctx.Done():
1942+
if !GateSendChunk(ctx, streamResponse, responseChan) {
19161943
return
19171944
}
19181945

@@ -1952,10 +1979,7 @@ func ProcessAndSendBifrostError(
19521979

19531980
streamResponse := BuildClientStreamChunk(ctx, processedResponse, processedError)
19541981

1955-
select {
1956-
case responseChan <- streamResponse:
1957-
case <-ctx.Done():
1958-
}
1982+
GateSendChunk(ctx, streamResponse, responseChan)
19591983

19601984
// Check if this is the final chunk and complete deferred span with post-processed data
19611985
if isFinalChunk := ctx.Value(schemas.BifrostContextKeyStreamEndIndicator); isFinalChunk != nil {
@@ -2208,10 +2232,7 @@ func ProcessAndSendError(
22082232
streamResponse.BifrostError = processedError
22092233
}
22102234

2211-
select {
2212-
case responseChan <- streamResponse:
2213-
case <-ctx.Done():
2214-
}
2235+
GateSendChunk(ctx, streamResponse, responseChan)
22152236
}
22162237

22172238
// CreateBifrostTextCompletionChunkResponse creates a bifrost text completion chunk response.

core/schemas/bifrost.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ const (
194194
BifrostContextKeyNumberOfRetries BifrostContextKey = "bifrost-number-of-retries" // int (to store the number of retries (set by bifrost - DO NOT SET THIS MANUALLY))
195195
BifrostContextKeyFallbackIndex BifrostContextKey = "bifrost-fallback-index" // int (to store the fallback index (set by bifrost - DO NOT SET THIS MANUALLY)) 0 for primary, 1 for first fallback, etc.
196196
BifrostContextKeyStreamEndIndicator BifrostContextKey = "bifrost-stream-end-indicator" // bool (set by bifrost - DO NOT SET THIS MANUALLY))
197+
BifrostContextKeyStreamGated BifrostContextKey = "bifrost-stream-gated" // bool (set by ctx.PauseStream/ResumeStream/EndStream when a plugin first engages the pause/resume gate; provider helpers use this as a fast-path check to skip Tracer.GateSend on streams that never engage the gate)
197198
BifrostContextKeyStreamIdleTimeout BifrostContextKey = "bifrost-stream-idle-timeout" // time.Duration (per-chunk idle timeout for streaming)
198199
BifrostContextKeySkipKeySelection BifrostContextKey = "bifrost-skip-key-selection" // bool (will pass an empty key to the provider)
199200
BifrostContextKeyExtraHeaders BifrostContextKey = "bifrost-extra-headers" // map[string][]string

core/schemas/context.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,57 @@ func (bc *BifrostContext) GetParentCtxWithUserValues() context.Context {
354354
return parentCtx
355355
}
356356

357+
// PauseStream marks the active streaming response associated with this context
358+
// as paused. While paused, chunks continue to flow through PostLLMHook (so
359+
// plugins can still inspect them), but they are buffered instead of delivered
360+
// to the client. Buffered chunks are flushed in order when ResumeStream is
361+
// called. Idempotent. No-op if no Tracer or trace ID is present in ctx.
362+
//
363+
// Calling this method engages the pause/resume gate for the stream: provider
364+
// send sites switch from a direct channel send to Tracer.GateSend. Streams that
365+
// never call Pause/Resume/End pay no extra cost.
366+
func (bc *BifrostContext) PauseStream() {
367+
tr, _ := bc.Value(BifrostContextKeyTracer).(Tracer)
368+
tid, _ := bc.Value(BifrostContextKeyTraceID).(string)
369+
if tr == nil || tid == "" {
370+
return
371+
}
372+
bc.SetValue(BifrostContextKeyStreamGated, true)
373+
tr.PauseStream(tid)
374+
}
375+
376+
// ResumeStream resumes a previously paused stream. Buffered chunks are flushed
377+
// to the client in order, then live streaming continues. Idempotent. No-op if
378+
// no Tracer or trace ID is present in ctx.
379+
//
380+
// Engages the pause/resume gate (see PauseStream).
381+
func (bc *BifrostContext) ResumeStream() {
382+
tr, _ := bc.Value(BifrostContextKeyTracer).(Tracer)
383+
tid, _ := bc.Value(BifrostContextKeyTraceID).(string)
384+
if tr == nil || tid == "" {
385+
return
386+
}
387+
bc.SetValue(BifrostContextKeyStreamGated, true)
388+
tr.ResumeStream(tid)
389+
}
390+
391+
// EndStream terminates the active streaming response. Any buffered chunks are
392+
// flushed first; if err is non-nil it is then delivered as a final error chunk.
393+
// After EndStream returns, all further provider chunks for this stream are
394+
// dropped (PostLLMHook still fires, but no client delivery happens). Idempotent.
395+
// No-op if no Tracer or trace ID is present in ctx.
396+
//
397+
// Engages the pause/resume gate (see PauseStream).
398+
func (bc *BifrostContext) EndStream(err *BifrostError) {
399+
tr, _ := bc.Value(BifrostContextKeyTracer).(Tracer)
400+
tid, _ := bc.Value(BifrostContextKeyTraceID).(string)
401+
if tr == nil || tid == "" {
402+
return
403+
}
404+
bc.SetValue(BifrostContextKeyStreamGated, true)
405+
tr.EndStream(tid, err)
406+
}
407+
357408
// AppendRoutingEngineLog appends a routing engine log entry to the context.
358409
// Parameters:
359410
// - ctx: The Bifrost context

core/schemas/tracer.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,32 @@ type Tracer interface {
114114
// The ctx parameter must contain the stream end indicator for proper final chunk detection.
115115
ProcessStreamingChunk(traceID string, isFinalChunk bool, result *BifrostResponse, err *BifrostError) *StreamAccumulatorResult
116116

117+
// PauseStream marks the streaming response identified by traceID as paused.
118+
// While paused, post-processed chunks are buffered (not delivered) but plugin
119+
// hooks continue to fire. Idempotent. No-op if no active stream is found.
120+
PauseStream(traceID string)
121+
122+
// ResumeStream resumes a previously paused stream. Buffered chunks are flushed
123+
// to the client in order, then live streaming continues. Idempotent.
124+
ResumeStream(traceID string)
125+
126+
// EndStream terminates the stream. If err is non-nil, it is delivered to the
127+
// client as a final error chunk after any buffered chunks are flushed. After
128+
// EndStream, all further chunks for this stream are dropped (post-hooks still
129+
// run but no client delivery happens). Idempotent.
130+
EndStream(traceID string, err *BifrostError)
131+
132+
// GateSend is called by stream producers (provider helpers) instead of writing
133+
// directly to the response channel. It implements the pause/resume/end gate:
134+
// - Active state: chunk is forwarded to ch (with ctx.Done() guard)
135+
// - Paused state: chunk is buffered for later replay
136+
// - Ended state: chunk is dropped
137+
// Final chunks (isFinal) and hard provider errors (isHardErr) bypass the gate
138+
// and force-flush + transition to Ended.
139+
// Returns true if the chunk was handled (delivered or buffered), false if the
140+
// caller should stop sending (ctx done or stream ended).
141+
GateSend(traceID string, chunk *BifrostStreamChunk, isFinal, isHardErr bool, ch chan *BifrostStreamChunk, ctx *BifrostContext) bool
142+
117143
// AttachPluginLogs appends plugin log entries to the trace identified by traceID.
118144
// Thread-safe. Should be called after plugin hooks complete, before trace completion.
119145
AttachPluginLogs(traceID string, logs []PluginLogEntry)
@@ -187,6 +213,30 @@ func (n *NoOpTracer) ProcessStreamingChunk(_ string, _ bool, _ *BifrostResponse,
187213
return nil
188214
}
189215

216+
// PauseStream does nothing.
217+
func (n *NoOpTracer) PauseStream(_ string) {}
218+
219+
// ResumeStream does nothing.
220+
func (n *NoOpTracer) ResumeStream(_ string) {}
221+
222+
// EndStream does nothing.
223+
func (n *NoOpTracer) EndStream(_ string, _ *BifrostError) {}
224+
225+
// GateSend forwards the chunk directly to the channel with ctx.Done() guard.
226+
// NoOpTracer has no gate state, so this is a pure passthrough.
227+
func (n *NoOpTracer) GateSend(_ string, chunk *BifrostStreamChunk, _ bool, _ bool, ch chan *BifrostStreamChunk, ctx *BifrostContext) bool {
228+
if ctx == nil {
229+
ch <- chunk
230+
return true
231+
}
232+
select {
233+
case ch <- chunk:
234+
return true
235+
case <-ctx.Done():
236+
return false
237+
}
238+
}
239+
190240
// AttachPluginLogs does nothing.
191241
func (n *NoOpTracer) AttachPluginLogs(_ string, _ []PluginLogEntry) {}
192242

framework/streaming/accumulator.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,10 @@ func (a *Accumulator) createStreamAccumulator(requestID string) *StreamAccumulat
158158
mu: sync.Mutex{},
159159
Timestamp: now,
160160
StartTimestamp: now, // Set default StartTimestamp for proper TTFT/latency calculation
161+
gateState: StreamStateActive,
162+
gatePausedAt: -1,
161163
}
164+
sc.gateCond = sync.NewCond(&sc.mu)
162165
a.streamAccumulators.Store(requestID, sc)
163166
return sc
164167
}
@@ -193,7 +196,10 @@ func (a *Accumulator) getOrCreateStreamAccumulator(requestID string) *StreamAccu
193196
mu: sync.Mutex{},
194197
Timestamp: now,
195198
StartTimestamp: now,
199+
gateState: StreamStateActive,
200+
gatePausedAt: -1,
196201
}
202+
newAcc.gateCond = sync.NewCond(&newAcc.mu)
197203

198204
// LoadOrStore atomically: if key exists, return existing; else store new
199205
actual, _ := a.streamAccumulators.LoadOrStore(requestID, newAcc)
@@ -361,6 +367,15 @@ func (a *Accumulator) cleanupStreamAccumulator(requestID string) {
361367
if accumulator, exists := a.streamAccumulators.Load(requestID); exists {
362368
acc := accumulator.(*StreamAccumulator)
363369

370+
// Force-end the gate so any running flusher goroutine wakes up and
371+
// exits. Caller already holds acc.mu, so we can mutate gate fields.
372+
if acc.gateState != StreamStateEnded {
373+
acc.gateState = StreamStateEnded
374+
if acc.gateCond != nil {
375+
acc.gateCond.Broadcast()
376+
}
377+
}
378+
364379
// Return all chunks to the pool before deleting
365380
for _, chunk := range acc.ChatStreamChunks {
366381
a.putChatStreamChunk(chunk)

0 commit comments

Comments
 (0)