
adds pause resume flows for streaming calls #3091

Open
akshaydeo wants to merge 1 commit into main from stream-pausing-flow

Conversation


@akshaydeo akshaydeo commented Apr 27, 2026

Summary

Introduces a pause/resume/end gate for streaming responses, allowing plugins to pause chunk delivery to the client, buffer in-flight chunks, and resume or terminate the stream on demand. Streams that never engage the gate pay no extra cost: they continue to use a direct channel send with a ctx.Done() guard, identical to the previous behavior.

Changes

  • Added PauseStream, ResumeStream, and EndStream methods to BifrostContext, which delegate to the Tracer interface and set BifrostContextKeyStreamGated to engage the gate.
  • Added PauseStream, ResumeStream, EndStream, and GateSend to the Tracer interface, with no-op implementations on NoOpTracer.
  • Introduced GateSendChunk in providerUtils as a drop-in replacement for the inline select { case responseChan <- chunk: ... case <-ctx.Done(): } pattern. When BifrostContextKeyStreamGated is set, it routes through Tracer.GateSend; otherwise it falls through to the same direct send as before.
  • Replaced all direct channel send sites in bifrost.go and utils.go with GateSendChunk.
  • Added framework/streaming/gate.go implementing the per-StreamAccumulator gate state machine (Active → Paused → Active | Ended), including a lazily-started flusher goroutine that drains the replay buffer when the gate resumes or ends.
  • Extended StreamAccumulator with gate fields (gateState, gateReplayBuf, gateCond, gateFlusherCh, etc.) and added BifrostContextKeyStreamGated to the context key registry.
  • Wired PauseStream, ResumeStream, EndStream, and GateSend through framework/tracing/tracer.go to the accumulator layer.
  • cleanupStreamAccumulator now force-ends the gate and broadcasts to unblock any running flusher goroutine before cleanup.

The key design decision is the fast-path check on BifrostContextKeyStreamGated: streams that never call Pause/Resume/End skip all gate logic entirely, keeping the hot path allocation-free and lock-free.
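
For reference, here is a minimal sketch of that fast-path split, pieced together from the description above. The GateSendChunk(ctx, chunk, responseChan) bool shape comes from this PR; the tracer-lookup key and the exact Tracer.GateSend parameter order are assumptions for illustration, and imports are omitted.

// Sketch only: simplified from the PR description; not the final implementation.
func GateSendChunk(ctx *schemas.BifrostContext, chunk *schemas.BifrostStreamChunk, responseChan chan *schemas.BifrostStreamChunk) bool {
	// Fast path: the stream never called Pause/Resume/End, so the gate flag is
	// absent and we keep the pre-existing direct send guarded by ctx.Done().
	if ctx.Value(schemas.BifrostContextKeyStreamGated) == nil {
		select {
		case responseChan <- chunk:
			return true
		case <-ctx.Done():
			return false
		}
	}
	// Gated path: hand the chunk to the tracer, which buffers it while the
	// stream is paused and replays it in order on resume (or drops it once the
	// stream has been ended).
	if tracer, ok := ctx.Value(schemas.BifrostContextKeyTracer).(schemas.Tracer); ok {
		return tracer.GateSend(chunk, ctx, responseChan)
	}
	return false
}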

Type of change

  • Bug fix
  • Feature
  • Refactor
  • Documentation
  • Chore/CI

Affected areas

  • Core (Go)
  • Transports (HTTP)
  • Providers/Integrations
  • Plugins
  • UI (React)
  • Docs

How to test

go test ./...

To validate gate behavior manually:

  1. Implement a plugin that calls ctx.PauseStream() in PostLLMHook, waits a short duration, then calls ctx.ResumeStream(). Verify that chunks are buffered during the pause and flushed in order on resume (a minimal plugin sketch follows this list).
  2. Call ctx.EndStream(err) from a plugin hook and verify that subsequent provider chunks are dropped and the supplied error is delivered as the terminal chunk.
  3. Run a streaming request without any plugin engaging the gate and confirm behavior is identical to before this change.
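
A minimal sketch for step 1. The surrounding plugin scaffolding and hook signature are whatever the plugin interface defines and are omitted here; only the ctx calls are the API added by this PR, and the 2-second sleep is an arbitrary stand-in for plugin work.

// Called from a plugin's PostLLMHook. Sketch only; assumes the hook has access
// to the request's *schemas.BifrostContext.
func pauseThenResume(ctx *schemas.BifrostContext) {
	// Pause delivery: provider chunks produced from now on are buffered by the
	// gate in arrival order instead of being sent to the client.
	ctx.PauseStream()

	go func() {
		time.Sleep(2 * time.Second) // simulate work done while the stream is paused
		// Resume: buffered chunks are flushed to the client in order, then live
		// delivery continues.
		ctx.ResumeStream()
	}()
}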

Breaking changes

  • Yes
  • No

The Tracer interface has four new required methods: PauseStream, ResumeStream, EndStream, and GateSend. Any custom Tracer implementation outside this repository must add these methods. The NoOpTracer embedded struct can be used to satisfy them with no-ops.
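
For an out-of-tree implementation, embedding the no-op type is enough to keep compiling against the expanded interface; a short sketch, assuming the type is exported from core/schemas as described above:

// Sketch: an external Tracer that does not use gating. The embedded NoOpTracer
// supplies no-op PauseStream/ResumeStream/EndStream/GateSend implementations.
type myTracer struct {
	schemas.NoOpTracer
	// ... existing fields and the Tracer methods this type already implemented ...
}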

Related issues

Security considerations

Buffered chunks are held in memory for the duration of a pause. Plugins that pause indefinitely or on high-throughput streams could cause unbounded memory growth. Callers should ensure ResumeStream or EndStream is always eventually called.
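
One way to keep that guarantee is to arm a deadline whenever the stream is paused. A sketch, assuming the plugin holds the request's BifrostContext and that resuming an already-resumed stream is harmless; the helper name is illustrative.

// Sketch: bound how long a pause can hold buffered chunks in memory. The
// returned timer should be stopped if the plugin resumes the stream itself.
func pauseWithDeadline(ctx *schemas.BifrostContext, maxPause time.Duration) *time.Timer {
	ctx.PauseStream()
	// Fail open: if nothing resumes the stream within maxPause, resume it so the
	// replay buffer drains instead of growing without bound.
	return time.AfterFunc(maxPause, func() { ctx.ResumeStream() })
}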

Checklist

  • I read docs/contributing/README.md and followed the guidelines
  • I added/updated tests where appropriate
  • I updated documentation where needed
  • I verified builds succeed (Go and UI)
  • I verified the CI pipeline passes locally if applicable


coderabbitai Bot commented Apr 27, 2026

📝 Walkthrough

Summary by CodeRabbit

Release Notes

  • New Features

    • Added stream pause, resume, and end flow control with ordered buffering of chunks while paused.
    • Introduced stream state inspection methods and on-demand accumulated response snapshots.
  • Improvements

    • Standardized stream channel termination across all provider integrations.
    • Enhanced streaming lifecycle and error handling during paused states.
  • Documentation

    • Updated design guidance on context usage and request state management.

Walkthrough

The pull request introduces a streaming pause/resume/end gate system that manages ordered chunk delivery with accumulation and buffering capabilities. Provider utilities now centralize stream termination through context-aware helpers. Accumulators are refactored to support gate-aware cleanup semantics, and all streaming providers are updated to use shared stream-close utilities.

Changes

Cohort / File(s) Summary
Streaming Gate Core
framework/streaming/gate.go, framework/streaming/types.go, framework/streaming/gate_test.go
Introduces complete pause/resume/end gate implementation with state management (Active/Paused/Ended), ordered chunk buffering bounded by 100MB, lazy flusher goroutine, terminal error handling, and comprehensive test coverage validating state transitions, client disconnects, ctx cancellation, and query methods.
Schema & Context Updates
core/schemas/bifrost.go, core/schemas/context.go, core/schemas/tracer.go
Adds BifrostContextKeyStreamGated constant and introduces six new BifrostContext methods (PauseStream, ResumeStream, EndStream, IsStreamEnded, IsStreamPaused, GetAccumulatedResponse) as guarded delegates to the Tracer interface with eight new streaming-control methods.
Tracer Implementation
framework/tracing/tracer.go
Implements Tracer streaming-control APIs (pause/resume/end/flusher-wait/state queries/accumulated-response snapshot) as wrappers over embedded Accumulator with early-return defaults, and adds GateSend to replace direct provider channel sends with gated semantics or fallback direct sends.
Stream Accumulator Refactoring
framework/streaming/accumulator.go, framework/streaming/images.go
Refactors cleanup into cleanupStreamAccumulator(requestID, forceEndGate) with two paths: natural-completion defers teardown if flusher/gate is active; force-end-gate immediately terminates and clears buffers. Updates all cleanup callers and drift processing to route through new semantics.
Provider Stream Utilities
core/providers/utils/utils.go
Introduces GateSendChunk(ctx, chunk, responseChan) bool centralizing chunk delivery via tracer gate when engaged, and CloseStream(ctx, ch) coordinating shutdown with gate end/flusher flush before closing. Updates three internal call sites (ProcessAndSendResponse/Error/BifrostError) to use GateSendChunk.
Provider Stream Cleanup
core/bifrost.go, core/providers/anthropic/anthropic.go, core/providers/azure/azure.go, core/providers/bedrock/bedrock.go, core/providers/cohere/cohere.go, core/providers/elevenlabs/elevenlabs.go, core/providers/gemini/gemini.go, core/providers/huggingface/huggingface.go, core/providers/mistral/mistral.go, core/providers/openai/openai.go, core/providers/replicate/replicate.go, core/providers/vllm/vllm.go
Replaces direct close(responseChan) with providerUtils.CloseStream(ctx, responseChan) across all streaming handlers (passthrough fast paths and deferred goroutine cleanup), centralizing channel termination and context coordination.
Documentation & UI
AGENTS.md, ui/components/ui/form.tsx
AGENTS.md adds guidance enforcing small-handle context design and referencing Accumulator/gate as patterns for request-state managers. ui/components/ui/form.tsx reformats indentation (tabs→spaces) and multi-lines JSX exports without logic changes.

Sequence Diagram

sequenceDiagram
    participant Producer as Producer Goroutine
    participant Gate as Gate/Accumulator
    participant Tracer as Tracer
    participant Consumer as Consumer

    Producer->>Gate: GateSend(chunk, isFinal=false)
    Note over Gate: State=Active
    Gate->>Consumer: Send chunk immediately
    Consumer-->>Gate: ✓

    Producer->>Tracer: PauseStream(traceID)
    Tracer->>Gate: Pause + set flag
    Note over Gate: State=Paused, flusher starts

    Producer->>Gate: GateSend(chunk, isFinal=false)
    Note over Gate: Buffer chunk in replay queue
    Gate-->>Producer: ✓ (buffered, not sent)

    Producer->>Gate: GateSend(chunk, isFinal=true)
    Note over Gate: Buffer all pending, send buffered chunks
    Gate->>Consumer: Flush buffered chunks in order
    Consumer-->>Gate: ✓

    Producer->>Tracer: ResumeStream(traceID)
    Tracer->>Gate: Resume state
    Note over Gate: Flusher drains buffer
    Gate->>Consumer: Send remaining chunks in order
    Consumer-->>Gate: ✓

    Producer->>Tracer: EndStream(err)
    Tracer->>Gate: End + optional error chunk
    Note over Gate: State=Ended, drop further chunks
    Gate->>Consumer: Send terminal error (if err provided)
    Consumer-->>Gate: ✓

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 A streaming gate we've built with care,
Pause and buffer chunks midair!
Resume the flow, end with grace,
Order preserved in every place.
Providers close through shared helpers now,
No more direct closes—we know how!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title 'adds pause resume flows for streaming calls' directly and specifically describes the main feature added in this PR: pause/resume functionality for streaming responses.
Description check ✅ Passed The PR description covers all required template sections: Summary, Changes, Type of change, Affected areas, How to test, Breaking changes, and Security considerations. The description is comprehensive, detailed, and well-structured.
Docstring Coverage ✅ Passed Docstring coverage is 85.48% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.



Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.11.4)

level=error msg="[linters_context] typechecking error: pattern ./...: directory prefix . does not contain main module or its selected dependencies"




This stack of pull requests is managed by Graphite. Learn more about stacking.

@akshaydeo akshaydeo marked this pull request as ready for review April 27, 2026 18:52
@github-actions

🧪 Test Suite Available

This PR can be tested by a repository admin.

Run tests for PR #3091

@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@greptile-apps

greptile-apps Bot commented Apr 27, 2026

Confidence Score: 3/5

Not safe to merge as-is; two P1 defects in the core gate logic need to be fixed first.

Two P1 findings: a missing panic recovery in Tracer.GateSend's fallback path that can crash provider goroutines, and a gateReplayBufBytes accounting bug that causes premature stream termination after a partial drain followed by a re-pause. Multiple P1s together pull the score below the P1 ceiling of 4.

framework/streaming/gate.go (byte counter staleness), framework/tracing/tracer.go (panic recovery in GateSend fallback)

Important Files Changed

Filename Overview
framework/streaming/gate.go New gate state machine for pause/resume/end; has known issues with gateReplayBufBytes staleness after partial drain and ordering races on resume
framework/tracing/tracer.go Adds PauseStream/ResumeStream/EndStream/WaitForFlusher/GateSend wiring to the Tracer; GateSend fallback path (accumulator==nil) is missing panic recovery unlike all other equivalent send paths
framework/streaming/accumulator.go Adds gate fields to StreamAccumulator and deferred-cleanup logic; cleanupStreamAccumulator and flusher defer interaction looks correct
core/providers/utils/utils.go Adds GateSendChunk and CloseStream helpers with proper panic recovery and fast-path gate check; straightforward replacement of inline channel sends
core/schemas/context.go Adds PauseStream/ResumeStream/EndStream/IsStreamEnded/IsStreamPaused/GetAccumulatedResponse methods to BifrostContext; delegation to Tracer looks correct
core/schemas/tracer.go Adds 7 new methods to the Tracer interface and NoOpTracer; NoOpTracer.GateSend has correct panic recovery; clean interface extension
framework/streaming/gate_test.go Comprehensive 1226-line test suite covering happy path, pause/resume, ordering, end-stream, buffer overflow, and concurrency
core/bifrost.go Replaces direct channel send and close with GateSendChunk/CloseStream helpers; change is minimal and correct
ui/components/ui/form.tsx Indentation-only change (tabs to spaces); no functional changes

Reviews (6): Last reviewed commit: "adds pause resume flows for streaming ca..."


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
core/bifrost.go (1)

4947-4947: ⚠️ Potential issue | 🟠 Major

Don't close gated short-circuit streams from this goroutine.

GateSendChunk can buffer chunks in the tracer while the stream is paused, but Line 4947 still unconditionally closes outputStream and this path never marks a terminal chunk via BifrostContextKeyStreamEndIndicator. For paused short-circuit streams, that leaves the gate with buffered tail data but a closed downstream channel, so resume/end can drop the buffered chunks or panic on send-to-closed-channel.

Also applies to: 4985-4987

core/providers/utils/utils.go (1)

1942-1950: ⚠️ Potential issue | 🟠 Major

Don’t tie final-chunk span completion to successful client delivery.

When a plugin has already called ctx.EndStream(...), GateSendChunk returns false for the provider’s natural final response chunk. This early return skips completeDeferredSpan, so the deferred LLM span/post-hook finalizer never runs for that path.

⚠️ Suggested adjustment
-	if !GateSendChunk(ctx, streamResponse, responseChan) {
-		return
-	}
+	delivered := GateSendChunk(ctx, streamResponse, responseChan)

 	// Check if this is the final chunk and complete deferred span with post-processed data
 	if isFinalChunk := ctx.Value(schemas.BifrostContextKeyStreamEndIndicator); isFinalChunk != nil {
 		if final, ok := isFinalChunk.(bool); ok && final {
 			completeDeferredSpan(ctx, processedResponse, processedError, postHookSpanFinalizer)
 		}
 	}
+
+	if !delivered {
+		return
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/providers/utils/utils.go` around lines 1942 - 1950, The current logic
ties completing the deferred LLM span (completeDeferredSpan) to successful
delivery via GateSendChunk, so when GateSendChunk returns false (e.g., because a
plugin already called ctx.EndStream), the post-hook finalizer never runs; to fix
it, detect the final-chunk indicator
(schemas.BifrostContextKeyStreamEndIndicator) and call completeDeferredSpan(ctx,
processedResponse, processedError, postHookSpanFinalizer) regardless of
GateSendChunk's boolean result — either move the isFinalChunk check and
completeDeferredSpan call before the GateSendChunk early return or add a
follow-up branch that runs completeDeferredSpan when GateSendChunk returned
false and isFinalChunk is true, ensuring processedResponse/processedError and
postHookSpanFinalizer are available.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@framework/streaming/gate.go`:
- Around line 74-87: End currently just stores gateEndError and broadcasts,
which can be dropped if gateFlusher never drains (e.g., gate paused or no
buffered chunks), so update the shutdown flow to ensure the terminal error is
actually delivered to the client: in StreamAccumulator.End (and the analogous
code at the other location), after setting gateState = StreamStateEnded and
gateEndError, ensure the gateFlusher loop (or any consumer path that normally
emits final messages) checks gateEndError when it wakes and explicitly
emits/delivers the terminal error to the client (or enqueues a terminal error
chunk) before returning; reference symbols: StreamAccumulator.End, gateFlusher,
EndStream(err), gateEndError, gateState, StreamStateEnded, gateCond to find and
implement the explicit delivery logic so the error cannot be silently dropped.
- Around line 135-137: The gate flusher can attempt to send on responseChan
after a provider goroutine closes it (race between gateFlusher and provider);
ensure providers signal the flusher to stop before closing the channel: call
Accumulator.EndStream(traceID, err) (or otherwise set gateState to
StreamStateEnded / cancel gateFlusherCtx) from the provider before executing any
defer close(responseChan), or alternatively stop closing the channel in
providers and let gateFlusher own closing (i.e., hand off ownership so
sendOrCancel in gateFlusher never sends to a closed channel). Reference
gateFlusher, responseChan, gateFlusherCtx, sendOrCancel, StreamStateEnded, and
Accumulator.EndStream when making the change.

---

Outside diff comments:
In `@core/providers/utils/utils.go`:
- Around line 1942-1950: The current logic ties completing the deferred LLM span
(completeDeferredSpan) to successful delivery via GateSendChunk, so when
GateSendChunk returns false (e.g., because a plugin already called
ctx.EndStream), the post-hook finalizer never runs; to fix it, detect the
final-chunk indicator (schemas.BifrostContextKeyStreamEndIndicator) and call
completeDeferredSpan(ctx, processedResponse, processedError,
postHookSpanFinalizer) regardless of GateSendChunk's boolean result — either
move the isFinalChunk check and completeDeferredSpan call before the
GateSendChunk early return or add a follow-up branch that runs
completeDeferredSpan when GateSendChunk returned false and isFinalChunk is true,
ensuring processedResponse/processedError and postHookSpanFinalizer are
available.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: efd68463-75c6-4c76-b72f-814fd18602f2

📥 Commits

Reviewing files that changed from the base of the PR and between 1279747 and 75d8bf2.

📒 Files selected for processing (9)
  • core/bifrost.go
  • core/providers/utils/utils.go
  • core/schemas/bifrost.go
  • core/schemas/context.go
  • core/schemas/tracer.go
  • framework/streaming/accumulator.go
  • framework/streaming/gate.go
  • framework/streaming/types.go
  • framework/tracing/tracer.go

@akshaydeo akshaydeo force-pushed the stream-pausing-flow branch 2 times, most recently from d7fd279 to c34a0d0 Compare April 27, 2026 19:26

@coderabbitai coderabbitai Bot left a comment


♻️ Duplicate comments (1)
framework/streaming/gate.go (1)

74-86: ⚠️ Potential issue | 🟠 Major

Deliver gateEndError on the first post-EndStream(err) send.

End only records the terminal error, and Line 110 drops out as soon as the gate is already Ended. On the common active-path flow (ctx.EndStream(err) from a post-hook, with no replay buffer/flusher yet), that means the current GateSend call is the first place that has (ch, ctx), but the synthetic terminal error is still never emitted. The client sees a silent close instead, and TestGate_EndStreamErrorWhileActive currently bakes in that broken behavior.

💡 Minimal fix sketch
 func (sa *StreamAccumulator) GateSend(chunk *schemas.BifrostStreamChunk, isFinal, isHardErr bool, ch chan *schemas.BifrostStreamChunk, ctx *schemas.BifrostContext) bool {
 	sa.mu.Lock()
 	sa.gateSeq++
 	// Cache (ch, ctx) for the flusher. They are stable for the life of the stream.
 	if sa.gateFlusherCh == nil {
 		sa.gateFlusherCh = ch
 	}
 	if sa.gateFlusherCtx == nil {
 		sa.gateFlusherCtx = ctx
 	}

 	if sa.gateState == StreamStateEnded {
+		if sa.gateEndError != nil && !sa.gateFlusherOn && len(sa.gateReplayBuf) == 0 {
+			errChunk := &schemas.BifrostStreamChunk{BifrostError: sa.gateEndError}
+			sa.gateEndError = nil
+			sa.mu.Unlock()
+			return sendOrCancel(ctx, ch, errChunk)
+		}
 		sa.mu.Unlock()
 		return false
 	}

Also applies to: 99-130

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@framework/streaming/gate.go` around lines 74 - 86, StreamAccumulator.End
currently only records sa.gateEndError and doesn't ensure the terminal error is
emitted to the active sender; update the send path so the first GateSend after
EndStream(err) delivers that error. Add a boolean flag (e.g.
sa.gateEndDelivered) to StreamAccumulator, and in GateSend (the function that
handles sending when sa.gateState==StreamStateEnded) check if sa.gateEndError !=
nil and !sa.gateEndDelivered: deliver/return the gateEndError to the caller on
that first send and set sa.gateEndDelivered = true (under the same mutex
protecting sa.gateState/gateEndError) so subsequent sends behave as a normal
closed gate. Ensure End still sets sa.gateEndError and broadcasts sa.gateCond as
it does now.
🧹 Nitpick comments (2)
framework/streaming/accumulator.go (1)

370-377: Unify cleanup paths so gate teardown always runs.

cleanupStreamAccumulator now correctly force-ends/broadcasts the gate, but Cleanup() still does manual draining/deletion and bypasses this logic. Reusing cleanupStreamAccumulator there keeps shutdown behavior consistent.

♻️ Suggested refactor
 func (a *Accumulator) Cleanup() {
 	// Clean up all stream accumulators
 	a.streamAccumulators.Range(func(key, value interface{}) bool {
 		accumulator := value.(*StreamAccumulator)

-		// Lock before accessing chunk slices
+		// Lock and reuse centralized cleanup so gate teardown stays consistent.
 		accumulator.mu.Lock()
-		for _, chunk := range accumulator.ChatStreamChunks {
-			a.putChatStreamChunk(chunk)
-		}
-		for _, chunk := range accumulator.ResponsesStreamChunks {
-			a.putResponsesStreamChunk(chunk)
-		}
-		for _, chunk := range accumulator.TranscriptionStreamChunks {
-			a.putTranscriptionStreamChunk(chunk)
-		}
-		for _, chunk := range accumulator.AudioStreamChunks {
-			a.putAudioStreamChunk(chunk)
-		}
-		for _, chunk := range accumulator.ImageStreamChunks {
-			a.putImageStreamChunk(chunk)
-		}
+		a.cleanupStreamAccumulator(key.(string))
 		accumulator.mu.Unlock()
-
-		a.streamAccumulators.Delete(key)
 		return true
 	})
 	a.cleanupOnce.Do(func() {
 		close(a.stopCleanup)
 	})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@framework/streaming/accumulator.go` around lines 370 - 377, Cleanup()
duplicates manual draining/deletion logic and bypasses the unified gate teardown
in cleanupStreamAccumulator; instead, modify Cleanup() to call
cleanupStreamAccumulator so the force-end of acc.gateState to StreamStateEnded
and acc.gateCond.Broadcast() always run, removing the duplicate drain/delete
code path and ensuring consistent gate teardown behavior across shutdown paths.
core/schemas/tracer.go (1)

117-142: Please run provider-level streaming validation for this interface expansion.

Given this Tracer API fans out into provider send paths, validate with the repo’s end-to-end core test flow, not only unit tests.

Based on learnings: Use make test-core (not raw go test) for provider-level tests because it wires env vars and runs llmtests streaming scenarios.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/schemas/tracer.go` around lines 117 - 142, Run the repo’s end-to-end
core tests (make test-core) to validate provider-level streaming behavior for
the expanded Tracer interface (PauseStream, ResumeStream, EndStream, GateSend);
exercise llmtests streaming scenarios (pause/resume, final and hard-error paths)
so the provider send paths are exercised, capture any failures in the GateSend
buffering/flush logic or state transitions, and fix the provider helper code or
Tracer implementation until make test-core passes end-to-end streaming tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@framework/streaming/gate.go`:
- Around line 74-86: StreamAccumulator.End currently only records
sa.gateEndError and doesn't ensure the terminal error is emitted to the active
sender; update the send path so the first GateSend after EndStream(err) delivers
that error. Add a boolean flag (e.g. sa.gateEndDelivered) to StreamAccumulator,
and in GateSend (the function that handles sending when
sa.gateState==StreamStateEnded) check if sa.gateEndError != nil and
!sa.gateEndDelivered: deliver/return the gateEndError to the caller on that
first send and set sa.gateEndDelivered = true (under the same mutex protecting
sa.gateState/gateEndError) so subsequent sends behave as a normal closed gate.
Ensure End still sets sa.gateEndError and broadcasts sa.gateCond as it does now.

---

Nitpick comments:
In `@core/schemas/tracer.go`:
- Around line 117-142: Run the repo’s end-to-end core tests (make test-core) to
validate provider-level streaming behavior for the expanded Tracer interface
(PauseStream, ResumeStream, EndStream, GateSend); exercise llmtests streaming
scenarios (pause/resume, final and hard-error paths) so the provider send paths
are exercised, capture any failures in the GateSend buffering/flush logic or
state transitions, and fix the provider helper code or Tracer implementation
until make test-core passes end-to-end streaming tests.

In `@framework/streaming/accumulator.go`:
- Around line 370-377: Cleanup() duplicates manual draining/deletion logic and
bypasses the unified gate teardown in cleanupStreamAccumulator; instead, modify
Cleanup() to call cleanupStreamAccumulator so the force-end of acc.gateState to
StreamStateEnded and acc.gateCond.Broadcast() always run, removing the duplicate
drain/delete code path and ensuring consistent gate teardown behavior across
shutdown paths.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 92b1796c-0d91-4c8d-a531-51d42cbaee88

📥 Commits

Reviewing files that changed from the base of the PR and between 75d8bf2 and d7fd279.

📒 Files selected for processing (11)
  • AGENTS.md
  • core/bifrost.go
  • core/providers/utils/utils.go
  • core/schemas/bifrost.go
  • core/schemas/context.go
  • core/schemas/tracer.go
  • framework/streaming/accumulator.go
  • framework/streaming/gate.go
  • framework/streaming/gate_test.go
  • framework/streaming/types.go
  • framework/tracing/tracer.go
✅ Files skipped from review due to trivial changes (2)
  • AGENTS.md
  • core/bifrost.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • framework/tracing/tracer.go


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@core/bifrost.go`:
- Around line 4985-4987: The goroutine currently always closes outputStream on
exit even when GateSendChunk buffered chunks during a pause, which lets a later
resume/flush (from shortCircuit.Stream replay) send into a closed channel;
change ownership so the gated path finalizes the channel: stop closing
outputStream unconditionally in the goroutine that calls GateSendChunk and
instead add an explicit gate-finalization step that is invoked only after
GateSendChunk confirms no buffered data remain (or let the gate/flush code own
closing). Locate the send logic around GateSendChunk, the shortCircuit.Stream
replay code, and the unconditional close(outputStream) call and implement: (1)
remove/guard the close(outputStream) so it only runs when not using the gated
buffer, or (2) add a GateFinalize/FlushComplete handshake that the gated path
calls before closing outputStream to ensure no future replay writes occur.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6748f2fd-3bff-4cd0-a4ea-1a7393cb669c

📥 Commits

Reviewing files that changed from the base of the PR and between d7fd279 and c34a0d0.

📒 Files selected for processing (11)
  • AGENTS.md
  • core/bifrost.go
  • core/providers/utils/utils.go
  • core/schemas/bifrost.go
  • core/schemas/context.go
  • core/schemas/tracer.go
  • framework/streaming/accumulator.go
  • framework/streaming/gate.go
  • framework/streaming/gate_test.go
  • framework/streaming/types.go
  • framework/tracing/tracer.go
✅ Files skipped from review due to trivial changes (1)
  • AGENTS.md
🚧 Files skipped from review as they are similar to previous changes (3)
  • core/schemas/bifrost.go
  • core/schemas/context.go
  • core/providers/utils/utils.go

@akshaydeo akshaydeo force-pushed the stream-pausing-flow branch from c34a0d0 to c58a20c Compare April 28, 2026 10:24

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 6

♻️ Duplicate comments (1)
core/bifrost.go (1)

4985-4991: ⚠️ Potential issue | 🔴 Critical

Keep outputStream open until gate finalization completes.

GateSendChunk can now buffer/replay chunks, but this path still relies on the goroutine-level close(outputStream) above. If the gate flushes after the goroutine exits, that close can still trigger a send-on-closed-channel panic.

🔧 Suggested fix
-				defer close(outputStream)
+				// Let the gate/finalizer own outputStream closure once all buffered
+				// chunks have been flushed.
...
-					if !providerUtils.GateSendChunk(ctx, streamResponse, outputStream) {
+					if !providerUtils.GateSendChunk(ctx, streamResponse, outputStream) {
 						for range shortCircuit.Stream {
 						}
 						return
 					}
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@core/providers/openai/openai.go`:
- Around line 520-525: The passthrough fast-path returns early without releasing
the pooled fasthttp response (resp), leaking pooled objects; before
creating/returning responseChan in the branch that calls
providerUtils.SetupStreamingPassthrough and providerUtils.CloseStream, release
the pooled response (e.g., call resp.Release() or fasthttp.ReleaseResponse(resp)
depending on project convention) and then proceed to create responseChan and
return; apply the same fix to each streaming helper passthrough that uses resp,
providerUtils.SetupStreamingPassthrough, providerUtils.CloseStream and returns
responseChan.

In `@core/providers/utils/utils.go`:
- Around line 1908-1927: CloseStream can deadlock when a provider exits while a
stream is paused because it waits on tracer.WaitForFlusher before the gate is
ended; change CloseStream (and its gated branch using
BifrostContextKeyStreamGated, BifrostContextKeyTracer, BifrostContextKeyTraceID,
and WaitForFlusher) to first force-end the gate on the tracer (call the tracer's
ResumeStream or EndStream/ForceEndGate equivalent with traceID) so any parked
flusher can proceed, then call tracer.WaitForFlusher(traceID) and finally
close(ch).

In `@core/schemas/tracer.go`:
- Around line 261-274: NoOpTracer.GateSend can panic with "send on closed
channel" when ch is closed concurrently; modify GateSend (the function on
NoOpTracer that receives parameters chunk *BifrostStreamChunk, ctx
*BifrostContext and ch chan *BifrostStreamChunk) to protect both direct-send
paths by performing the channel send inside a small closure that defers a
recover() and returns a bool indicating success; replace the raw ch <- chunk in
the ctx==nil branch and the ch <- chunk case inside the select with calls to
that closure so a concurrent close results in false instead of crashing.

In `@framework/streaming/accumulator.go`:
- Around line 365-416: Accumulator.Cleanup() is bypassing the gate-aware
teardown and must call the new helper; change Accumulator.Cleanup() to invoke
a.cleanupStreamAccumulator(requestID, true) (forcing the gate end and wake)
instead of manually returning chunks and deleting from a.streamAccumulators;
remove the manual chunk-return/delete logic there so teardown is centralized in
cleanupStreamAccumulator; apply the same replacement at the other location that
currently repeats manual teardown (the second manual cleanup block referenced in
the comment, lines ~461-493) so both paths consistently call
a.cleanupStreamAccumulator(requestID, true).

In `@framework/streaming/gate.go`:
- Around line 37-45: WaitForFlusher currently uses getOrCreateStreamAccumulator
which allocates a new stream entry for unknown traceIDs; change it to perform a
read-only lookup by calling a.getStreamAccumulator(traceID) instead, and if that
returns nil simply return, otherwise call sa.WaitForFlusher(); update the
WaitForFlusher method on Accumulator to use getStreamAccumulator (not
getOrCreateStreamAccumulator) and handle the nil case so misses are
allocation-free.

In `@framework/streaming/types.go`:
- Around line 149-172: The paused replay buffer gateReplayBuf is unbounded; add
bounded-cap fields (e.g., gateReplayBufBytes int, gateReplayBufMaxBytes int,
gateReplayBufMaxChunks int) into the same struct and enforce them wherever
chunks are appended (the GateSend/append-while-active path and any PauseStream
logic that captures chunks): when appending a chunk, increment
gateReplayBufBytes by the chunk byte length and increment chunk count; if either
limit would be exceeded, set gateEndError to a descriptive schemas.BifrostError,
set gatePendingTerminal=true, and transition the gate to Ended/trigger flusher
(call EndStream-equivalent) so the stream ends deterministically; ensure
Resume/flusher checks gateEndError and delivers the terminal chunk. Initialize
the max limits (reasonable defaults) where the Accumulator/stream is created and
document the overflow behavior in comments.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: fb53a840-ecb9-4eaa-b234-6ddd4cd814b2

📥 Commits

Reviewing files that changed from the base of the PR and between c34a0d0 and c58a20c.

📒 Files selected for processing (24)
  • AGENTS.md
  • core/bifrost.go
  • core/providers/anthropic/anthropic.go
  • core/providers/azure/azure.go
  • core/providers/bedrock/bedrock.go
  • core/providers/cohere/cohere.go
  • core/providers/elevenlabs/elevenlabs.go
  • core/providers/gemini/gemini.go
  • core/providers/huggingface/huggingface.go
  • core/providers/mistral/mistral.go
  • core/providers/openai/openai.go
  • core/providers/replicate/replicate.go
  • core/providers/utils/utils.go
  • core/providers/vllm/vllm.go
  • core/schemas/bifrost.go
  • core/schemas/context.go
  • core/schemas/tracer.go
  • framework/streaming/accumulator.go
  • framework/streaming/gate.go
  • framework/streaming/gate_test.go
  • framework/streaming/images.go
  • framework/streaming/types.go
  • framework/tracing/tracer.go
  • ui/components/ui/form.tsx
✅ Files skipped from review due to trivial changes (2)
  • AGENTS.md
  • ui/components/ui/form.tsx
🚧 Files skipped from review as they are similar to previous changes (1)
  • framework/tracing/tracer.go

Comment on lines +37 to +45
// WaitForFlusher is the Tracer-level entry point for blocking until the gate
// flusher for traceID has fully drained and exited.
func (a *Accumulator) WaitForFlusher(traceID string) {
	if traceID == "" {
		return
	}
	sa := a.getOrCreateStreamAccumulator(traceID)
	sa.WaitForFlusher()
}

@coderabbitai coderabbitai Bot Apr 28, 2026


⚠️ Potential issue | 🟠 Major

Make WaitForFlusher read-only.

WaitForFlusher should be a barrier, not an allocator. Using getOrCreateStreamAccumulator here turns an unknown traceID into a fresh map entry that sticks around for the full TTL, which breaks the PR’s “non-gated streams stay allocation-free” fast path if close/wait is called on misses.

Suggested fix
 func (a *Accumulator) WaitForFlusher(traceID string) {
 	if traceID == "" {
 		return
 	}
-	sa := a.getOrCreateStreamAccumulator(traceID)
-	sa.WaitForFlusher()
+	v, ok := a.streamAccumulators.Load(traceID)
+	if !ok {
+		return
+	}
+	v.(*StreamAccumulator).WaitForFlusher()
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// WaitForFlusher is the Tracer-level entry point for blocking until the gate
// flusher for traceID has fully drained and exited.
func (a *Accumulator) WaitForFlusher(traceID string) {
	if traceID == "" {
		return
	}
	sa := a.getOrCreateStreamAccumulator(traceID)
	sa.WaitForFlusher()
}

// WaitForFlusher is the Tracer-level entry point for blocking until the gate
// flusher for traceID has fully drained and exited.
func (a *Accumulator) WaitForFlusher(traceID string) {
	if traceID == "" {
		return
	}
	v, ok := a.streamAccumulators.Load(traceID)
	if !ok {
		return
	}
	v.(*StreamAccumulator).WaitForFlusher()
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@framework/streaming/gate.go` around lines 37 - 45, WaitForFlusher currently
uses getOrCreateStreamAccumulator which allocates a new stream entry for unknown
traceIDs; change it to perform a read-only lookup by calling
a.getStreamAccumulator(traceID) instead, and if that returns nil simply return,
otherwise call sa.WaitForFlusher(); update the WaitForFlusher method on
Accumulator to use getStreamAccumulator (not getOrCreateStreamAccumulator) and
handle the nil case so misses are allocation-free.

akshaydeo (Contributor, Author)
Fixed — WaitForFlusher (framework/streaming/gate.go:39-49) switched to a read-only streamAccumulators.Load and bails immediately on a miss. This matches the IsStreamEnded/IsStreamPaused pattern, so there is no more sticky allocation when the traceID is unknown.

coderabbitai Bot
@akshaydeo, thanks for the confirmation! Using streamAccumulators.Load with an early bail on miss is exactly the right approach — it keeps WaitForFlusher as a pure barrier with no side-effects, fully consistent with the IsStreamEnded/IsStreamPaused read-only pattern. ✅


🧠 Learnings used
Learnt from: qwerty-dvorak
Repo: maximhq/bifrost PR: 1006
File: core/schemas/account.go:9-18
Timestamp: 2025-12-09T17:07:42.007Z
Learning: In core/schemas/account.go, the HuggingFaceKeyConfig field within the Key struct is currently unused and reserved for future Hugging Face inference endpoint deployments. Do not flag this field as missing from OpenAPI documentation or require its presence in the API spec until the feature is actively implemented and used. When the feature is added, update the OpenAPI docs accordingly; otherwise, treat this field as non-breaking and not part of the current API surface.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 1153
File: framework/configstore/rdb.go:2221-2246
Timestamp: 2025-12-29T11:54:55.836Z
Learning: In Go reviews, do not flag range-over-int patterns like for i := range n as compile-time errors, assuming Go 1.22+ semantics. Only flag actual range-capable values (slices, arrays, maps, channels, strings) and other compile-time issues. This applies to all Go files across the repository.

Learnt from: Pratham-Mishra04
Repo: maximhq/bifrost PR: 1312
File: framework/modelcatalog/pricing.go:276-426
Timestamp: 2026-01-14T04:40:11.480Z
Learning: In the Bifrost codebase, ImageUsage and other usage types guarantee that TotalTokens is populated (computed as InputTokens + OutputTokens if providers don’t supply TotalTokens). Reviewers can rely on this invariant and should not assume TotalTokens may be missing when input/output tokens exist. When implementing tiering logic or token-based decisions, you can safely use TotalTokens without extra null/zero guards, provided you’re in a context where InputTokens and OutputTokens are present. If a branch might discard tokens, ensure the invariant is preserved or add explicit checks only where the inputs are confirmed to be valid.

Learnt from: Radheshg04
Repo: maximhq/bifrost PR: 1326
File: plugins/semanticcache/test_utils.go:545-559
Timestamp: 2026-01-14T13:30:28.760Z
Learning: In the maximhq/bifrost repository, prefer using bifrost.Ptr() to create pointers instead of the address operator (&) even when & would be valid syntactically. Apply this consistently across all code paths, including test utilities, to improve consistency and readability. Replace occurrences of &value where a *T is expected with bifrost.Ptr(value) (or an equivalent call) and ensure the function is in scope and used correctly for the target pointer type.

Learnt from: jerkeyray
Repo: maximhq/bifrost PR: 1740
File: transports/bifrost-http/handlers/governance.go:3168-3214
Timestamp: 2026-02-23T07:58:44.087Z
Learning: In this codebase using GORM, models with CreatedAt and UpdatedAt fields of type time.Time tagged with gorm:"autoCreateTime" and gorm:"autoUpdateTime" are populated automatically by GORM on insert/update. Do not manually set them with time.Now(). Remove any manual initialization; rely on GORM's automatic timestamps. This applies to all Go files with GORM models in the repository.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 1670
File: core/providers/anthropic/anthropic.go:689-707
Timestamp: 2026-02-24T04:21:32.824Z
Learning: In Go streaming handlers that reuse pooled response objects (e.g., BifrostChatResponse, BifrostResponsesResponse.Response) via ProcessAndSendResponse, do not release them back to their pools while asynchronous readers (PostLLMHook goroutines) may still access them. Releasing between Acquire and use can cause data races and panics when fields are read by the goroutines. Rely on GC after all references are dropped, and apply this safety pattern to all pooled response types passed through ProcessAndSendResponse in streaming contexts. This should be documented and enforced consistently across all relevant Go files.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 1836
File: core/providers/utils/utils.go:1994-2019
Timestamp: 2026-03-01T13:11:33.245Z
Learning: Enforce the repository-wide convention: all object pools must use raw sync.Pool (not pool.New[T]() or generic pool builders). When reviewing any Go files, verify that pooling code uses sync.Pool directly and consistent with the examples in maximhq/bifrost (e.g., core/bifrost.go, core/providers/anthropic/anthropic.go, core/providers/cohere/cohere.go, core/schemas/plugin.go, framework/tracing/store.go). In particular, do not introduce pool.New[T]() usage; ensure existing pool implementations remain the raw sync.Pool pattern.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 2152
File: framework/logstore/tables.go:422-431
Timestamp: 2026-03-18T09:04:27.884Z
Learning: Do not flag usages of new(expr) in Go code as compile-time errors. Starting with Go 1.26, new() accepts an expression operand (e.g., new(string(data))), and is valid syntax. Reviewers should only flag actual invalid uses per the Go version used in CI, and assume new(expr) forms are allowed across Go files.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 2322
File: core/providers/anthropic/responses.go:1988-1992
Timestamp: 2026-03-27T09:20:29.538Z
Learning: In maximhq/bifrost, `BifrostContext` (`core/schemas/context.go`) is a mutable shared context. Its `(*BifrostContext).SetValue(key, value any)` is a pointer-receiver that mutates the internal `userValues map[any]any` in place under a write mutex; it does not create a derived context like Go’s `context.WithValue`. Therefore, when reviewing code, do not flag `SetValue` usage as failing to “propagate” context—subsequent `ctx.Value()` reads on the same `*BifrostContext` pointer should see the updated value immediately.

Learnt from: danpiths
Repo: maximhq/bifrost PR: 2339
File: plugins/logging/utils.go:531-543
Timestamp: 2026-03-31T05:42:40.984Z
Learning: When reviewing Go code that uses `schemas.ResponsesMessageContent` (as in `plugins/logging/utils.go`), treat `ContentStr` and `ContentBlocks` as mutually exclusive content sources. The type’s `MarshalJSON` enforces that: if `ContentStr != nil`, it is the sole content source and code should not include or suggest a fallback-to-`ContentBlocks` guard when `ContentStr` is non-nil (even if it might be empty). Conversely, only use `ContentBlocks` when `ContentStr` is nil, per the schema contract.

Learnt from: Pratham-Mishra04
Repo: maximhq/bifrost PR: 2355
File: framework/streaming/transcription.go:196-204
Timestamp: 2026-04-02T07:11:08.346Z
Learning: In this codebase’s streaming responses, do not flag `ProcessedStreamResponse` as missing a legacy `Model` field. The `Model` field was intentionally removed during the model-alias refactor; consumers should use `RequestedModel` and/or `ResolvedModel` instead (e.g., for logging, maxim, telemetry, and tracing plugins).

Learnt from: Pratham-Mishra04
Repo: maximhq/bifrost PR: 2355
File: framework/streaming/types.go:272-280
Timestamp: 2026-04-02T08:03:26.610Z
Learning: In maximhq/bifrost streaming code (framework/streaming and related files), treat ProcessedStreamResponse.RawRequest as *interface{} by design. Do not dereference p.RawRequest (or otherwise collapse the two-level indirection) before assigning it to ExtraFields.RawRequest. Preserve the pointer and use the existing guard pattern: assign RawRequest only when p.RawRequest != nil, since nil pointer means “not set” while a non-nil pointer wrapping a nil interface means “set to a nil value.” This is relied upon by json.Marshal to serialize the underlying value correctly across all streaming branches (text, chat, responses, speech, transcription, image).

Learnt from: danpiths
Repo: maximhq/bifrost PR: 2339
File: plugins/logging/main.go:740-744
Timestamp: 2026-04-03T10:46:22.677Z
Learning: In maximhq/bifrost, Bifrost context key/value assignments are done with `(*BifrostContext).SetValue(key, value)`, not with `context.WithValue`. During code review/searches for where `BifrostContextKey*` constants are set, look for `.SetValue(` patterns (e.g., `rg -n 'SetValue.*BifrostContextKey'`) rather than `context.WithValue`/`WithValue.*Key` to avoid false findings that a key is never set.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 2510
File: framework/objectstore/gcs.go:32-43
Timestamp: 2026-04-04T10:05:42.632Z
Learning: In maximhq/bifrost, `schemas.EnvVar` does not implement `IsDefined()`. Reviewers should not flag or suggest calling `schemas.EnvVar.IsDefined()` anywhere in the repository. To check whether an EnvVar config is both present and resolved, use `cfg.Field != nil && cfg.Field.GetValue() != ""` (where `cfg.Field` is the `*schemas.EnvVar` pointer field being evaluated).

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 2509
File: framework/logstore/store.go:110-127
Timestamp: 2026-04-04T10:30:13.550Z
Learning: In maximhq/bifrost, do not reference `EnvVar.IsDefined()` (it does not exist on `schemas.EnvVar`). To validate a non-pointer `schemas.EnvVar` field, check `field.GetValue() == ""` (for “defined” it should be non-empty). For pointer `*schemas.EnvVar` fields, use `field != nil && field.GetValue() != ""` to avoid nil dereferences. This rule should be applied across all Go source files in the repo.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 2509
File: framework/logstore/hybrid.go:104-117
Timestamp: 2026-04-04T14:03:25.451Z
Learning: In maximhq/bifrost, when enqueueing to a channel that may be concurrently closed, follow the established race-safe pattern used for log/upload write queues: (1) check the `atomic.Bool` closed flag before attempting the send, then (2) `defer recover()` around the send to handle a possible `send on closed channel` panic that races the flag check. Do not flag this pattern or suggest replacing it with alternatives like `sync.Once` or `ProviderQueue`. Note: the `ProviderQueue` pattern is specific to provider request queues in `core/bifrost.go` and should not be applied as a replacement for these log/upload enqueue/write queues.

Learnt from: danpiths
Repo: maximhq/bifrost PR: 2341
File: transports/bifrost-http/handlers/webrtc_realtime.go:1045-1066
Timestamp: 2026-04-07T10:37:38.913Z
Learning: In this repository’s Go code, treat governance context keys as limited to the ones that are actually propagated as request-context values: VirtualKey, Team, Customer, User, RoutingRule, IncludeOnlyKeys, and PluginName. Do not suggest adding or flagging missing propagation of BifrostContextKeyGovernanceBusinessUnitID or BifrostContextKeyGovernanceBusinessUnitName in any Go context copy/list. These Business Unit keys are UI-only and appear exclusively in TSX, not in Go request-context propagation (including core/schemas/bifrost.go and bifrost-enterprise/).

Learnt from: jerkeyray
Repo: maximhq/bifrost PR: 2605
File: framework/vectorstore/redis.go:1670-1686
Timestamp: 2026-04-09T19:27:39.791Z
Learning: In maximhq/bifrost, the method `(*schemas.EnvVar).CoerceBool(defaultValue bool)` is nil-receiver safe: it begins with `if e == nil { return defaultValue }`, so calling it on an optional `*schemas.EnvVar` (i.e., `nil` possible) should not be treated as a potential nil-dereference panic. During review, do not flag unconditional calls to `.CoerceBool(...)` on optional/nullable `*schemas.EnvVar` fields as nil-deref issues, since the implementation guards against `e == nil` in `core/schemas/envvar.go`.

Learnt from: TejasGhatte
Repo: maximhq/bifrost PR: 2559
File: core/internal/llmtests/embedding_multimodal.go:47-82
Timestamp: 2026-04-10T07:31:05.055Z
Learning: In maximhq/bifrost, `EmbeddingStruct` (and its `EmbeddingArray` / `Embedding2DArray` fields) has been removed from the live embedding schema. The active embedding representation is `EmbeddingsByType` in `core/schemas/embedding.go` with fields `Float []float64`, `Int8 []int8`, `Uint8 []uint8`, `Binary []byte`, `Ubinary []byte`, and `Base64 *string`. During code reviews, do not flag, require handling, or suggest supporting `EmbeddingArray` or `Embedding2DArray` anywhere in the codebase because those fields no longer exist in the current schema.

Learnt from: BearTS
Repo: maximhq/bifrost PR: 2893
File: transports/config.schema.json:1548-1550
Timestamp: 2026-04-21T12:58:33.892Z
Learning: In the maximhq/bifrost public repository, `access_profiles` / `AccessProfiles` is an enterprise-only feature implemented in the private enterprise codebase. During code review of the public repo, do not flag issues like “missing Go struct fields” (e.g., in `ConfigData`, `GovernanceConfig`, or related types) or related unmarshaling/handling gaps specifically for `access_profiles`, since the corresponding fields and runtime behavior are not present in the public code.

Learnt from: BearTS
Repo: maximhq/bifrost PR: 2935
File: transports/bifrost-http/integrations/pydanticai.go:49-54
Timestamp: 2026-04-22T13:14:01.847Z
Learning: When reviewing Go code in maximhq/bifrost, do not flag `resp.WithDefaults()` as a potential nil-pointer panic if `resp` is a `*schemas.BifrostResponsesResponse`. The method `(*schemas.BifrostResponsesResponse).WithDefaults()` (in `core/schemas/responses.go`) is nil-receiver safe: it immediately returns `nil` when `resp == nil`, so calls do not panic even without a prior nil check.

Learnt from: roroghost17
Repo: maximhq/bifrost PR: 2937
File: core/providers/anthropic/request_builder.go:1-1
Timestamp: 2026-04-23T11:26:47.834Z
Learning: In maximhq/bifrost, underscores in non-test Go filenames are an established naming convention (e.g., `count_tokens.go`, `large_payload.go`, `request_builder.go`). During code review, do not flag underscore-containing Go filenames as a naming violation or suggest renaming them. This exception applies only to non-test `.go` files; the general rule may still apply to test files if a separate convention exists.

Learnt from: Pratham-Mishra04
Repo: maximhq/bifrost PR: 3067
File: transports/bifrost-http/handlers/inference.go:68-82
Timestamp: 2026-04-27T09:47:28.613Z
Learning: In maximhq/bifrost, `(*ModelCatalog).GetProvidersForModel(model)` (implemented in `framework/modelcatalog/models.go`) already returns only providers that are configured/deployed for the current deployment. Its `modelPool` is populated exclusively via `UpsertModelDataForProvider` for providers registered in the current deployment, and the cross-provider special-case additions (OpenRouter, Vertex, Groq, Bedrock) also gate on `mc.modelPool[provider]`. Therefore, when reviewing code at any call site (e.g., `resolveModelAndProvider` in `transports/bifrost-http/handlers/inference.go`), do not recommend adding an extra filter such as `config.GetAvailableProviders()` or similar to “re-check” availability—doing so would be redundant given the catalog’s implicit invariant.

akshaydeo (Contributor, Author)
Already addressed — WaitForFlusher now uses a read-only streamAccumulators.Load (see framework/streaming/gate.go:60-69), matching the sibling IsStreamEnded/IsStreamPaused pattern. An unknown traceID returns immediately without allocating a map entry. This is solved; can you check and resolve the thread if it looks right?

coderabbitai Bot
🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check the current implementation of WaitForFlusher in gate.go
sed -n '37,80p' framework/streaming/gate.go

Length of output: 1379


@akshaydeo, confirmed — the fix is properly implemented. Lines 58–69 of framework/streaming/gate.go now read:

func (a *Accumulator) WaitForFlusher(traceID string) {
    if traceID == "" {
        return
    }
    v, ok := a.streamAccumulators.Load(traceID)
    if !ok {
        return
    }
    v.(*StreamAccumulator).WaitForFlusher()
}

This is exactly right:

  • Read-only sync.Map load — no entry is created for an unknown traceID.
  • Immediate bail on miss — consistent with the IsStreamEnded/IsStreamPaused sibling pattern.
  • The doc comment also explicitly calls out the "Read-only" contract.

Resolving the comment. ✅


🧠 Learnings used
Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 3091
File: core/bifrost.go:4985-4987
Timestamp: 2026-04-28T14:19:13.657Z
Learning: In maximhq/bifrost streaming code, do not call close(ch) directly. Always use providerUtils.CloseStream(ctx, ch), which waits for the tracer’s flusher to drain any gated/paused replay before closing. Chunk delivery should use providerUtils.GateSendChunk(ctx, chunk, ch) so non-gated streams take the fast path and gated streams buffer/replay safely. Applies to short-circuit streams in core/bifrost.go and provider streaming handlers.

Learnt from: qwerty-dvorak
Repo: maximhq/bifrost PR: 1006
File: core/schemas/account.go:9-18
Timestamp: 2025-12-09T17:07:42.007Z
Learning: In core/schemas/account.go, the HuggingFaceKeyConfig field within the Key struct is currently unused and reserved for future Hugging Face inference endpoint deployments. Do not flag this field as missing from OpenAPI documentation or require its presence in the API spec until the feature is actively implemented and used. When the feature is added, update the OpenAPI docs accordingly; otherwise, treat this field as non-breaking and not part of the current API surface.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 1153
File: framework/configstore/rdb.go:2221-2246
Timestamp: 2025-12-29T11:54:55.836Z
Learning: In Go reviews, do not flag range-over-int patterns like for i := range n as compile-time errors, assuming Go 1.22+ semantics. Only flag actual range-capable values (slices, arrays, maps, channels, strings) and other compile-time issues. This applies to all Go files across the repository.

Learnt from: Pratham-Mishra04
Repo: maximhq/bifrost PR: 1312
File: framework/modelcatalog/pricing.go:276-426
Timestamp: 2026-01-14T04:40:11.480Z
Learning: In the Bifrost codebase, ImageUsage and other usage types guarantee that TotalTokens is populated (computed as InputTokens + OutputTokens if providers don’t supply TotalTokens). Reviewers can rely on this invariant and should not assume TotalTokens may be missing when input/output tokens exist. When implementing tiering logic or token-based decisions, you can safely use TotalTokens without extra null/zero guards, provided you’re in a context where InputTokens and OutputTokens are present. If a branch might discard tokens, ensure the invariant is preserved or add explicit checks only where the inputs are confirmed to be valid.

Learnt from: Radheshg04
Repo: maximhq/bifrost PR: 1326
File: plugins/semanticcache/test_utils.go:545-559
Timestamp: 2026-01-14T13:30:28.760Z
Learning: In the maximhq/bifrost repository, prefer using bifrost.Ptr() to create pointers instead of the address operator (&) even when & would be valid syntactically. Apply this consistently across all code paths, including test utilities, to improve consistency and readability. Replace occurrences of &value where a *T is expected with bifrost.Ptr(value) (or an equivalent call) and ensure the function is in scope and used correctly for the target pointer type.

Learnt from: jerkeyray
Repo: maximhq/bifrost PR: 1740
File: transports/bifrost-http/handlers/governance.go:3168-3214
Timestamp: 2026-02-23T07:58:44.087Z
Learning: In this codebase using GORM, models with CreatedAt and UpdatedAt fields of type time.Time tagged with gorm:"autoCreateTime" and gorm:"autoUpdateTime" are populated automatically by GORM on insert/update. Do not manually set them with time.Now(). Remove any manual initialization; rely on GORM's automatic timestamps. This applies to all Go files with GORM models in the repository.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 1670
File: core/providers/anthropic/anthropic.go:689-707
Timestamp: 2026-02-24T04:21:32.824Z
Learning: In Go streaming handlers that reuse pooled response objects (e.g., BifrostChatResponse, BifrostResponsesResponse.Response) via ProcessAndSendResponse, do not release them back to their pools while asynchronous readers (PostLLMHook goroutines) may still access them. Releasing between Acquire and use can cause data races and panics when fields are read by the goroutines. Rely on GC after all references are dropped, and apply this safety pattern to all pooled response types passed through ProcessAndSendResponse in streaming contexts. This should be documented and enforced consistently across all relevant Go files.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 1836
File: core/providers/utils/utils.go:1994-2019
Timestamp: 2026-03-01T13:11:33.245Z
Learning: Enforce the repository-wide convention: all object pools must use raw sync.Pool (not pool.New[T]() or generic pool builders). When reviewing any Go files, verify that pooling code uses sync.Pool directly and is consistent with the examples in maximhq/bifrost (e.g., core/bifrost.go, core/providers/anthropic/anthropic.go, core/providers/cohere/cohere.go, core/schemas/plugin.go, framework/tracing/store.go). In particular, do not introduce pool.New[T]() usage; ensure existing pool implementations remain the raw sync.Pool pattern.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 2152
File: framework/logstore/tables.go:422-431
Timestamp: 2026-03-18T09:04:27.884Z
Learning: Do not flag usages of new(expr) in Go code as compile-time errors. Starting with Go 1.26, new() accepts an expression operand (e.g., new(string(data))), and is valid syntax. Reviewers should only flag actual invalid uses per the Go version used in CI, and assume new(expr) forms are allowed across Go files.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 2322
File: core/providers/anthropic/responses.go:1988-1992
Timestamp: 2026-03-27T09:20:29.538Z
Learning: In maximhq/bifrost, `BifrostContext` (`core/schemas/context.go`) is a mutable shared context. Its `(*BifrostContext).SetValue(key, value any)` is a pointer-receiver that mutates the internal `userValues map[any]any` in place under a write mutex; it does not create a derived context like Go’s `context.WithValue`. Therefore, when reviewing code, do not flag `SetValue` usage as failing to “propagate” context—subsequent `ctx.Value()` reads on the same `*BifrostContext` pointer should see the updated value immediately.

Learnt from: danpiths
Repo: maximhq/bifrost PR: 2339
File: plugins/logging/utils.go:531-543
Timestamp: 2026-03-31T05:42:40.984Z
Learning: When reviewing Go code that uses `schemas.ResponsesMessageContent` (as in `plugins/logging/utils.go`), treat `ContentStr` and `ContentBlocks` as mutually exclusive content sources. The type’s `MarshalJSON` enforces that: if `ContentStr != nil`, it is the sole content source and code should not include or suggest a fallback-to-`ContentBlocks` guard when `ContentStr` is non-nil (even if it might be empty). Conversely, only use `ContentBlocks` when `ContentStr` is nil, per the schema contract.

Learnt from: Pratham-Mishra04
Repo: maximhq/bifrost PR: 2355
File: framework/streaming/transcription.go:196-204
Timestamp: 2026-04-02T07:11:08.346Z
Learning: In this codebase’s streaming responses, do not flag `ProcessedStreamResponse` as missing a legacy `Model` field. The `Model` field was intentionally removed during the model-alias refactor; consumers should use `RequestedModel` and/or `ResolvedModel` instead (e.g., for logging, maxim, telemetry, and tracing plugins).

Learnt from: Pratham-Mishra04
Repo: maximhq/bifrost PR: 2355
File: framework/streaming/types.go:272-280
Timestamp: 2026-04-02T08:03:26.610Z
Learning: In maximhq/bifrost streaming code (framework/streaming and related files), treat ProcessedStreamResponse.RawRequest as *interface{} by design. Do not dereference p.RawRequest (or otherwise collapse the two-level indirection) before assigning it to ExtraFields.RawRequest. Preserve the pointer and use the existing guard pattern: assign RawRequest only when p.RawRequest != nil, since nil pointer means “not set” while a non-nil pointer wrapping a nil interface means “set to a nil value.” This is relied upon by json.Marshal to serialize the underlying value correctly across all streaming branches (text, chat, responses, speech, transcription, image).

Learnt from: danpiths
Repo: maximhq/bifrost PR: 2339
File: plugins/logging/main.go:740-744
Timestamp: 2026-04-03T10:46:22.677Z
Learning: In maximhq/bifrost, Bifrost context key/value assignments are done with `(*BifrostContext).SetValue(key, value)`, not with `context.WithValue`. During code review/searches for where `BifrostContextKey*` constants are set, look for `.SetValue(` patterns (e.g., `rg -n 'SetValue.*BifrostContextKey'`) rather than `context.WithValue`/`WithValue.*Key` to avoid false findings that a key is never set.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 2510
File: framework/objectstore/gcs.go:32-43
Timestamp: 2026-04-04T10:05:42.632Z
Learning: In maximhq/bifrost, `schemas.EnvVar` does not implement `IsDefined()`. Reviewers should not flag or suggest calling `schemas.EnvVar.IsDefined()` anywhere in the repository. To check whether an EnvVar config is both present and resolved, use `cfg.Field != nil && cfg.Field.GetValue() != ""` (where `cfg.Field` is the `*schemas.EnvVar` pointer field being evaluated).

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 2509
File: framework/logstore/store.go:110-127
Timestamp: 2026-04-04T10:30:13.550Z
Learning: In maximhq/bifrost, do not reference `EnvVar.IsDefined()` (it does not exist on `schemas.EnvVar`). To validate a non-pointer `schemas.EnvVar` field, check `field.GetValue() == ""` (for “defined” it should be non-empty). For pointer `*schemas.EnvVar` fields, use `field != nil && field.GetValue() != ""` to avoid nil dereferences. This rule should be applied across all Go source files in the repo.

Learnt from: akshaydeo
Repo: maximhq/bifrost PR: 2509
File: framework/logstore/hybrid.go:104-117
Timestamp: 2026-04-04T14:03:25.451Z
Learning: In maximhq/bifrost, when enqueueing to a channel that may be concurrently closed, follow the established race-safe pattern used for log/upload write queues: (1) check the `atomic.Bool` closed flag before attempting the send, then (2) `defer recover()` around the send to handle a possible `send on closed channel` panic that races the flag check. Do not flag this pattern or suggest replacing it with alternatives like `sync.Once` or `ProviderQueue`. Note: the `ProviderQueue` pattern is specific to provider request queues in `core/bifrost.go` and should not be applied as a replacement for these log/upload enqueue/write queues.

Learnt from: danpiths
Repo: maximhq/bifrost PR: 2341
File: transports/bifrost-http/handlers/webrtc_realtime.go:1045-1066
Timestamp: 2026-04-07T10:37:38.913Z
Learning: In this repository’s Go code, treat governance context keys as limited to the ones that are actually propagated as request-context values: VirtualKey, Team, Customer, User, RoutingRule, IncludeOnlyKeys, and PluginName. Do not suggest adding or flagging missing propagation of BifrostContextKeyGovernanceBusinessUnitID or BifrostContextKeyGovernanceBusinessUnitName in any Go context copy/list. These Business Unit keys are UI-only and appear exclusively in TSX, not in Go request-context propagation (including core/schemas/bifrost.go and bifrost-enterprise/).

Learnt from: jerkeyray
Repo: maximhq/bifrost PR: 2605
File: framework/vectorstore/redis.go:1670-1686
Timestamp: 2026-04-09T19:27:39.791Z
Learning: In maximhq/bifrost, the method `(*schemas.EnvVar).CoerceBool(defaultValue bool)` is nil-receiver safe: it begins with `if e == nil { return defaultValue }`, so calling it on an optional `*schemas.EnvVar` (i.e., `nil` possible) should not be treated as a potential nil-dereference panic. During review, do not flag unconditional calls to `.CoerceBool(...)` on optional/nullable `*schemas.EnvVar` fields as nil-deref issues, since the implementation guards against `e == nil` in `core/schemas/envvar.go`.

Learnt from: TejasGhatte
Repo: maximhq/bifrost PR: 2559
File: core/internal/llmtests/embedding_multimodal.go:47-82
Timestamp: 2026-04-10T07:31:05.055Z
Learning: In maximhq/bifrost, `EmbeddingStruct` (and its `EmbeddingArray` / `Embedding2DArray` fields) has been removed from the live embedding schema. The active embedding representation is `EmbeddingsByType` in `core/schemas/embedding.go` with fields `Float []float64`, `Int8 []int8`, `Uint8 []uint8`, `Binary []byte`, `Ubinary []byte`, and `Base64 *string`. During code reviews, do not flag, require handling, or suggest supporting `EmbeddingArray` or `Embedding2DArray` anywhere in the codebase because those fields no longer exist in the current schema.

Learnt from: BearTS
Repo: maximhq/bifrost PR: 2893
File: transports/config.schema.json:1548-1550
Timestamp: 2026-04-21T12:58:33.892Z
Learning: In the maximhq/bifrost public repository, `access_profiles` / `AccessProfiles` is an enterprise-only feature implemented in the private enterprise codebase. During code review of the public repo, do not flag issues like “missing Go struct fields” (e.g., in `ConfigData`, `GovernanceConfig`, or related types) or related unmarshaling/handling gaps specifically for `access_profiles`, since the corresponding fields and runtime behavior are not present in the public code.

Learnt from: BearTS
Repo: maximhq/bifrost PR: 2935
File: transports/bifrost-http/integrations/pydanticai.go:49-54
Timestamp: 2026-04-22T13:14:01.847Z
Learning: When reviewing Go code in maximhq/bifrost, do not flag `resp.WithDefaults()` as a potential nil-pointer panic if `resp` is a `*schemas.BifrostResponsesResponse`. The method `(*schemas.BifrostResponsesResponse).WithDefaults()` (in `core/schemas/responses.go`) is nil-receiver safe: it immediately returns `nil` when `resp == nil`, so calls do not panic even without a prior nil check.

Learnt from: roroghost17
Repo: maximhq/bifrost PR: 2937
File: core/providers/anthropic/request_builder.go:1-1
Timestamp: 2026-04-23T11:26:47.834Z
Learning: In maximhq/bifrost, underscores in non-test Go filenames are an established naming convention (e.g., `count_tokens.go`, `large_payload.go`, `request_builder.go`). During code review, do not flag underscore-containing Go filenames as a naming violation or suggest renaming them. This exception applies only to non-test `.go` files; the general rule may still apply to test files if a separate convention exists.

Learnt from: Pratham-Mishra04
Repo: maximhq/bifrost PR: 3067
File: transports/bifrost-http/handlers/inference.go:68-82
Timestamp: 2026-04-27T09:47:28.613Z
Learning: In maximhq/bifrost, `(*ModelCatalog).GetProvidersForModel(model)` (implemented in `framework/modelcatalog/models.go`) already returns only providers that are configured/deployed for the current deployment. Its `modelPool` is populated exclusively via `UpsertModelDataForProvider` for providers registered in the current deployment, and the cross-provider special-case additions (OpenRouter, Vertex, Groq, Bedrock) also gate on `mc.modelPool[provider]`. Therefore, when reviewing code at any call site (e.g., `resolveModelAndProvider` in `transports/bifrost-http/handlers/inference.go`), do not recommend adding an extra filter such as `config.GetAvailableProviders()` or similar to “re-check” availability—doing so would be redundant given the catalog’s implicit invariant.

Comment thread framework/streaming/types.go
@akshaydeo force-pushed the stream-pausing-flow branch from c58a20c to d677de6 on April 28, 2026 at 14:18
Comment thread framework/streaming/gate.go
@coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
ui/app/workspace/logs/sheets/logDetailView.tsx (1)

1770-1806: Consider reusing the existing tool-call renderer to avoid drift.

This block duplicates the tool-call rendering pattern already used in ui/app/workspace/logs/views/logChatMessageView.tsx (same CollapsibleBox + CodeEditor behavior). Consider extracting a shared renderer/helper and using it in both places to keep copy/options/layout changes consistent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/app/workspace/logs/sheets/logDetailView.tsx` around lines 1770 - 1806,
Extract the duplicated CollapsibleBox+CodeEditor rendering into a shared
component or renderer (e.g., ToolCallView or renderToolCall) that accepts props
like toolCall, index, collapsedHeight, and any editor options; move it to a
common location and replace the inline map rendering in logDetailView.tsx (the
map over log.output_message.tool_calls using CollapsibleBox and CodeEditor) and
the corresponding code in ui/app/workspace/logs/views/logChatMessageView.tsx to
call the new shared component so both places reuse the same JSON serialization,
title logic (toolCall.function?.name || `#${index+1}`), onCopy behavior, editor
props (lang: "json", readonly, options) and layout.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@framework/streaming/accumulator.go`:
- Around line 389-399: When handling the orphan/TTL force-end path in the block
that sets acc.gateState = StreamStateEnded and broadcasts acc.gateCond, also
clear any pending terminal delivery by setting acc.gatePendingTerminal = nil (or
its zero value) and clear acc.gateEndError = nil/zero so gateFlusher() cannot
attempt to emit a synthetic final error after the broadcast; update the same
conditional branch that currently sets acc.gateReplayBuf/gateReplayBufBytes so
that acc.gatePendingTerminal and acc.gateEndError are explicitly reset when
forceEndGate && acc.gateState != StreamStateEnded.

In `@framework/streaming/gate.go`:
- Around line 341-351: The current GateSend path checks only
len(gateReplayBuf)>0 and can fall through during an active flusher, allowing
live chunks to bypass the replay tail and skip gateReplayBufBytes accounting;
change the condition to treat gateFlusherOn as the ordering barrier (i.e., if
sa.gateFlusherOn || len(sa.gateReplayBuf) > 0) so live chunks are queued until
the flusher finishes, and when queuing update sa.gateReplayBufBytes and enforce
the existing capacity/cap logic (same as other enqueue sites) before
broadcasting and unlocking; reference functions/fields: GateSend/sendOrCancel,
drainBufferLocked, sa.gateFlusherOn, sa.gateReplayBuf, and
sa.gateReplayBufBytes.

---

Nitpick comments:
In `@ui/app/workspace/logs/sheets/logDetailView.tsx`:
- Around line 1770-1806: Extract the duplicated CollapsibleBox+CodeEditor
rendering into a shared component or renderer (e.g., ToolCallView or
renderToolCall) that accepts props like toolCall, index, collapsedHeight, and
any editor options; move it to a common location and replace the inline map
rendering in logDetailView.tsx (the map over log.output_message.tool_calls using
CollapsibleBox and CodeEditor) and the corresponding code in
ui/app/workspace/logs/views/logChatMessageView.tsx to call the new shared
component so both places reuse the same JSON serialization, title logic
(toolCall.function?.name || `#${index+1}`), onCopy behavior, editor props (lang:
"json", readonly, options) and layout.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 96336992-e05d-4013-9a18-b738a7213321

📥 Commits

Reviewing files that changed from the base of the PR and between c58a20c and d677de6.

📒 Files selected for processing (25)
  • AGENTS.md
  • core/bifrost.go
  • core/providers/anthropic/anthropic.go
  • core/providers/azure/azure.go
  • core/providers/bedrock/bedrock.go
  • core/providers/cohere/cohere.go
  • core/providers/elevenlabs/elevenlabs.go
  • core/providers/gemini/gemini.go
  • core/providers/huggingface/huggingface.go
  • core/providers/mistral/mistral.go
  • core/providers/openai/openai.go
  • core/providers/replicate/replicate.go
  • core/providers/utils/utils.go
  • core/providers/vllm/vllm.go
  • core/schemas/bifrost.go
  • core/schemas/context.go
  • core/schemas/tracer.go
  • framework/streaming/accumulator.go
  • framework/streaming/gate.go
  • framework/streaming/gate_test.go
  • framework/streaming/images.go
  • framework/streaming/types.go
  • framework/tracing/tracer.go
  • ui/app/workspace/logs/sheets/logDetailView.tsx
  • ui/components/ui/form.tsx
✅ Files skipped from review due to trivial changes (7)
  • core/providers/azure/azure.go
  • core/providers/elevenlabs/elevenlabs.go
  • core/providers/bedrock/bedrock.go
  • AGENTS.md
  • core/schemas/bifrost.go
  • core/schemas/context.go
  • ui/components/ui/form.tsx
🚧 Files skipped from review as they are similar to previous changes (7)
  • framework/streaming/images.go
  • core/providers/vllm/vllm.go
  • core/providers/mistral/mistral.go
  • core/bifrost.go
  • core/providers/huggingface/huggingface.go
  • core/providers/anthropic/anthropic.go
  • core/providers/gemini/gemini.go

Comment on lines +389 to +399
// Orphan path: force the gate to terminate so any blocked flusher wakes
// up and exits. Drops buffered chunks — acceptable because the consumer
// of an orphaned stream is gone by definition.
if forceEndGate && acc.gateState != StreamStateEnded {
    acc.gateState = StreamStateEnded
    if acc.gateCond != nil {
        acc.gateCond.Broadcast()
    }
    a.streamAccumulators.Delete(requestID)
    acc.gateReplayBuf = nil
    acc.gateReplayBufBytes = 0
}
Contributor

⚠️ Potential issue | 🟠 Major

Clear pending terminal delivery during force-end cleanup.

forceEndGate=true is the orphan/TTL path that intentionally drops buffered output, but gateFlusher() still checks gateEndError after this broadcast and can try to emit a synthetic final error chunk before exiting. If a paused stream had already staged EndStream(err), that send can hang the flusher on a channel with no live consumer. Clear gatePendingTerminal and gateEndError here so force-reap is truly drop-only.

🔧 Proposed fix
 	if forceEndGate && acc.gateState != StreamStateEnded {
 		acc.gateState = StreamStateEnded
+		acc.gatePendingTerminal = false
+		acc.gateEndError = nil
 		if acc.gateCond != nil {
 			acc.gateCond.Broadcast()
 		}
 		acc.gateReplayBuf = nil
 		acc.gateReplayBufBytes = 0
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@framework/streaming/accumulator.go` around lines 389 - 399, When handling the
orphan/TTL force-end path in the block that sets acc.gateState =
StreamStateEnded and broadcasts acc.gateCond, also clear any pending terminal
delivery by setting acc.gatePendingTerminal = nil (or its zero value) and clear
acc.gateEndError = nil/zero so gateFlusher() cannot attempt to emit a synthetic
final error after the broadcast; update the same conditional branch that
currently sets acc.gateReplayBuf/gateReplayBufBytes so that
acc.gatePendingTerminal and acc.gateEndError are explicitly reset when
forceEndGate && acc.gateState != StreamStateEnded.

Comment thread framework/streaming/gate.go
@akshaydeo force-pushed the stream-pausing-flow branch from d677de6 to db9e5e8 on April 28, 2026 at 19:45
Comment on lines +375 to +401
// provider close paths to coordinate with paused streams. See
// schemas.Tracer.WaitForFlusher for full semantics.
func (t *Tracer) WaitForFlusher(traceID string) {
    if traceID == "" || t.accumulator == nil {
        return
    }
    t.accumulator.WaitForFlusher(traceID)
}

// IsStreamEnded reports whether the gate for traceID is in the Ended state.
// See schemas.Tracer.IsStreamEnded for full semantics.
func (t *Tracer) IsStreamEnded(traceID string) bool {
    if traceID == "" || t.accumulator == nil {
        return false
    }
    return t.accumulator.IsStreamEnded(traceID)
}

// IsStreamPaused reports whether the gate for traceID is currently Paused.
// See schemas.Tracer.IsStreamPaused for full semantics.
func (t *Tracer) IsStreamPaused(traceID string) bool {
    if traceID == "" || t.accumulator == nil {
        return false
    }
    return t.accumulator.IsStreamPaused(traceID)
}

Contributor

P1 Fallback send path missing panic recovery

The defensive fallback in GateSend (reached when t.accumulator == nil || traceID == "") sends directly to ch without a recover() guard. Every other equivalent direct-send path in this PR — NoOpTracer.GateSend, sendOrCancel, and GateSendChunk's fall-through — wraps the channel send in a defer recover() to absorb "send on closed channel" panics. If a provider goroutine hits this fallback after the consumer has already closed the channel (e.g., a race with CloseStream), the unrecovered "send on closed channel" panic will take down the process.

func (t *Tracer) GateSend(traceID string, chunk *schemas.BifrostStreamChunk, isFinal, isHardErr bool, ch chan *schemas.BifrostStreamChunk, ctx *schemas.BifrostContext) (ok bool) {
    if t.accumulator == nil || traceID == "" {
        defer func() {
            if recover() != nil {
                ok = false
            }
        }()
        if ctx == nil {
            ch <- chunk
            return true
        }
        select {
        case ch <- chunk:
            return true
        case <-ctx.Done():
            return false
        }
    }
    return t.accumulator.GateSend(traceID, chunk, isFinal, isHardErr, ch, ctx)
}

Comment on lines +357 to +378
func (sa *StreamAccumulator) drainBufferLocked() {
    for sa.gateState != StreamStatePaused && len(sa.gateReplayBuf) > 0 {
        chunk := sa.gateReplayBuf[0]
        sa.gateReplayBuf = sa.gateReplayBuf[1:]
        ch := sa.gateFlusherCh
        ctx := sa.gateFlusherCtx
        sa.mu.Unlock()
        ok := sendOrCancel(ctx, ch, chunk)
        sa.mu.Lock()
        if !ok {
            // ctx done; abandon remaining buffer and end the gate.
            sa.gateReplayBuf = nil
            sa.gateReplayBufBytes = 0
            sa.gateState = StreamStateEnded
            return
        }
    }
    if len(sa.gateReplayBuf) == 0 {
        sa.gateReplayBuf = nil // release backing array
        sa.gateReplayBufBytes = 0
    }
}
Contributor

P1 gateReplayBufBytes not decremented per-chunk during drain

drainBufferLocked removes chunks from gateReplayBuf one at a time but never decrements gateReplayBufBytes as it drains. The counter is only reset to 0 when the buffer reaches empty. If a concurrent Pause() call interrupts the drain mid-way (the loop exits because gateState == StreamStatePaused), gateReplayBufBytes retains the pre-drain total even though many bytes have been delivered. On the next pause, the overflow check uses a stale (inflated) byte count, so a stream that buffered and drained 80 MB, then was paused again with only 30 MB new data, incorrectly reports 110 MB buffered and force-ends the stream.

Additionally, the ordering-preservation append path (lines 344–348) adds chunks to gateReplayBuf while Active without incrementing gateReplayBufBytes at all, so a subsequent pause starts with an understated counter.

Fix: decrement gateReplayBufBytes by chunkBytes(chunk) inside the drain loop immediately after removing each chunk.

@coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
core/providers/openai/openai.go (1)

7008-7009: ⚠️ Potential issue | 🟠 Major

PassthroughStream still bypasses pause/resume gate semantics.

This path still uses raw close(ch) and direct channel send, so pause/resume/end gating won’t apply consistently for passthrough streams.

🔧 Suggested fix
 		defer func() {
 			if ctx.Err() == context.Canceled {
 				providerUtils.HandleStreamCancellation(ctx, postHookRunner, ch, provider.logger, postHookSpanFinalizer)
 			} else if ctx.Err() == context.DeadlineExceeded {
 				providerUtils.HandleStreamTimeout(ctx, postHookRunner, ch, provider.logger, postHookSpanFinalizer)
 			}
-			close(ch)
+			providerUtils.CloseStream(ctx, ch)
 		}()
@@
-				select {
-				case ch <- &schemas.BifrostStreamChunk{
-					BifrostPassthroughResponse: &schemas.BifrostPassthroughResponse{
-						StatusCode:  statusCode,
-						Headers:     headers,
-						Body:        chunk,
-						ExtraFields: extraFields,
-					},
-				}:
-				case <-ctx.Done():
-					return
-				}
+				providerUtils.GateSendChunk(ctx, &schemas.BifrostStreamChunk{
+					BifrostPassthroughResponse: &schemas.BifrostPassthroughResponse{
+						StatusCode:  statusCode,
+						Headers:     headers,
+						Body:        chunk,
+						ExtraFields: extraFields,
+					},
+				}, ch)

Based on learnings: in provider streaming handlers, chunk delivery should use providerUtils.GateSendChunk(ctx, chunk, ch) and stream closure should use providerUtils.CloseStream(ctx, ch).

Also applies to: 7020-7032

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/providers/openai/openai.go` around lines 7008 - 7009, The passthrough
streaming path in PassthroughStream (and the nearby block around the other loop
at 7020-7032) currently uses direct channel sends and raw close(ch), which
bypasses pause/resume/end gating; update the code to deliver chunks via
providerUtils.GateSendChunk(ctx, chunk, ch) instead of direct ch <- chunk, and
replace any raw close(ch) calls with providerUtils.CloseStream(ctx, ch) so
pause/resume/end semantics are respected for passthrough streams. Ensure you
change all occurrences in the PassthroughStream handler and the adjacent
send/close block (the second loop noted) to use these providerUtils helpers.
core/providers/anthropic/anthropic.go (1)

2706-2727: ⚠️ Potential issue | 🟠 Major

Passthrough stream still uses non-gated send/close primitives.

AnthropicProvider.PassthroughStream still does raw close(ch) and direct ch <- ... sends. That can bypass gate buffering/replay behavior for paused streams. Please migrate this path to providerUtils.CloseStream(ctx, ch) and providerUtils.GateSendChunk(...).

Based on learnings: Chunk delivery should use providerUtils.GateSendChunk(ctx, chunk, ch) and stream channels should be closed via providerUtils.CloseStream(ctx, ch) so gated/paused replay drains safely.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/providers/anthropic/anthropic.go` around lines 2706 - 2727,
AnthropicProvider.PassthroughStream currently uses raw channel operations
(close(ch) and ch <- ...) which bypass the gating/replay behavior; replace
direct sends of *schemas.BifrostStreamChunk with
providerUtils.GateSendChunk(ctx, chunk, ch) (wrapping the same
BifrostPassthroughResponse payload) and replace direct close(ch) calls with
providerUtils.CloseStream(ctx, ch); ensure the same variables (statusCode,
headers, extraFields, and the byte slice body chunk) are used when constructing
the chunk passed to GateSendChunk so paused streams still buffer/replay
correctly.
core/providers/azure/azure.go (1)

2858-2879: ⚠️ Potential issue | 🟠 Major

Passthrough stream still bypasses gate-aware send/close.

AzureProvider.PassthroughStream still uses raw close(ch) and direct channel sends. That path can bypass pause/resume/end gating behavior for passthrough chunks. Please migrate this path to providerUtils.CloseStream(ctx, ch) and providerUtils.GateSendChunk(...) as well.

Based on learnings: Chunk delivery should use providerUtils.GateSendChunk(ctx, chunk, ch) and stream channels should be closed via providerUtils.CloseStream(ctx, ch) so gated/paused replay drains safely.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/providers/azure/azure.go` around lines 2858 - 2879, In
AzureProvider.PassthroughStream, replace direct channel sends and raw close(ch)
with the gate-aware helpers: use providerUtils.GateSendChunk(ctx,
&schemas.BifrostStreamChunk{BifrostPassthroughResponse: ...}) instead of sending
into ch directly, and replace the raw close(ch) with
providerUtils.CloseStream(ctx, ch) so pause/resume/end gating is respected; also
remove or change any deferred close(ch) to defer providerUtils.CloseStream(ctx,
ch) and ensure all code paths that previously wrote to ch (creating
schemas.BifrostStreamChunk / BifrostPassthroughResponse) now call
providerUtils.GateSendChunk and handle ctx.Done()/errors consistently.
core/providers/gemini/gemini.go (1)

4207-4208: ⚠️ Potential issue | 🟠 Major

PassthroughStream still bypasses the gate on send/close.

PassthroughStream still uses raw close(ch) and direct select { case ch <- ... }. This bypasses gated buffering/replay semantics, so pause/resume/end behavior is inconsistent for this stream path.

Suggested fix direction
-           close(ch)
+           providerUtils.CloseStream(ctx, ch)

Also replace the direct send/select block with providerUtils.GateSendChunk(...) so passthrough chunks follow the same gate-aware delivery path as other streaming handlers.

Based on learnings: “Chunk delivery should use providerUtils.GateSendChunk(ctx, chunk, ch) … and stream channels should be closed via providerUtils.CloseStream(ctx, ch).”

Also applies to: 4239-4250

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/providers/gemini/gemini.go` around lines 4207 - 4208, PassthroughStream
currently closes the channel and sends chunks directly using close(ch) and
direct select/send which bypasses gated buffering/replay; change
PassthroughStream to call providerUtils.GateSendChunk(ctx, chunk, ch) for all
chunk sends and replace raw close(ch) with providerUtils.CloseStream(ctx, ch) so
this stream path respects pause/resume/end gate semantics; update every direct
send/select and close in the PassthroughStream implementation (references:
PassthroughStream, providerUtils.GateSendChunk, providerUtils.CloseStream, ch,
chunk) accordingly.
♻️ Duplicate comments (2)
framework/streaming/accumulator.go (1)

389-399: ⚠️ Potential issue | 🟠 Major

Force-end cleanup should clear staged terminal delivery too.

gateFlusher() still checks gatePendingTerminal and gateEndError after this broadcast. If TTL/orphan cleanup force-ends a stream that had already staged EndStream(err), the flusher can still try to emit the synthetic terminal chunk instead of exiting as a pure drop path.

🔧 Suggested fix
-	if forceEndGate && acc.gateState != StreamStateEnded {
-		acc.gateState = StreamStateEnded
-		if acc.gateCond != nil {
-			acc.gateCond.Broadcast()
-		}
-		acc.gateReplayBuf = nil
-		acc.gateReplayBufBytes = 0
-	}
+	if forceEndGate {
+		acc.gateState = StreamStateEnded
+		acc.gatePendingTerminal = false
+		acc.gateEndError = nil
+		if acc.gateCond != nil {
+			acc.gateCond.Broadcast()
+		}
+		acc.gateReplayBuf = nil
+		acc.gateReplayBufBytes = 0
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@framework/streaming/accumulator.go` around lines 389 - 399, The force-end
branch that marks acc.gateState = StreamStateEnded and clears acc.gateReplayBuf
must also clear any staged terminal delivery so gateFlusher cannot emit a
synthetic terminal: when forceEndGate is true (in the same block that broadcasts
acc.gateCond and clears acc.gateReplayBuf/gateReplayBufBytes), also reset
acc.gatePendingTerminal and acc.gateEndError (or otherwise nil/zero them) so
that gateFlusher sees no pending terminal and will exit as a pure drop path
instead of emitting EndStream(err).
framework/streaming/gate.go (1)

344-346: ⚠️ Potential issue | 🔴 Critical

Keep replay ordering and byte accounting tied to the flusher, not len(gateReplayBuf).

Once drainBufferLocked pops the last buffered chunk, len(sa.gateReplayBuf) becomes 0 before that chunk is actually delivered. A concurrent send here falls back to sendOrCancel, so live chunks can overtake the replay tail. Separately, gateReplayBufBytes is only zeroed when the queue is fully empty, so a resume → pause cycle can overflow against bytes that were already sent.

Use sa.gateFlusherOn as the ordering barrier, queue new live chunks while the flusher is still running, and keep gateReplayBufBytes updated as chunks enter/leave the queue.

💡 Suggested direction
-	if len(sa.gateReplayBuf) > 0 {
-		sa.gateReplayBuf = append(sa.gateReplayBuf, chunk)
-		sa.gateCond.Broadcast()
+	if sa.gateFlusherOn || len(sa.gateReplayBuf) > 0 {
+		size := chunkBytes(chunk)
+		sa.gateReplayBuf = append(sa.gateReplayBuf, chunk)
+		sa.gateReplayBufBytes += size
+		if sa.gateCond != nil {
+			sa.gateCond.Broadcast()
+		}
 		sa.mu.Unlock()
 		return true
 	}
 	for sa.gateState != StreamStatePaused && len(sa.gateReplayBuf) > 0 {
 		chunk := sa.gateReplayBuf[0]
 		sa.gateReplayBuf = sa.gateReplayBuf[1:]
+		sa.gateReplayBufBytes -= chunkBytes(chunk)
+		if sa.gateReplayBufBytes < 0 {
+			sa.gateReplayBufBytes = 0
+		}
 		ch := sa.gateFlusherCh
 		ctx := sa.gateFlusherCtx

Also applies to: 357-376

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@framework/streaming/gate.go` around lines 344 - 346, When reworking replay
ordering, don't base queuing on len(sa.gateReplayBuf); instead use the flusher
flag sa.gateFlusherOn as the ordering barrier so live sends don't overtake
replayed chunks: while sa.gateFlusherOn is true, append new chunks to
sa.gateReplayBuf (and call sa.gateCond.Broadcast as before) and update
sa.gateReplayBufBytes when pushing; similarly, decrement gateReplayBufBytes when
drainBufferLocked pops a chunk so byte accounting stays accurate; ensure the
send path uses sa.gateFlusherOn to decide between queueing vs calling
sendOrCancel so resume→pause cycles can't reuse stale byte counts.
🧹 Nitpick comments (1)
framework/streaming/gate_test.go (1)

118-123: Replace fixed sleeps with deadline-based polling to reduce flakiness.

Negative assertions gated on a fixed time.Sleep(20ms) are timing-sensitive under CI load; prefer bounded, deadline-based polling on the invariants instead.

Also applies to: 181-184, 233-236, 315-318
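
A minimal sketch of such a polling helper, assuming only the standard testing and time packages — waitFor is a hypothetical utility, not something that already exists in this test file:

func waitFor(t *testing.T, timeout time.Duration, cond func() bool, msg string) {
    t.Helper()
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        if cond() {
            return
        }
        time.Sleep(5 * time.Millisecond) // short poll interval instead of one fixed sleep
    }
    t.Fatalf("condition not met within %v: %s", timeout, msg)
}

Call sites would then poll the same invariants the fixed sleeps currently guard, for example waitFor(t, time.Second, func() bool { return len(r.snapshot()) == 2 }, "expected 2 delivered chunks").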

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@framework/streaming/gate_test.go` around lines 118 - 123, Replace fixed
time.Sleep calls (e.g., the sleep before calling r.snapshot() and similar at the
other ranges) with deadline-based polling: loop until a deadline (now + timeout)
and repeatedly call r.snapshot() (or the same invariant check used at lines
181-184, 233-236, 315-318) sleeping briefly (e.g., 5–10ms) between attempts;
break and proceed when the expected condition (e.g., len(cs) == 2 or pause
observed) is met, and fail the test if the deadline passes. Use the existing
r.snapshot() and pauseObservedAt checks to drive the loop so tests no longer
rely on a fixed 20ms sleep.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@core/schemas/context.go`:
- Around line 357-405: PauseStream/ResumeStream/EndStream currently call
bc.SetValue(BifrostContextKeyStreamGated, true) which allows plugins to
overwrite this reserved routing flag; change these methods to write the flag
through the Bifrost-internal bypass (e.g., an existing internal setter or new
helper such as bc.setInternalValue / bc.setBypassReserved) instead of the public
SetValue, and ensure BifrostContextKeyStreamGated is included in the
reservedKeys so BlockRestrictedWrites will block external writes; update
PauseStream, ResumeStream, and EndStream to use that internal setter and do not
expose the reserved flag to public SetValue paths.

In `@core/schemas/tracer.go`:
- Around line 240-280: DefaultTracer currently returns NoOpTracer which makes
PauseStream, ResumeStream, EndStream and GateSend no-ops and silently disables
stream gating; update the implementation so the default tracer provides the
minimal gate/accumulator behavior (or separate the gate into its own component
used by DefaultTracer) so PauseStream, ResumeStream and EndStream actually
manage state and GateSend buffers/resumes accordingly. Specifically, replace or
extend NoOpTracer with a lightweight GateAccumulator that tracks stream state,
implements PauseStream/ResumeStream/EndStream, implements
GetAccumulatedResponse/IsStreamPaused/IsStreamEnded, and GateSend should enqueue
to the accumulator when paused, flush on resume, and return false on
closed/ended streams; alternatively extract gating logic into a StreamGate type
and have DefaultTracer instantiate and delegate to it instead of NoOpTracer so
gating works even when tracing is disabled.

In `@framework/streaming/gate_test.go`:
- Around line 622-624: The test creates a recorder via newRecorder(32) (variable
r) but only closes it at the end, risking goroutine leaks if a t.Fatalf occurs;
change the test to defer closing/shutting down r immediately after creating it
(e.g., defer r.Close() or r.Shutdown()) so recorder.run() is always stopped on
early returns/failures; apply the same defer pattern to the other occurrence
around makeChunks(2) (and the instance at lines 654-655) to ensure no goroutines
remain running after test failure.
- Around line 172-177: The setup calls to GateSend (e.g., the sequence using
traceID, chunks[], r.ch, ctx and PauseStream) ignore its bool return; update
each such call (including the blocks around GateSend at the shown location and
the other occurrences around lines 225-232, 342-347, 368-375) to assert the send
succeeded: capture GateSend’s returned bool and fail the test if it’s false
(using the test helper in this file, e.g., t.Fatalf or require.True) so failed
setup sends stop the test immediately.

---

Outside diff comments:
In `@core/providers/anthropic/anthropic.go`:
- Around line 2706-2727: AnthropicProvider.PassthroughStream currently uses raw
channel operations (close(ch) and ch <- ...) which bypass the gating/replay
behavior; replace direct sends of *schemas.BifrostStreamChunk with
providerUtils.GateSendChunk(ctx, chunk, ch) (wrapping the same
BifrostPassthroughResponse payload) and replace direct close(ch) calls with
providerUtils.CloseStream(ctx, ch); ensure the same variables (statusCode,
headers, extraFields, and the byte slice body chunk) are used when constructing
the chunk passed to GateSendChunk so paused streams still buffer/replay
correctly.

In `@core/providers/azure/azure.go`:
- Around line 2858-2879: In AzureProvider.PassthroughStream, replace direct
channel sends and raw close(ch) with the gate-aware helpers: use
providerUtils.GateSendChunk(ctx,
&schemas.BifrostStreamChunk{BifrostPassthroughResponse: ...}) instead of sending
into ch directly, and replace the raw close(ch) with
providerUtils.CloseStream(ctx, ch) so pause/resume/end gating is respected; also
remove or change any deferred close(ch) to defer providerUtils.CloseStream(ctx,
ch) and ensure all code paths that previously wrote to ch (creating
schemas.BifrostStreamChunk / BifrostPassthroughResponse) now call
providerUtils.GateSendChunk and handle ctx.Done()/errors consistently.

In `@core/providers/gemini/gemini.go`:
- Around line 4207-4208: PassthroughStream currently closes the channel and
sends chunks directly using close(ch) and direct select/send which bypasses
gated buffering/replay; change PassthroughStream to call
providerUtils.GateSendChunk(ctx, chunk, ch) for all chunk sends and replace raw
close(ch) with providerUtils.CloseStream(ctx, ch) so this stream path respects
pause/resume/end gate semantics; update every direct send/select and close in
the PassthroughStream implementation (references: PassthroughStream,
providerUtils.GateSendChunk, providerUtils.CloseStream, ch, chunk) accordingly.

In `@core/providers/openai/openai.go`:
- Around line 7008-7009: The passthrough streaming path in PassthroughStream
(and the nearby block around the other loop at 7020-7032) currently uses direct
channel sends and raw close(ch), which bypasses pause/resume/end gating; update
the code to deliver chunks via providerUtils.GateSendChunk(ctx, chunk, ch)
instead of direct ch <- chunk, and replace any raw close(ch) calls with
providerUtils.CloseStream(ctx, ch) so pause/resume/end semantics are respected
for passthrough streams. Ensure you change all occurrences in the
PassthroughStream handler and the adjacent send/close block (the second loop
noted) to use these providerUtils helpers.

---

Duplicate comments:
In `@framework/streaming/accumulator.go`:
- Around line 389-399: The force-end branch that marks acc.gateState =
StreamStateEnded and clears acc.gateReplayBuf must also clear any staged
terminal delivery so gateFlusher cannot emit a synthetic terminal: when
forceEndGate is true (in the same block that broadcasts acc.gateCond and clears
acc.gateReplayBuf/gateReplayBufBytes), also reset acc.gatePendingTerminal and
acc.gateEndError (or otherwise nil/zero them) so that gateFlusher sees no
pending terminal and will exit as a pure drop path instead of emitting
EndStream(err).

In `@framework/streaming/gate.go`:
- Around line 344-346: When reworking replay ordering, don't base queuing on
len(sa.gateReplayBuf); instead use the flusher flag sa.gateFlusherOn as the
ordering barrier so live sends don't overtake replayed chunks: while
sa.gateFlusherOn is true, append new chunks to sa.gateReplayBuf (and call
sa.gateCond.Broadcast as before) and update sa.gateReplayBufBytes when pushing;
similarly, decrement gateReplayBufBytes when drainBufferLocked pops a chunk so
byte accounting stays accurate; ensure the send path uses sa.gateFlusherOn to
decide between queueing vs calling sendOrCancel so resume→pause cycles can't
reuse stale byte counts.

---

Nitpick comments:
In `@framework/streaming/gate_test.go`:
- Around line 118-123: Replace fixed time.Sleep calls (e.g., the sleep before
calling r.snapshot() and similar at the other ranges) with deadline-based
polling: loop until a deadline (now + timeout) and repeatedly call r.snapshot()
(or the same invariant check used at lines 181-184, 233-236, 315-318) sleeping
briefly (e.g., 5–10ms) between attempts; break and proceed when the expected
condition (e.g., len(cs) == 2 or pause observed) is met, and fail the test if
the deadline passes. Use the existing r.snapshot() and pauseObservedAt checks to
drive the loop so tests no longer rely on a fixed 20ms sleep.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 59daeec9-dd51-4403-8258-a5c6b1347b3f

📥 Commits

Reviewing files that changed from the base of the PR and between d677de6 and db9e5e8.

📒 Files selected for processing (24)
  • AGENTS.md
  • core/bifrost.go
  • core/providers/anthropic/anthropic.go
  • core/providers/azure/azure.go
  • core/providers/bedrock/bedrock.go
  • core/providers/cohere/cohere.go
  • core/providers/elevenlabs/elevenlabs.go
  • core/providers/gemini/gemini.go
  • core/providers/huggingface/huggingface.go
  • core/providers/mistral/mistral.go
  • core/providers/openai/openai.go
  • core/providers/replicate/replicate.go
  • core/providers/utils/utils.go
  • core/providers/vllm/vllm.go
  • core/schemas/bifrost.go
  • core/schemas/context.go
  • core/schemas/tracer.go
  • framework/streaming/accumulator.go
  • framework/streaming/gate.go
  • framework/streaming/gate_test.go
  • framework/streaming/images.go
  • framework/streaming/types.go
  • framework/tracing/tracer.go
  • ui/components/ui/form.tsx
✅ Files skipped from review due to trivial changes (4)
  • core/providers/bedrock/bedrock.go
  • AGENTS.md
  • core/providers/huggingface/huggingface.go
  • ui/components/ui/form.tsx
🚧 Files skipped from review as they are similar to previous changes (4)
  • core/providers/elevenlabs/elevenlabs.go
  • framework/streaming/images.go
  • core/providers/mistral/mistral.go
  • core/providers/replicate/replicate.go

Comment thread core/schemas/context.go
Comment on lines +357 to +405
// PauseStream marks the active streaming response associated with this context
// as paused. While paused, chunks continue to flow through PostLLMHook (so
// plugins can still inspect them), but they are buffered instead of delivered
// to the client. Buffered chunks are flushed in order when ResumeStream is
// called. Idempotent. No-op if no Tracer or trace ID is present in ctx.
//
// Calling this method engages the pause/resume gate for the stream: provider
// send sites switch from a direct channel send to Tracer.GateSend. Streams that
// never call Pause/Resume/End pay no extra cost.
func (bc *BifrostContext) PauseStream() {
    tr, _ := bc.Value(BifrostContextKeyTracer).(Tracer)
    tid, _ := bc.Value(BifrostContextKeyTraceID).(string)
    if tr == nil || tid == "" {
        return
    }
    bc.SetValue(BifrostContextKeyStreamGated, true)
    tr.PauseStream(tid)
}

// ResumeStream resumes a previously paused stream. Buffered chunks are flushed
// to the client in order, then live streaming continues. Idempotent. No-op if
// no Tracer or trace ID is present in ctx.
//
// Engages the pause/resume gate (see PauseStream).
func (bc *BifrostContext) ResumeStream() {
    tr, _ := bc.Value(BifrostContextKeyTracer).(Tracer)
    tid, _ := bc.Value(BifrostContextKeyTraceID).(string)
    if tr == nil || tid == "" {
        return
    }
    bc.SetValue(BifrostContextKeyStreamGated, true)
    tr.ResumeStream(tid)
}

// EndStream terminates the active streaming response. Any buffered chunks are
// flushed first; if err is non-nil it is then delivered as a final error chunk.
// After EndStream returns, all further provider chunks for this stream are
// dropped (PostLLMHook still fires, but no client delivery happens). Idempotent.
// No-op if no Tracer or trace ID is present in ctx.
//
// Engages the pause/resume gate (see PauseStream).
func (bc *BifrostContext) EndStream(err *BifrostError) {
    tr, _ := bc.Value(BifrostContextKeyTracer).(Tracer)
    tid, _ := bc.Value(BifrostContextKeyTraceID).(string)
    if tr == nil || tid == "" {
        return
    }
    bc.SetValue(BifrostContextKeyStreamGated, true)
    tr.EndStream(tid, err)
Contributor

⚠️ Potential issue | 🟡 Minor

Shield BifrostContextKeyStreamGated from direct SetValue writes.

These helpers introduce a new Bifrost-owned routing flag, but reservedKeys still doesn't protect it. A plugin can manually flip BifrostContextKeyStreamGated and desynchronize provider routing from the tracer's actual pause/end state. Please treat this like the other internal keys, and if PauseStream/ResumeStream/EndStream still need to set it from plugin scopes, write it through an internal bypass instead of the public SetValue path.

As per coding guidelines, "reserved keys set by Bifrost internals should not be set manually; BlockRestrictedWrites() silently drops writes to reserved keys."
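
A self-contained mini-sketch of the reserved-key-with-internal-bypass shape this comment asks for. Every name below (gatedContext, newGatedContext, setInternal) is hypothetical — the real BifrostContext in core/schemas/context.go has its own fields and locking:

package contextsketch

import "sync"

type gatedContext struct {
    mu       sync.Mutex
    values   map[any]any
    reserved map[any]struct{} // keys owned by Bifrost internals
}

func newGatedContext(reservedKeys ...any) *gatedContext {
    r := make(map[any]struct{}, len(reservedKeys))
    for _, k := range reservedKeys {
        r[k] = struct{}{}
    }
    return &gatedContext{values: make(map[any]any), reserved: r}
}

// SetValue is the public, plugin-facing path: writes to reserved keys are
// silently dropped, mirroring the BlockRestrictedWrites() behavior quoted above.
func (c *gatedContext) SetValue(key, value any) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if _, blocked := c.reserved[key]; blocked {
        return
    }
    c.values[key] = value
}

// setInternal is the Bifrost-internal bypass that PauseStream/ResumeStream/
// EndStream would use to set the stream-gated flag without letting plugins
// overwrite it through SetValue.
func (c *gatedContext) setInternal(key, value any) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.values[key] = value
}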

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/schemas/context.go` around lines 357 - 405,
PauseStream/ResumeStream/EndStream currently call
bc.SetValue(BifrostContextKeyStreamGated, true) which allows plugins to
overwrite this reserved routing flag; change these methods to write the flag
through the Bifrost-internal bypass (e.g., an existing internal setter or new
helper such as bc.setInternalValue / bc.setBypassReserved) instead of the public
SetValue, and ensure BifrostContextKeyStreamGated is included in the
reservedKeys so BlockRestrictedWrites will block external writes; update
PauseStream, ResumeStream, and EndStream to use that internal setter and do not
expose the reserved flag to public SetValue paths.

Comment thread core/schemas/tracer.go
Comment on lines +240 to +280
// PauseStream does nothing.
func (n *NoOpTracer) PauseStream(_ string) {}

// ResumeStream does nothing.
func (n *NoOpTracer) ResumeStream(_ string) {}

// EndStream does nothing.
func (n *NoOpTracer) EndStream(_ string, _ *BifrostError) {}

// WaitForFlusher does nothing — NoOpTracer has no gate or flusher.
func (n *NoOpTracer) WaitForFlusher(_ string) {}

// IsStreamEnded returns false — NoOpTracer has no gate state.
func (n *NoOpTracer) IsStreamEnded(_ string) bool { return false }

// IsStreamPaused returns false — NoOpTracer has no gate state.
func (n *NoOpTracer) IsStreamPaused(_ string) bool { return false }

// GetAccumulatedResponse returns nil — NoOpTracer has no accumulator.
func (n *NoOpTracer) GetAccumulatedResponse(_ string) *BifrostResponse { return nil }

// GateSend forwards the chunk directly to the channel with ctx.Done() guard.
// NoOpTracer has no gate state, so this is a pure passthrough. Recovers from
// "send on closed channel" so a closed consumer cannot crash the producer.
func (n *NoOpTracer) GateSend(_ string, chunk *BifrostStreamChunk, _ bool, _ bool, ch chan *BifrostStreamChunk, ctx *BifrostContext) (ok bool) {
    defer func() {
        if recover() != nil {
            ok = false
        }
    }()
    if ctx == nil {
        ch <- chunk
        return true
    }
    select {
    case ch <- chunk:
        return true
    case <-ctx.Done():
        return false
    }
}
Contributor

⚠️ Potential issue | 🟠 Major

DefaultTracer silently disables the new stream gate.

DefaultTracer() still returns NoOpTracer, but NoOpTracer.PauseStream, ResumeStream, and EndStream are all no-ops and GateSend remains a pure passthrough. In any deployment where tracing is disabled, a plugin can call the new pause/end APIs and nothing will actually buffer, resume, or terminate on the wire.

Please back the default tracer with the minimal accumulator/gate implementation, or split stream gating out of the tracer abstraction so this feature still works when observability is off.

Also applies to: 291-294
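
A rough, hypothetical sketch of the "split the gate out of the tracer" direction — a standalone per-stream gate that a default, non-tracing tracer could delegate to. All names are illustrative and this is not the PR's implementation; in particular it holds the mutex across channel sends for simplicity, whereas the real accumulator hands delivery to a flusher goroutine:

package gatesketch

import "sync"

type streamState int

const (
    active streamState = iota
    paused
    ended
)

// miniGate gates delivery of chunks of type T to a consumer channel.
type miniGate[T any] struct {
    mu    sync.Mutex
    state streamState
    buf   []T
}

func (g *miniGate[T]) Pause() {
    g.mu.Lock()
    g.state = paused
    g.mu.Unlock()
}

func (g *miniGate[T]) End() {
    g.mu.Lock()
    g.state = ended
    g.buf = nil // drop anything still buffered
    g.mu.Unlock()
}

// Resume flushes buffered chunks in order, then lets live sends flow again.
func (g *miniGate[T]) Resume(ch chan<- T) {
    g.mu.Lock()
    defer g.mu.Unlock()
    if g.state != paused {
        return
    }
    for _, c := range g.buf {
        ch <- c
    }
    g.buf = nil
    g.state = active
}

// Send buffers while paused, drops after End, and forwards otherwise; holding
// the lock across the send keeps buffered and live chunks in order.
func (g *miniGate[T]) Send(chunk T, ch chan<- T) bool {
    g.mu.Lock()
    defer g.mu.Unlock()
    switch g.state {
    case ended:
        return false
    case paused:
        g.buf = append(g.buf, chunk)
        return true
    default:
        ch <- chunk
        return true
    }
}

A default tracer could keep a sync.Map from trace ID to such a gate and route its PauseStream/ResumeStream/EndStream/GateSend methods through it, so gating still works when observability is off.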

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/schemas/tracer.go` around lines 240 - 280, DefaultTracer currently
returns NoOpTracer which makes PauseStream, ResumeStream, EndStream and GateSend
no-ops and silently disables stream gating; update the implementation so the
default tracer provides the minimal gate/accumulator behavior (or separate the
gate into its own component used by DefaultTracer) so PauseStream, ResumeStream
and EndStream actually manage state and GateSend buffers/resumes accordingly.
Specifically, replace or extend NoOpTracer with a lightweight GateAccumulator
that tracks stream state, implements PauseStream/ResumeStream/EndStream,
implements GetAccumulatedResponse/IsStreamPaused/IsStreamEnded, and GateSend
should enqueue to the accumulator when paused, flush on resume, and return false
on closed/ended streams; alternatively extract gating logic into a StreamGate
type and have DefaultTracer instantiate and delegate to it instead of NoOpTracer
so gating works even when tracing is disabled.

Comment on lines +172 to +177
a.GateSend(traceID, chunks[0], false, false, r.ch, ctx)
a.PauseStream(traceID)
for i := 1; i < 5; i++ {
    a.GateSend(traceID, chunks[i], false, false, r.ch, ctx)
}
a.GateSend(traceID, chunks[5], true, false, r.ch, ctx)
Contributor

⚠️ Potential issue | 🟡 Minor

Assert GateSend return values in setup paths.

Several setup sends ignore bool results. If a send fails, the test can continue with a misleading failure signal.

Suggested pattern (apply similarly across these blocks)
-	a.GateSend(traceID, chunks[0], false, false, r.ch, ctx)
+	if !a.GateSend(traceID, chunks[0], false, false, r.ch, ctx) {
+		t.Fatalf("initial send returned false")
+	}

Also applies to: 225-232, 342-347, 368-375

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@framework/streaming/gate_test.go` around lines 172 - 177, The setup calls to
GateSend (e.g., the sequence using traceID, chunks[], r.ch, ctx and PauseStream)
ignore its bool return; update each such call (including the blocks around
GateSend at the shown location and the other occurrences around lines 225-232,
342-347, 368-375) to assert the send succeeded: capture GateSend’s returned bool
and fail the test if it’s false (using the test helper in this file, e.g.,
t.Fatalf or require.True) so failed setup sends stop the test immediately.

Comment on lines +622 to +624
r := newRecorder(32)
chunks := makeChunks(2)

Contributor

⚠️ Potential issue | 🟡 Minor

Defer recorder shutdown to prevent goroutine leaks on early test failure.

This test closes r only at the tail; any earlier t.Fatalf leaves recorder.run() alive.

Suggested fix
 r := newRecorder(32)
+defer r.close()
 chunks := makeChunks(2)
@@
-	r.close()

Also applies to: 654-655

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@framework/streaming/gate_test.go` around lines 622 - 624, The test creates a
recorder via newRecorder(32) (variable r) but only closes it at the end, risking
goroutine leaks if a t.Fatalf occurs; change the test to defer closing/shutting
down r immediately after creating it (e.g., defer r.Close() or r.Shutdown()) so
recorder.run() is always stopped on early returns/failures; apply the same defer
pattern to the other occurrence around makeChunks(2) (and the instance at lines
654-655) to ensure no goroutines remain running after test failure.
