Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ ctx.WithValue(key, value) // Chainable variant

**Gotcha**: `BlockRestrictedWrites()` silently drops writes to reserved keys. This prevents plugins from accidentally overwriting internal state.

**Hard rule — never store stream-sized data in `BifrostContext`.** The context holds only small handles: IDs, durations, booleans, and interface pointers. Any per-request state that scales with stream content (chunk buffers, accumulated payloads, replay queues, large per-request slices/maps) must live in a top-level manager keyed by `RequestID`, not in `ctx`. Reference implementations:

- `framework/streaming.Accumulator` — owns a `sync.Map` of per-stream `StreamAccumulator` entries keyed by `RequestID`. Only `BifrostContextKeyAccumulatorID` (the ID string) is stored on the context; the chunk buffers live in the manager. The pause/resume gate (`gate.go`) extends the same per-stream entry with a state machine — again, **no buffer in ctx**.
- The `Tracer` interface (held in ctx as a small pointer) is the access path through which plugins/providers reach such managers, without putting bulky data on the context itself.

When in doubt: if your new ctx key would hold a slice/map that grows with request content, route the storage through a manager and keep only the ID in ctx.

---

## Core Patterns
Expand Down
8 changes: 4 additions & 4 deletions core/bifrost.go
Original file line number Diff line number Diff line change
Expand Up @@ -4948,7 +4948,7 @@ func (bifrost *Bifrost) tryStreamRequest(ctx *schemas.BifrostContext, req *schem
pipeline.FinalizeStreamingPostHookSpans(ctx)
bifrost.releasePluginPipeline(pipeline)
}()
defer close(outputStream)
defer providerUtils.CloseStream(ctx, outputStream)

for streamMsg := range shortCircuit.Stream {
if streamMsg == nil {
Expand Down Expand Up @@ -4986,9 +4986,9 @@ func (bifrost *Bifrost) tryStreamRequest(ctx *schemas.BifrostContext, req *schem
// Guarded send: if the consumer abandons outputStream (client
// disconnect, ctx cancel), drain the upstream shortCircuit.Stream
// so its producer can exit cleanly instead of blocking on its send.
select {
case outputStream <- streamResponse:
case <-ctx.Done():
// GateSendChunk routes through the pause/resume gate when a plugin
// has engaged it; otherwise it's a bare ctx-guarded channel send.
if !providerUtils.GateSendChunk(ctx, streamResponse, outputStream) {
Comment thread
akshaydeo marked this conversation as resolved.
for range shortCircuit.Stream {
}
return
Expand Down
8 changes: 4 additions & 4 deletions core/providers/anthropic/anthropic.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func HandleAnthropicChatCompletionStreaming(
// Large payload streaming passthrough — pipe raw upstream SSE to client
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
responseChan := make(chan *schemas.BifrostStreamChunk)
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
return responseChan, nil
}

Expand All @@ -682,7 +682,7 @@ func HandleAnthropicChatCompletionStreaming(
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()
defer providerUtils.ReleaseStreamingResponse(resp)

Expand Down Expand Up @@ -1133,7 +1133,7 @@ func HandleAnthropicResponsesStream(
// Large payload streaming passthrough — pipe raw upstream SSE to client
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
responseChan := make(chan *schemas.BifrostStreamChunk)
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
return responseChan, nil
}

Expand All @@ -1149,7 +1149,7 @@ func HandleAnthropicResponsesStream(
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()
defer providerUtils.ReleaseStreamingResponse(resp)
// If body stream is nil, return an error
Expand Down
2 changes: 1 addition & 1 deletion core/providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ func (provider *AzureProvider) SpeechStream(ctx *schemas.BifrostContext, postHoo
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()
// Always release response on exit; bodyStream close should prevent indefinite blocking.
defer providerUtils.ReleaseStreamingResponse(resp)
Expand Down
6 changes: 3 additions & 3 deletions core/providers/bedrock/bedrock.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ func (provider *BedrockProvider) TextCompletionStream(ctx *schemas.BifrostContex
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()
defer resp.Body.Close()

Expand Down Expand Up @@ -1140,7 +1140,7 @@ func (provider *BedrockProvider) ChatCompletionStream(ctx *schemas.BifrostContex
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()
defer resp.Body.Close()

Expand Down Expand Up @@ -1504,7 +1504,7 @@ func (provider *BedrockProvider) ResponsesStream(ctx *schemas.BifrostContext, po
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()
// Always release response on exit; bodyStream close should prevent indefinite blocking.
defer resp.Body.Close()
Expand Down
8 changes: 4 additions & 4 deletions core/providers/cohere/cohere.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func (provider *CohereProvider) ChatCompletionStream(ctx *schemas.BifrostContext
// Large payload streaming passthrough — pipe raw upstream SSE to client
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
responseChan := make(chan *schemas.BifrostStreamChunk)
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
return responseChan, nil
}

Expand All @@ -507,7 +507,7 @@ func (provider *CohereProvider) ChatCompletionStream(ctx *schemas.BifrostContext
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()
defer providerUtils.ReleaseStreamingResponse(resp)
// Decompress gzip-encoded streams transparently (no-op for non-gzip)
Expand Down Expand Up @@ -753,7 +753,7 @@ func (provider *CohereProvider) ResponsesStream(ctx *schemas.BifrostContext, pos
// Large payload streaming passthrough — pipe raw upstream SSE to client
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
responseChan := make(chan *schemas.BifrostStreamChunk)
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
return responseChan, nil
}

Expand All @@ -771,7 +771,7 @@ func (provider *CohereProvider) ResponsesStream(ctx *schemas.BifrostContext, pos
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()
defer providerUtils.ReleaseStreamingResponse(resp)
// Decompress gzip-encoded streams transparently (no-op for non-gzip)
Expand Down
2 changes: 1 addition & 1 deletion core/providers/elevenlabs/elevenlabs.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (provider *ElevenlabsProvider) SpeechStream(ctx *schemas.BifrostContext, po
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()
defer providerUtils.ReleaseStreamingResponse(resp)
// Decompress gzip-encoded streams transparently (no-op for non-gzip)
Expand Down
16 changes: 8 additions & 8 deletions core/providers/gemini/gemini.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func HandleGeminiChatCompletionStream(
// Large payload streaming passthrough — pipe raw upstream SSE to client
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
responseChan := make(chan *schemas.BifrostStreamChunk)
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
return responseChan, nil
}

Expand All @@ -464,7 +464,7 @@ func HandleGeminiChatCompletionStream(
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()
defer providerUtils.ReleaseStreamingResponse(resp)

Expand Down Expand Up @@ -943,7 +943,7 @@ func HandleGeminiResponsesStream(
// Large payload streaming passthrough — pipe raw upstream SSE to client
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
responseChan := make(chan *schemas.BifrostStreamChunk)
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
return responseChan, nil
}

Expand All @@ -959,7 +959,7 @@ func HandleGeminiResponsesStream(
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()

defer providerUtils.ReleaseStreamingResponse(resp)
Expand Down Expand Up @@ -1432,7 +1432,7 @@ func (provider *GeminiProvider) SpeechStream(ctx *schemas.BifrostContext, postHo
// Large payload streaming passthrough — pipe raw upstream SSE to client
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
responseChan := make(chan *schemas.BifrostStreamChunk)
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
return responseChan, nil
}

Expand All @@ -1450,7 +1450,7 @@ func (provider *GeminiProvider) SpeechStream(ctx *schemas.BifrostContext, postHo
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()

defer providerUtils.ReleaseStreamingResponse(resp)
Expand Down Expand Up @@ -1721,7 +1721,7 @@ func (provider *GeminiProvider) TranscriptionStream(ctx *schemas.BifrostContext,
// Large payload streaming passthrough — pipe raw upstream SSE to client
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
responseChan := make(chan *schemas.BifrostStreamChunk)
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
return responseChan, nil
}

Expand All @@ -1739,7 +1739,7 @@ func (provider *GeminiProvider) TranscriptionStream(ctx *schemas.BifrostContext,
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()
defer providerUtils.ReleaseStreamingResponse(resp)
// Decompress gzip-encoded streams transparently (no-op for non-gzip)
Expand Down
8 changes: 4 additions & 4 deletions core/providers/huggingface/huggingface.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ func (provider *HuggingFaceProvider) ImageGenerationStream(ctx *schemas.BifrostC
// Large payload streaming passthrough — pipe raw upstream SSE to client
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
responseChan := make(chan *schemas.BifrostStreamChunk)
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
return responseChan, nil
}

Expand All @@ -1105,7 +1105,7 @@ func (provider *HuggingFaceProvider) ImageGenerationStream(ctx *schemas.BifrostC
go func() {
defer providerUtils.EnsureStreamFinalizerCalled(ctx, postHookSpanFinalizer)
defer providerUtils.ReleaseStreamingResponse(resp)
defer close(responseChan)
defer providerUtils.CloseStream(ctx, responseChan)

if resp.BodyStream() == nil {
bifrostErr := providerUtils.NewBifrostOperationError(
Expand Down Expand Up @@ -1471,7 +1471,7 @@ func (provider *HuggingFaceProvider) ImageEditStream(ctx *schemas.BifrostContext
// Large payload streaming passthrough — pipe raw upstream SSE to client
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
responseChan := make(chan *schemas.BifrostStreamChunk)
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
return responseChan, nil
}

Expand All @@ -1484,7 +1484,7 @@ func (provider *HuggingFaceProvider) ImageEditStream(ctx *schemas.BifrostContext
go func() {
defer providerUtils.EnsureStreamFinalizerCalled(ctx, postHookSpanFinalizer)
defer providerUtils.ReleaseStreamingResponse(resp)
defer close(responseChan)
defer providerUtils.CloseStream(ctx, responseChan)

if resp.BodyStream() == nil {
bifrostErr := providerUtils.NewBifrostOperationError(
Expand Down
4 changes: 2 additions & 2 deletions core/providers/mistral/mistral.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func (provider *MistralProvider) TranscriptionStream(ctx *schemas.BifrostContext
// Large payload streaming passthrough — pipe raw upstream SSE to client
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
responseChan := make(chan *schemas.BifrostStreamChunk)
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
return responseChan, nil
}

Expand All @@ -472,7 +472,7 @@ func (provider *MistralProvider) TranscriptionStream(ctx *schemas.BifrostContext
} else if ctx.Err() == context.DeadlineExceeded {
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
}
close(responseChan)
providerUtils.CloseStream(ctx, responseChan)
}()
defer providerUtils.ReleaseStreamingResponse(resp)
// Decompress gzip-encoded streams transparently (no-op for non-gzip)
Expand Down
Loading
Loading