fix(channel): consume provider streaming in tool loop drafts#2868
fix(channel): consume provider streaming in tool loop drafts#2868theonlyhennygod merged 1 commit intodevfrom
Conversation
|
Note
|
| Cohort / File(s) | Summary |
|---|---|
Agent Loop Streaming src/agent/loop_.rs |
Introduces streaming response consumption with marker-window detection for tool payloads, new consume_provider_streaming_response and call_provider_chat async helpers, should_consume_provider_stream decision logic, and StreamedChatOutcome tracking. Modifies run_tool_call_loop to conditionally use streaming path with fallback, propagates response_streamed_live flag downstream for post-stream behavior adjustment, and adds tests (StreamingScriptedProvider) for streaming scenarios, tool payloads, deduplication, and edge cases. |
Sequence Diagram(s)
sequenceDiagram
participant Agent as Agent Loop
participant Provider as Provider
participant DeltaChan as Delta Channel
participant History as Response History
Agent->>Agent: should_consume_provider_stream?
alt Stream Path
Agent->>Provider: stream_chat_with_history()
loop Consume Deltas
Provider-->>Agent: streaming delta
Agent->>Agent: looks_like_streamed_tool_payload()?
alt Not Tool Payload
Agent->>DeltaChan: forward delta
end
Agent->>Agent: update marker window
end
Agent->>Agent: finalize StreamedChatOutcome
else Fallback Path
Agent->>Provider: chat()
Provider-->>Agent: complete response
Agent->>DeltaChan: chunk & emit post-hoc
end
Agent->>History: integrate streaming state
History-->>Agent: ChatResponse with streamed flag
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~45 minutes
Suggested labels
risk: high
🚥 Pre-merge checks | ✅ 3 | ❌ 2
❌ Failed checks (2 warnings)
| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Description check | The PR description provides a clear summary of changes, validation steps executed, and links to the closed issue; however, it lacks most required template sections (Label Snapshot, Change Metadata, Linked Issue details, Validation Evidence, Security/Privacy assessments, etc.). | Complete the PR description by adding all required template sections including Risk/Size/Scope labels, Change type/Primary scope metadata, full Linked Issue section with Linear keys, comprehensive Validation Evidence, Security/Privacy Impact, and Compatibility/Rollback sections. | |
| Docstring Coverage | Docstring coverage is 46.67% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title clearly and specifically describes the primary change: consuming provider streaming in the tool loop draft handling, which directly addresses the bug reported in the linked issue. |
| Linked Issues check | ✅ Passed | The PR implements all core requirements from issue #2721: consumes provider streaming in run_tool_call_loop when streaming is active, preserves tool-loop semantics by suppressing tool payload markers, provides fallback to non-streaming chat, skips post-hoc chunking when live deltas delivered response, and adds regression tests. |
| Out of Scope Changes check | ✅ Passed | All changes in src/agent/loop_.rs are directly scoped to implementing streaming consumption in the tool call loop, with helper functions and tests supporting this objective; no unrelated modifications detected. |
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
✨ Finishing Touches
- 📝 Generate docstrings (stacked PR)
- 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
- Create PR with unit tests
- Post copyable unit tests in a comment
- Commit unit tests in branch
issue-2721-provider-streaming-dev
Comment @coderabbitai help to get the list of available commands and usage tips.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/agent/loop_.rs`:
- Around line 506-511: The looks_like_streamed_tool_payload function currently
checks only a few markers; extend its lowered.contains checks to include all
accepted tool-call aliases and fenced variants (e.g., "<tool-call", "<invoke",
"tool-call", "```tool-call", "```tool ", "```invoke" and any minimax-style
aliases your parser accepts) so streamed tool payloads are detected across those
formats; update the contains() conditions in looks_like_streamed_tool_payload to
include these additional substring checks.
- Around line 930-953: The Err(stream_err) branch should short-circuit when the
error represents a cancellation instead of treating it as a normal streaming
failure: detect cancellation on stream_err (e.g., match against the
cancellation/aborted error variant or use is_cancelled()-style API for the error
type) and return/propagate immediately before calling tracing::warn!,
runtime_trace::record_event, sending DRAFT_CLEAR_SENTINEL via on_delta, or
invoking call_provider_chat; otherwise continue with the existing fallback logic
that logs provider_name, model, iteration and emits the runtime trace and
sentinel before calling call_provider_chat.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 9111dd61-3451-4b6f-b8ce-34b6c1bd2daa
📒 Files selected for processing (1)
src/agent/loop_.rs
| fn looks_like_streamed_tool_payload(window: &str) -> bool { | ||
| let lowered = window.to_ascii_lowercase(); | ||
| lowered.contains("<tool_call") | ||
| || lowered.contains("<toolcall") | ||
| || lowered.contains("\"tool_calls\"") | ||
| } |
There was a problem hiding this comment.
Expand streamed tool-payload detection to all accepted tool-call aliases.
Current detection only matches three markers, but this file’s parser supports additional wrappers (e.g., <tool-call>, <invoke>, minimax aliases, fenced variants). On those formats, streamed tool payload can still leak into the draft before suppression.
Proposed fix
fn looks_like_streamed_tool_payload(window: &str) -> bool {
let lowered = window.to_ascii_lowercase();
lowered.contains("<tool_call")
|| lowered.contains("<toolcall")
+ || lowered.contains("<tool-call")
+ || lowered.contains("<invoke")
+ || lowered.contains("<minimax:tool_call")
+ || lowered.contains("<minimax:toolcall")
+ || lowered.contains("```tool_call")
+ || lowered.contains("```tool-call")
+ || lowered.contains("```tool ")
+ || lowered.contains("```invoke")
|| lowered.contains("\"tool_calls\"")
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| fn looks_like_streamed_tool_payload(window: &str) -> bool { | |
| let lowered = window.to_ascii_lowercase(); | |
| lowered.contains("<tool_call") | |
| || lowered.contains("<toolcall") | |
| || lowered.contains("\"tool_calls\"") | |
| } | |
| fn looks_like_streamed_tool_payload(window: &str) -> bool { | |
| let lowered = window.to_ascii_lowercase(); | |
| lowered.contains("<tool_call") | |
| || lowered.contains("<toolcall") | |
| || lowered.contains("<tool-call") | |
| || lowered.contains("<invoke") | |
| || lowered.contains("<minimax:tool_call") | |
| || lowered.contains("<minimax:toolcall") | |
| || lowered.contains(" |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/agent/loop_.rs` around lines 506 - 511, The
looks_like_streamed_tool_payload function currently checks only a few markers;
extend its lowered.contains checks to include all accepted tool-call aliases and
fenced variants (e.g., "<tool-call", "<invoke", "tool-call", "```tool-call",
"```tool ", "```invoke" and any minimax-style aliases your parser accepts) so
streamed tool payloads are detected across those formats; update the contains()
conditions in looks_like_streamed_tool_payload to include these additional
substring checks.
| Err(stream_err) => { | ||
| tracing::warn!( | ||
| provider = provider_name, | ||
| model = model, | ||
| iteration = iteration + 1, | ||
| "provider streaming failed, falling back to non-streaming chat: {stream_err}" | ||
| ); | ||
| runtime_trace::record_event( | ||
| "llm_stream_fallback", | ||
| Some(channel_name), | ||
| Some(provider_name), | ||
| Some(model), | ||
| Some(&turn_id), | ||
| Some(false), | ||
| Some("provider stream failed; fallback to non-streaming chat"), | ||
| serde_json::json!({ | ||
| "iteration": iteration + 1, | ||
| "error": scrub_credentials(&stream_err.to_string()), | ||
| }), | ||
| ); | ||
| if let Some(ref tx) = on_delta { | ||
| let _ = tx.send(DRAFT_CLEAR_SENTINEL.to_string()).await; | ||
| } | ||
| call_provider_chat( |
There was a problem hiding this comment.
Short-circuit cancellation instead of treating it as stream fallback failure.
When streaming exits due cancellation, this branch logs llm_stream_fallback, emits clear-sentinel, and enters fallback flow. That misclassifies cancellation and adds unnecessary side effects. Return cancellation directly before fallback handling.
Proposed fix
Err(stream_err) => {
+ if is_tool_loop_cancelled(&stream_err) {
+ return Err(stream_err);
+ }
tracing::warn!(
provider = provider_name,
model = model,
iteration = iteration + 1,
"provider streaming failed, falling back to non-streaming chat: {stream_err}"
);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/agent/loop_.rs` around lines 930 - 953, The Err(stream_err) branch should
short-circuit when the error represents a cancellation instead of treating it as
a normal streaming failure: detect cancellation on stream_err (e.g., match
against the cancellation/aborted error variant or use is_cancelled()-style API
for the error type) and return/propagate immediately before calling
tracing::warn!, runtime_trace::record_event, sending DRAFT_CLEAR_SENTINEL via
on_delta, or invoking call_provider_chat; otherwise continue with the existing
fallback logic that logs provider_name, model, iteration and emits the runtime
trace and sentinel before calling call_provider_chat.
…rafts Re-lands the stranded work from PRs zeroclaw-labs#2868, zeroclaw-labs#2873, zeroclaw-labs#2875 which were merged to a deleted dev branch and never reached master. Changes applied (resolving divergence conflicts against current master): - `src/providers/traits.rs`: add `StreamEvent` enum (TextDelta/ToolCall/Final), `supports_streaming_tool_events()` trait method, and default `stream_chat()` that adapts legacy chunk streams into structured events. - `src/providers/compatible.rs`: add `StreamToolCallDelta` / `StreamToolCallAccumulator` for SSE native tool-call delta accumulation; add `parse_sse_chunk()` / `extract_sse_text_delta()` helpers; add `sse_bytes_to_events()` for structured event streaming; implement `supports_streaming_tool_events()` and `stream_chat()` on `OpenAiCompatibleProvider`. - `src/providers/router.rs`: add `supports_streaming()`, `supports_streaming_tool_events()`, `stream_chat_with_history()`, and `stream_chat()` passthrough to resolved provider. - `src/providers/reliable.rs`: add `supports_streaming_tool_events()` and `stream_chat()` with provider selection logic that respects tool-event capability requirements. - `src/agent/loop_.rs`: add `StreamedChatOutcome`, `call_provider_chat()`, and `consume_provider_streaming_response()` helpers; update `run_tool_call_loop` to consume provider streaming when available (with fallback to non-streaming on error); skip post-hoc whitespace chunking when live deltas were already forwarded. - `src/channels/wati.rs`: fix pre-existing missing `attachments` field in `ChannelMessage` initializer (unrelated build error surfaced during compilation). Closes zeroclaw-labs#4093 Refs zeroclaw-labs#2721, zeroclaw-labs#2869 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
run_tool_call_loopwhen channel draft streaming is active and request tools are not attachedprovider.chat(...)with trace eventllm_stream_fallbackValidation
cargo fmt --allcargo test run_tool_call_loop_consumes_provider_stream_for_final_response -- --nocapturecargo test run_tool_call_loop_streaming_path_preserves_tool_loop_semantics -- --nocapturecargo test process_channel_message_streaming_ -- --nocaptureCloses #2721
Summary by CodeRabbit