Skip to content

Commit 9350afd

Browse files
teknium1Ryan
authored andcommitted
fix(agent): stateful streaming scrubber for reasoning-block leaks (NousResearch#17924) (NousResearch#20184)
* revert(gateway): remove stale-code self-check and auto-restart Removes the _detect_stale_code / _trigger_stale_code_restart mechanism introduced in NousResearch#17648 and iterated in NousResearch#19740. On every incoming message the gateway compared the boot-time git HEAD SHA to the current SHA on disk, and if they differed it would reply with Gateway code was updated in the background -- restarting this gateway so your next message runs on the new code. Please retry in a moment. and then kick off a graceful restart. This is unwanted behaviour: users who run a long-lived gateway and do their own ad-hoc git operations on the checkout end up with their chat interrupted and the current message dropped every time HEAD moves, with no way to opt out. If an operator really needs the old protection against stale sys.modules after "hermes update", the SIGKILL-survivor sweep in hermes update (hermes_cli/main.py, also tagged NousResearch#17648) already handles the supervisor-respawn case on its own. Removed: gateway/run.py: - _STALE_CODE_SENTINELS, _GIT_SHA_CACHE_TTL_SECS - _read_git_head_sha(), _compute_repo_mtime() module helpers - class-level _boot_wall_time / _boot_repo_mtime / _boot_git_sha / _stale_code_restart_triggered defaults - __init__ boot-snapshot block (_boot_*, _cached_current_sha*, _repo_root_for_staleness, _stale_code_notified) - _current_git_sha_cached(), _detect_stale_code(), _trigger_stale_code_restart() methods - stale-code check + user-facing restart notice at the top of _handle_message() tests/gateway/test_stale_code_self_check.py (deleted, 412 lines) No new logic added. Zero remaining references to any removed symbol. Gateway test suite passes the same 4589 tests it passed before; the 3 pre-existing unrelated failures (discord free-channel, feishu bot admission, teams typing) are unchanged by this commit. * fix(agent): stateful streaming scrubber for reasoning-block leaks (NousResearch#17924) Per-delta _strip_think_blocks ran at _fire_stream_delta and destroyed downstream state. When MiniMax-M2.7 / DeepSeek / Qwen3 streamed a tag split across deltas (delta1='<think>', delta2='Let me check'), the regex case-2 match erased delta1 entirely, so CLI/gateway state machines never learned a block was open and leaked delta2 as content. Raw consumers (ACP, api_server, TTS) had no downstream defense at all. Replace the per-delta regex with a stateful StreamingThinkScrubber that survives delta boundaries: - Closed <tag>X</tag> pairs always stripped (matches _strip_think_blocks case 1). - Unterminated open at block boundary enters a block; content discarded until close tag arrives. At end-of-stream, held content is dropped. - Orphan close tags stripped without boundary gating. - Partial tags at delta boundaries held back until resolved. - Block-boundary rule (start-of-stream, after \n, or whitespace-only since last \n) preserves prose that mentions tag names. Reset at turn start alongside the existing context scrubber; flush at turn end so a benign '<' held back at end-of-stream reaches the UI. E2E-verified on live OpenRouter->MiniMax-m2 streams: closed pairs strip cleanly, first word of post-block content is preserved, pure content passes through unchanged. Stefan's screenshot case (NousResearch#17924) — 'Let me check' getting chopped to ' me check' — no longer happens. Final _strip_think_blocks calls on completed strings (final_response, replay, compression) are preserved; only the streaming per-delta call site switched to the scrubber.
1 parent 1284d05 commit 9350afd

5 files changed

Lines changed: 665 additions & 709 deletions

File tree

agent/think_scrubber.py

Lines changed: 386 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,386 @@
1+
"""Stateful scrubber for reasoning/thinking blocks in streamed assistant text.
2+
3+
``run_agent._strip_think_blocks`` is regex-based and correct for a complete
4+
string, but when it runs *per-delta* in ``_fire_stream_delta`` it destroys
5+
the state that downstream consumers (CLI ``_stream_delta``, gateway
6+
``GatewayStreamConsumer._filter_and_accumulate``) rely on.
7+
8+
Concretely, when MiniMax-M2.7 streams
9+
10+
delta1 = "<think>"
11+
delta2 = "Let me check their config"
12+
delta3 = "</think>"
13+
14+
the per-delta regex erases delta1 entirely (case 2: unterminated-open at
15+
boundary matches ``^<think>...``), so the downstream state machine never
16+
sees the open tag, treats delta2 as regular content, and leaks reasoning
17+
to the user. Consumers that don't run their own state machine (ACP,
18+
api_server, TTS) never had any defence at all — they just emitted
19+
whatever survived the upstream regex.
20+
21+
This module centralises the tag-suppression state machine at the
22+
upstream layer so every stream_delta_callback sees text that has
23+
already had reasoning blocks removed. Partial tags at delta
24+
boundaries are held back until the next delta resolves them, and
25+
end-of-stream flushing surfaces any held-back prose that turned out
26+
not to be a real tag.
27+
28+
Usage::
29+
30+
scrubber = StreamingThinkScrubber()
31+
for delta in stream:
32+
visible = scrubber.feed(delta)
33+
if visible:
34+
emit(visible)
35+
tail = scrubber.flush() # at end of stream
36+
if tail:
37+
emit(tail)
38+
39+
The scrubber is re-entrant per agent instance. Call ``reset()`` at
40+
the top of each new turn so a hung block from an interrupted prior
41+
stream cannot taint the next turn's output.
42+
43+
Tag variants handled (case-insensitive):
44+
``<think>``, ``<thinking>``, ``<reasoning>``, ``<thought>``,
45+
``<REASONING_SCRATCHPAD>``.
46+
47+
Block-boundary rule for opens: an opening tag is only treated as a
48+
reasoning-block opener when it appears at the start of the stream,
49+
after a newline (optionally followed by whitespace), or when only
50+
whitespace has been emitted on the current line. This prevents prose
51+
that *mentions* the tag name (e.g. ``"use <think> tags here"``) from
52+
being incorrectly suppressed. Closed pairs (``<think>X</think>``) are
53+
always suppressed regardless of boundary; a closed pair is an
54+
intentional, bounded construct.
55+
"""
56+
57+
from __future__ import annotations
58+
59+
from typing import Tuple
60+
61+
__all__ = ["StreamingThinkScrubber"]
62+
63+
64+
class StreamingThinkScrubber:
65+
"""Stateful scrubber for streaming reasoning/thinking blocks.
66+
67+
State machine:
68+
- ``_in_block``: True while inside an opened block, waiting for
69+
a close tag. All text inside is discarded.
70+
- ``_buf``: held-back partial-tag tail. Emitted / discarded on
71+
the next ``feed()`` call or by ``flush()``.
72+
- ``_last_emitted_ended_newline``: True iff the most recent
73+
emission to the consumer ended with ``\\n``, or nothing has
74+
been emitted yet (start-of-stream counts as a boundary). Used
75+
to decide whether an open tag at buffer position 0 is at a
76+
block boundary.
77+
"""
78+
79+
_OPEN_TAG_NAMES: Tuple[str, ...] = (
80+
"think",
81+
"thinking",
82+
"reasoning",
83+
"thought",
84+
"REASONING_SCRATCHPAD",
85+
)
86+
87+
# Materialise literal tag strings so the hot path does string
88+
# operations, not regex compilation per feed().
89+
_OPEN_TAGS: Tuple[str, ...] = tuple(f"<{name}>" for name in _OPEN_TAG_NAMES)
90+
_CLOSE_TAGS: Tuple[str, ...] = tuple(f"</{name}>" for name in _OPEN_TAG_NAMES)
91+
92+
# Pre-compute the longest tag (for partial-tag hold-back bound).
93+
_MAX_TAG_LEN: int = max(len(tag) for tag in _OPEN_TAGS + _CLOSE_TAGS)
94+
95+
def __init__(self) -> None:
96+
self._in_block: bool = False
97+
self._buf: str = ""
98+
self._last_emitted_ended_newline: bool = True
99+
100+
def reset(self) -> None:
101+
"""Reset all state. Call at the top of every new turn."""
102+
self._in_block = False
103+
self._buf = ""
104+
self._last_emitted_ended_newline = True
105+
106+
def feed(self, text: str) -> str:
107+
"""Feed one delta; return the scrubbed visible portion.
108+
109+
May return an empty string when the entire delta is reasoning
110+
content or is being held back pending resolution of a partial
111+
tag at the boundary.
112+
"""
113+
if not text:
114+
return ""
115+
buf = self._buf + text
116+
self._buf = ""
117+
out: list[str] = []
118+
119+
while buf:
120+
if self._in_block:
121+
# Hunt for the earliest close tag.
122+
close_idx, close_len = self._find_first_tag(
123+
buf, self._CLOSE_TAGS,
124+
)
125+
if close_idx == -1:
126+
# No close yet — hold back a potential partial
127+
# close-tag prefix; discard everything else.
128+
held = self._max_partial_suffix(buf, self._CLOSE_TAGS)
129+
self._buf = buf[-held:] if held else ""
130+
return "".join(out)
131+
# Found close: discard block content + tag, continue.
132+
buf = buf[close_idx + close_len:]
133+
self._in_block = False
134+
else:
135+
# Priority 1 — closed <tag>X</tag> pair anywhere in
136+
# buf. Closed pairs are always an intentional,
137+
# bounded construct (even mid-line prose containing
138+
# an open/close pair is almost certainly a model
139+
# leaking reasoning inline), so no boundary gating.
140+
pair = self._find_earliest_closed_pair(buf)
141+
# Priority 2 — unterminated open tag at a block
142+
# boundary. Boundary-gated so prose that mentions
143+
# '<think>' isn't over-stripped.
144+
open_idx, open_len = self._find_open_at_boundary(
145+
buf, out,
146+
)
147+
148+
# Pick whichever match comes earliest in the buffer.
149+
if pair is not None and (
150+
open_idx == -1 or pair[0] <= open_idx
151+
):
152+
start_idx, end_idx = pair
153+
preceding = buf[:start_idx]
154+
if preceding:
155+
preceding = self._strip_orphan_close_tags(preceding)
156+
if preceding:
157+
out.append(preceding)
158+
self._last_emitted_ended_newline = (
159+
preceding.endswith("\n")
160+
)
161+
buf = buf[end_idx:]
162+
continue
163+
164+
if open_idx != -1:
165+
# Unterminated open at boundary — emit preceding,
166+
# enter block, continue loop with remainder.
167+
preceding = buf[:open_idx]
168+
if preceding:
169+
preceding = self._strip_orphan_close_tags(preceding)
170+
if preceding:
171+
out.append(preceding)
172+
self._last_emitted_ended_newline = (
173+
preceding.endswith("\n")
174+
)
175+
self._in_block = True
176+
buf = buf[open_idx + open_len:]
177+
continue
178+
179+
# No resolvable tag structure in buf. Hold back any
180+
# partial-tag prefix at the tail so a split tag
181+
# across deltas isn't missed, then emit the rest.
182+
held = self._max_partial_suffix(buf, self._OPEN_TAGS)
183+
held_close = self._max_partial_suffix(
184+
buf, self._CLOSE_TAGS,
185+
)
186+
held = max(held, held_close)
187+
if held:
188+
emit_text = buf[:-held]
189+
self._buf = buf[-held:]
190+
else:
191+
emit_text = buf
192+
self._buf = ""
193+
if emit_text:
194+
emit_text = self._strip_orphan_close_tags(emit_text)
195+
if emit_text:
196+
out.append(emit_text)
197+
self._last_emitted_ended_newline = (
198+
emit_text.endswith("\n")
199+
)
200+
return "".join(out)
201+
202+
return "".join(out)
203+
204+
def flush(self) -> str:
205+
"""End-of-stream flush.
206+
207+
If still inside an unterminated block, held-back content is
208+
discarded — leaking partial reasoning is worse than a
209+
truncated answer. Otherwise the held-back partial-tag tail is
210+
emitted verbatim (it turned out not to be a real tag prefix).
211+
"""
212+
if self._in_block:
213+
self._buf = ""
214+
self._in_block = False
215+
return ""
216+
tail = self._buf
217+
self._buf = ""
218+
if not tail:
219+
return ""
220+
tail = self._strip_orphan_close_tags(tail)
221+
if tail:
222+
self._last_emitted_ended_newline = tail.endswith("\n")
223+
return tail
224+
225+
# ── internal helpers ───────────────────────────────────────────────
226+
227+
@staticmethod
228+
def _find_first_tag(
229+
buf: str, tags: Tuple[str, ...],
230+
) -> Tuple[int, int]:
231+
"""Return (earliest_index, tag_length) over *tags*, or (-1, 0).
232+
233+
Case-insensitive match.
234+
"""
235+
buf_lower = buf.lower()
236+
best_idx = -1
237+
best_len = 0
238+
for tag in tags:
239+
idx = buf_lower.find(tag.lower())
240+
if idx != -1 and (best_idx == -1 or idx < best_idx):
241+
best_idx = idx
242+
best_len = len(tag)
243+
return best_idx, best_len
244+
245+
def _find_earliest_closed_pair(self, buf: str):
246+
"""Return (start_idx, end_idx) of the earliest closed pair, else None.
247+
248+
A closed pair is ``<tag>...</tag>`` of any variant. Matches are
249+
case-insensitive and non-greedy (the closest close tag after
250+
an open tag wins), matching the regex ``<tag>.*?</tag>``
251+
semantics of ``_strip_think_blocks`` case 1. When two tag
252+
variants could both match, the one whose open tag appears
253+
earlier wins.
254+
"""
255+
buf_lower = buf.lower()
256+
best: "tuple[int, int] | None" = None
257+
for open_tag, close_tag in zip(self._OPEN_TAGS, self._CLOSE_TAGS):
258+
open_lower = open_tag.lower()
259+
close_lower = close_tag.lower()
260+
open_idx = buf_lower.find(open_lower)
261+
if open_idx == -1:
262+
continue
263+
close_idx = buf_lower.find(
264+
close_lower, open_idx + len(open_lower),
265+
)
266+
if close_idx == -1:
267+
continue
268+
end_idx = close_idx + len(close_lower)
269+
if best is None or open_idx < best[0]:
270+
best = (open_idx, end_idx)
271+
return best
272+
273+
def _find_open_at_boundary(
274+
self, buf: str, already_emitted: list[str],
275+
) -> Tuple[int, int]:
276+
"""Return the earliest block-boundary open-tag (idx, len).
277+
278+
Returns (-1, 0) if no boundary-legal opener is present.
279+
"""
280+
buf_lower = buf.lower()
281+
best_idx = -1
282+
best_len = 0
283+
for tag in self._OPEN_TAGS:
284+
tag_lower = tag.lower()
285+
search_start = 0
286+
while True:
287+
idx = buf_lower.find(tag_lower, search_start)
288+
if idx == -1:
289+
break
290+
if self._is_block_boundary(buf, idx, already_emitted):
291+
if best_idx == -1 or idx < best_idx:
292+
best_idx = idx
293+
best_len = len(tag)
294+
break # first boundary hit for this tag is enough
295+
search_start = idx + 1
296+
return best_idx, best_len
297+
298+
def _is_block_boundary(
299+
self, buf: str, idx: int, already_emitted: list[str],
300+
) -> bool:
301+
"""True iff position *idx* in *buf* is a block boundary.
302+
303+
A block boundary is:
304+
- buf position 0 AND the most recent emission ended with
305+
a newline (or nothing has been emitted yet)
306+
- any position whose preceding text on the current line
307+
(since the last newline in buf) is whitespace-only, AND
308+
if there is no newline in the preceding buf portion, the
309+
most recent prior emission ended with a newline
310+
"""
311+
if idx == 0:
312+
# Check whether the last already-emitted chunk in THIS
313+
# feed() call ended with a newline, otherwise fall back
314+
# to the cross-feed flag.
315+
if already_emitted:
316+
return already_emitted[-1].endswith("\n")
317+
return self._last_emitted_ended_newline
318+
preceding = buf[:idx]
319+
last_nl = preceding.rfind("\n")
320+
if last_nl == -1:
321+
# No newline in buf before the tag — boundary only if the
322+
# prior emission ended with a newline AND everything since
323+
# is whitespace.
324+
if already_emitted:
325+
prior_newline = already_emitted[-1].endswith("\n")
326+
else:
327+
prior_newline = self._last_emitted_ended_newline
328+
return prior_newline and preceding.strip() == ""
329+
# Newline present — text between it and the tag must be
330+
# whitespace-only.
331+
return preceding[last_nl + 1:].strip() == ""
332+
333+
@classmethod
334+
def _max_partial_suffix(
335+
cls, buf: str, tags: Tuple[str, ...],
336+
) -> int:
337+
"""Return the longest buf-suffix that is a prefix of any tag.
338+
339+
Only prefixes strictly shorter than the tag itself count
340+
(full-length suffixes are the tag and are handled as matches,
341+
not held-back partials). Case-insensitive.
342+
"""
343+
if not buf:
344+
return 0
345+
buf_lower = buf.lower()
346+
max_check = min(len(buf_lower), cls._MAX_TAG_LEN - 1)
347+
for i in range(max_check, 0, -1):
348+
suffix = buf_lower[-i:]
349+
for tag in tags:
350+
tag_lower = tag.lower()
351+
if len(tag_lower) > i and tag_lower.startswith(suffix):
352+
return i
353+
return 0
354+
355+
@classmethod
356+
def _strip_orphan_close_tags(cls, text: str) -> str:
357+
"""Remove any close tags from *text* (orphan-close handling).
358+
359+
An orphan close tag has no matching open in the current
360+
scrubber state; it's always noise, stripped with any trailing
361+
whitespace so the surrounding prose flows naturally.
362+
"""
363+
if "</" not in text:
364+
return text
365+
text_lower = text.lower()
366+
out: list[str] = []
367+
i = 0
368+
while i < len(text):
369+
matched = False
370+
if text_lower[i:i + 2] == "</":
371+
for tag in cls._CLOSE_TAGS:
372+
tag_lower = tag.lower()
373+
tag_len = len(tag_lower)
374+
if text_lower[i:i + tag_len] == tag_lower:
375+
# Skip the tag and any trailing whitespace,
376+
# matching _strip_think_blocks case 3.
377+
j = i + tag_len
378+
while j < len(text) and text[j] in " \t\n\r":
379+
j += 1
380+
i = j
381+
matched = True
382+
break
383+
if not matched:
384+
out.append(text[i])
385+
i += 1
386+
return "".join(out)

0 commit comments

Comments
 (0)