Skip to content

Commit c58a20c

Browse files
committed
Adds pause/resume flows for streaming calls
1 parent 58d40f4 commit c58a20c

24 files changed

Lines changed: 2245 additions & 175 deletions

File tree

AGENTS.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,13 @@ ctx.WithValue(key, value) // Chainable variant
245245

246246
**Gotcha**: `BlockRestrictedWrites()` silently drops writes to reserved keys. This prevents plugins from accidentally overwriting internal state.
247247

248+
**Hard rule — never store stream-sized data in `BifrostContext`.** The context holds only small values and handles: IDs, durations, booleans, interface pointers. Any per-request state that scales with stream content (chunk buffers, accumulated payloads, replay queues, large per-request slices/maps) must live in a top-level manager keyed by `RequestID`, not in `ctx`. Reference implementations:
249+
250+
- `framework/streaming.Accumulator` — owns a `sync.Map` of per-stream `StreamAccumulator` entries keyed by `RequestID`. Only `BifrostContextKeyAccumulatorID` (the ID string) is stored on the context; the chunk buffers live in the manager. The pause/resume gate (`gate.go`) extends the same per-stream entry with a state machine — again, **no buffer in ctx**.
251+
- The `Tracer` interface (stored in ctx as a small interface pointer) is the access path through which plugins and providers reach managers, so bulky data never has to be placed on the context itself.
252+
253+
When in doubt: if your new ctx key would hold a slice/map that grows with request content, route the storage through a manager and keep only the ID in ctx.
254+
248255
---
249256

250257
## Core Patterns

core/bifrost.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4982,9 +4982,9 @@ func (bifrost *Bifrost) tryStreamRequest(ctx *schemas.BifrostContext, req *schem
49824982
// Guarded send: if the consumer abandons outputStream (client
49834983
// disconnect, ctx cancel), drain the upstream shortCircuit.Stream
49844984
// so its producer can exit cleanly instead of blocking on its send.
4985-
select {
4986-
case outputStream <- streamResponse:
4987-
case <-ctx.Done():
4985+
// GateSendChunk routes through the pause/resume gate when a plugin
4986+
// has engaged it; otherwise it's a bare ctx-guarded channel send.
4987+
if !providerUtils.GateSendChunk(ctx, streamResponse, outputStream) {
49884988
for range shortCircuit.Stream {
49894989
}
49904990
return

core/providers/anthropic/anthropic.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ func HandleAnthropicChatCompletionStreaming(
666666
// Large payload streaming passthrough — pipe raw upstream SSE to client
667667
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
668668
responseChan := make(chan *schemas.BifrostStreamChunk)
669-
close(responseChan)
669+
providerUtils.CloseStream(ctx, responseChan)
670670
return responseChan, nil
671671
}
672672

@@ -682,7 +682,7 @@ func HandleAnthropicChatCompletionStreaming(
682682
} else if ctx.Err() == context.DeadlineExceeded {
683683
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, logger, postHookSpanFinalizer)
684684
}
685-
close(responseChan)
685+
providerUtils.CloseStream(ctx, responseChan)
686686
}()
687687
defer providerUtils.ReleaseStreamingResponse(resp)
688688

@@ -1133,7 +1133,7 @@ func HandleAnthropicResponsesStream(
11331133
// Large payload streaming passthrough — pipe raw upstream SSE to client
11341134
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
11351135
responseChan := make(chan *schemas.BifrostStreamChunk)
1136-
close(responseChan)
1136+
providerUtils.CloseStream(ctx, responseChan)
11371137
return responseChan, nil
11381138
}
11391139

@@ -1149,7 +1149,7 @@ func HandleAnthropicResponsesStream(
11491149
} else if ctx.Err() == context.DeadlineExceeded {
11501150
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, logger, postHookSpanFinalizer)
11511151
}
1152-
close(responseChan)
1152+
providerUtils.CloseStream(ctx, responseChan)
11531153
}()
11541154
defer providerUtils.ReleaseStreamingResponse(resp)
11551155
// If body stream is nil, return an error

core/providers/azure/azure.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1046,7 +1046,7 @@ func (provider *AzureProvider) SpeechStream(ctx *schemas.BifrostContext, postHoo
10461046
} else if ctx.Err() == context.DeadlineExceeded {
10471047
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
10481048
}
1049-
close(responseChan)
1049+
providerUtils.CloseStream(ctx, responseChan)
10501050
}()
10511051
// Always release response on exit; bodyStream close should prevent indefinite blocking.
10521052
defer providerUtils.ReleaseStreamingResponse(resp)

core/providers/bedrock/bedrock.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -914,7 +914,7 @@ func (provider *BedrockProvider) TextCompletionStream(ctx *schemas.BifrostContex
914914
} else if ctx.Err() == context.DeadlineExceeded {
915915
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
916916
}
917-
close(responseChan)
917+
providerUtils.CloseStream(ctx, responseChan)
918918
}()
919919
defer resp.Body.Close()
920920

@@ -1140,7 +1140,7 @@ func (provider *BedrockProvider) ChatCompletionStream(ctx *schemas.BifrostContex
11401140
} else if ctx.Err() == context.DeadlineExceeded {
11411141
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
11421142
}
1143-
close(responseChan)
1143+
providerUtils.CloseStream(ctx, responseChan)
11441144
}()
11451145
defer resp.Body.Close()
11461146

@@ -1504,7 +1504,7 @@ func (provider *BedrockProvider) ResponsesStream(ctx *schemas.BifrostContext, po
15041504
} else if ctx.Err() == context.DeadlineExceeded {
15051505
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
15061506
}
1507-
close(responseChan)
1507+
providerUtils.CloseStream(ctx, responseChan)
15081508
}()
15091509
// Always release response on exit; bodyStream close should prevent indefinite blocking.
15101510
defer resp.Body.Close()

core/providers/cohere/cohere.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ func (provider *CohereProvider) ChatCompletionStream(ctx *schemas.BifrostContext
489489
// Large payload streaming passthrough — pipe raw upstream SSE to client
490490
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
491491
responseChan := make(chan *schemas.BifrostStreamChunk)
492-
close(responseChan)
492+
providerUtils.CloseStream(ctx, responseChan)
493493
return responseChan, nil
494494
}
495495

@@ -507,7 +507,7 @@ func (provider *CohereProvider) ChatCompletionStream(ctx *schemas.BifrostContext
507507
} else if ctx.Err() == context.DeadlineExceeded {
508508
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
509509
}
510-
close(responseChan)
510+
providerUtils.CloseStream(ctx, responseChan)
511511
}()
512512
defer providerUtils.ReleaseStreamingResponse(resp)
513513
// Decompress gzip-encoded streams transparently (no-op for non-gzip)
@@ -753,7 +753,7 @@ func (provider *CohereProvider) ResponsesStream(ctx *schemas.BifrostContext, pos
753753
// Large payload streaming passthrough — pipe raw upstream SSE to client
754754
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
755755
responseChan := make(chan *schemas.BifrostStreamChunk)
756-
close(responseChan)
756+
providerUtils.CloseStream(ctx, responseChan)
757757
return responseChan, nil
758758
}
759759

@@ -771,7 +771,7 @@ func (provider *CohereProvider) ResponsesStream(ctx *schemas.BifrostContext, pos
771771
} else if ctx.Err() == context.DeadlineExceeded {
772772
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
773773
}
774-
close(responseChan)
774+
providerUtils.CloseStream(ctx, responseChan)
775775
}()
776776
defer providerUtils.ReleaseStreamingResponse(resp)
777777
// Decompress gzip-encoded streams transparently (no-op for non-gzip)

core/providers/elevenlabs/elevenlabs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ func (provider *ElevenlabsProvider) SpeechStream(ctx *schemas.BifrostContext, po
390390
} else if ctx.Err() == context.DeadlineExceeded {
391391
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
392392
}
393-
close(responseChan)
393+
providerUtils.CloseStream(ctx, responseChan)
394394
}()
395395
defer providerUtils.ReleaseStreamingResponse(resp)
396396
// Decompress gzip-encoded streams transparently (no-op for non-gzip)

core/providers/gemini/gemini.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ func HandleGeminiChatCompletionStream(
448448
// Large payload streaming passthrough — pipe raw upstream SSE to client
449449
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
450450
responseChan := make(chan *schemas.BifrostStreamChunk)
451-
close(responseChan)
451+
providerUtils.CloseStream(ctx, responseChan)
452452
return responseChan, nil
453453
}
454454

@@ -464,7 +464,7 @@ func HandleGeminiChatCompletionStream(
464464
} else if ctx.Err() == context.DeadlineExceeded {
465465
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, logger, postHookSpanFinalizer)
466466
}
467-
close(responseChan)
467+
providerUtils.CloseStream(ctx, responseChan)
468468
}()
469469
defer providerUtils.ReleaseStreamingResponse(resp)
470470

@@ -943,7 +943,7 @@ func HandleGeminiResponsesStream(
943943
// Large payload streaming passthrough — pipe raw upstream SSE to client
944944
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
945945
responseChan := make(chan *schemas.BifrostStreamChunk)
946-
close(responseChan)
946+
providerUtils.CloseStream(ctx, responseChan)
947947
return responseChan, nil
948948
}
949949

@@ -959,7 +959,7 @@ func HandleGeminiResponsesStream(
959959
} else if ctx.Err() == context.DeadlineExceeded {
960960
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, logger, postHookSpanFinalizer)
961961
}
962-
close(responseChan)
962+
providerUtils.CloseStream(ctx, responseChan)
963963
}()
964964

965965
defer providerUtils.ReleaseStreamingResponse(resp)
@@ -1432,7 +1432,7 @@ func (provider *GeminiProvider) SpeechStream(ctx *schemas.BifrostContext, postHo
14321432
// Large payload streaming passthrough — pipe raw upstream SSE to client
14331433
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
14341434
responseChan := make(chan *schemas.BifrostStreamChunk)
1435-
close(responseChan)
1435+
providerUtils.CloseStream(ctx, responseChan)
14361436
return responseChan, nil
14371437
}
14381438

@@ -1450,7 +1450,7 @@ func (provider *GeminiProvider) SpeechStream(ctx *schemas.BifrostContext, postHo
14501450
} else if ctx.Err() == context.DeadlineExceeded {
14511451
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
14521452
}
1453-
close(responseChan)
1453+
providerUtils.CloseStream(ctx, responseChan)
14541454
}()
14551455

14561456
defer providerUtils.ReleaseStreamingResponse(resp)
@@ -1721,7 +1721,7 @@ func (provider *GeminiProvider) TranscriptionStream(ctx *schemas.BifrostContext,
17211721
// Large payload streaming passthrough — pipe raw upstream SSE to client
17221722
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
17231723
responseChan := make(chan *schemas.BifrostStreamChunk)
1724-
close(responseChan)
1724+
providerUtils.CloseStream(ctx, responseChan)
17251725
return responseChan, nil
17261726
}
17271727

@@ -1739,7 +1739,7 @@ func (provider *GeminiProvider) TranscriptionStream(ctx *schemas.BifrostContext,
17391739
} else if ctx.Err() == context.DeadlineExceeded {
17401740
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
17411741
}
1742-
close(responseChan)
1742+
providerUtils.CloseStream(ctx, responseChan)
17431743
}()
17441744
defer providerUtils.ReleaseStreamingResponse(resp)
17451745
// Decompress gzip-encoded streams transparently (no-op for non-gzip)

core/providers/huggingface/huggingface.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,7 +1092,7 @@ func (provider *HuggingFaceProvider) ImageGenerationStream(ctx *schemas.BifrostC
10921092
// Large payload streaming passthrough — pipe raw upstream SSE to client
10931093
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
10941094
responseChan := make(chan *schemas.BifrostStreamChunk)
1095-
close(responseChan)
1095+
providerUtils.CloseStream(ctx, responseChan)
10961096
return responseChan, nil
10971097
}
10981098

@@ -1105,7 +1105,7 @@ func (provider *HuggingFaceProvider) ImageGenerationStream(ctx *schemas.BifrostC
11051105
go func() {
11061106
defer providerUtils.EnsureStreamFinalizerCalled(ctx, postHookSpanFinalizer)
11071107
defer providerUtils.ReleaseStreamingResponse(resp)
1108-
defer close(responseChan)
1108+
defer providerUtils.CloseStream(ctx, responseChan)
11091109

11101110
if resp.BodyStream() == nil {
11111111
bifrostErr := providerUtils.NewBifrostOperationError(
@@ -1471,7 +1471,7 @@ func (provider *HuggingFaceProvider) ImageEditStream(ctx *schemas.BifrostContext
14711471
// Large payload streaming passthrough — pipe raw upstream SSE to client
14721472
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
14731473
responseChan := make(chan *schemas.BifrostStreamChunk)
1474-
close(responseChan)
1474+
providerUtils.CloseStream(ctx, responseChan)
14751475
return responseChan, nil
14761476
}
14771477

@@ -1484,7 +1484,7 @@ func (provider *HuggingFaceProvider) ImageEditStream(ctx *schemas.BifrostContext
14841484
go func() {
14851485
defer providerUtils.EnsureStreamFinalizerCalled(ctx, postHookSpanFinalizer)
14861486
defer providerUtils.ReleaseStreamingResponse(resp)
1487-
defer close(responseChan)
1487+
defer providerUtils.CloseStream(ctx, responseChan)
14881488

14891489
if resp.BodyStream() == nil {
14901490
bifrostErr := providerUtils.NewBifrostOperationError(

core/providers/mistral/mistral.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ func (provider *MistralProvider) TranscriptionStream(ctx *schemas.BifrostContext
455455
// Large payload streaming passthrough — pipe raw upstream SSE to client
456456
if providerUtils.SetupStreamingPassthrough(ctx, resp) {
457457
responseChan := make(chan *schemas.BifrostStreamChunk)
458-
close(responseChan)
458+
providerUtils.CloseStream(ctx, responseChan)
459459
return responseChan, nil
460460
}
461461

@@ -472,7 +472,7 @@ func (provider *MistralProvider) TranscriptionStream(ctx *schemas.BifrostContext
472472
} else if ctx.Err() == context.DeadlineExceeded {
473473
providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
474474
}
475-
close(responseChan)
475+
providerUtils.CloseStream(ctx, responseChan)
476476
}()
477477
defer providerUtils.ReleaseStreamingResponse(resp)
478478
// Decompress gzip-encoded streams transparently (no-op for non-gzip)

0 commit comments

Comments
 (0)