Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/4057.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Fixed premature user turn stops caused by late transcriptions arriving between turns. A stale transcript from the previous turn could persist into the next turn and trigger a stop before the current turn's real transcript arrived. Stop strategies are now reset at both turn start and turn stop to prevent state from leaking across turn boundaries.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ async def reset(self):
self._vad_user_speaking = False
self._transcript_finalized = False
self._vad_stopped_time = None
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None

async def setup(self, task_manager: BaseTaskManager):
"""Initialize the strategy with the given task manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ async def reset(self):
self._vad_user_speaking = False
self._vad_stopped_time = None
self._transcript_finalized = False
if self._timeout_task:
await self.task_manager.cancel_task(self._timeout_task)
self._timeout_task = None

async def setup(self, task_manager: BaseTaskManager):
"""Initialize the strategy with the given task manager.
Expand Down
4 changes: 4 additions & 0 deletions src/pipecat/turns/user_turn_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ async def _trigger_user_turn_start(
for s in self._user_turn_strategies.start or []:
await s.reset()

# Reset all user turn stop strategies to start fresh for the new turn.
for s in self._user_turn_strategies.stop or []:
await s.reset()

await self._call_event_handler("on_user_turn_started", strategy, params)

async def _trigger_user_turn_stop(
Expand Down
68 changes: 68 additions & 0 deletions tests/test_user_turn_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.turns.user_start import VADUserTurnStartStrategy
from pipecat.turns.user_start.min_words_user_turn_start_strategy import (
MinWordsUserTurnStartStrategy,
)
Expand Down Expand Up @@ -199,6 +200,73 @@ async def on_user_turn_stop_timeout(controller):
self.assertTrue(should_stop)
self.assertTrue(timeout)

async def test_late_transcription_between_turns_no_premature_stop(self):
"""Test that a late transcription arriving between turns does not cause a premature stop.

Reproduces the bug from issue #4053: after turn 1 completes and reset()
clears state, a late TranscriptionFrame sets _text to stale content. On
the next turn, that stale _text gates a premature turn stop via timeout(0)
before the current turn's transcript arrives.

Uses only VADUserTurnStartStrategy (no TranscriptionUserTurnStartStrategy)
so the late transcription doesn't trigger a spurious turn start.
"""
controller = UserTurnController(
user_turn_strategies=UserTurnStrategies(
start=[VADUserTurnStartStrategy()],
stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)],
),
user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
)

await controller.setup(self.task_manager)

start_count = 0
stop_count = 0

@controller.event_handler("on_user_turn_started")
async def on_user_turn_started(controller, strategy, params):
nonlocal start_count
start_count += 1

@controller.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(controller, strategy, params):
nonlocal stop_count
stop_count += 1

# === Turn 1: S-T-E ===
await controller.process_frame(VADUserStartedSpeakingFrame())
self.assertEqual(start_count, 1)

await controller.process_frame(
TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
)

await controller.process_frame(VADUserStoppedSpeakingFrame())
await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)
self.assertEqual(stop_count, 1)

# === Between turns: late transcription arrives ===
# This sets _text on the stop strategy while _user_turn is False.
await controller.process_frame(
TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
)

# === Turn 2: S-T-E (transcription arrives during turn) ===
# The fix resets stop strategies at turn start, clearing stale _text.
await controller.process_frame(VADUserStartedSpeakingFrame())
self.assertEqual(start_count, 2)

await controller.process_frame(
TranscriptionFrame(text="How are you?", user_id="", timestamp="now")
)

await controller.process_frame(VADUserStoppedSpeakingFrame())

# Wait for user_speech_timeout to elapse — should get turn 2 stop
await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)
self.assertEqual(stop_count, 2)


if __name__ == "__main__":
unittest.main()
44 changes: 44 additions & 0 deletions tests/test_user_turn_stop_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,50 @@ async def on_user_turn_stopped(strategy, params):
# Finalized transcript received after timeout, triggers immediately
self.assertTrue(should_start)

async def test_reset_clears_stale_text_no_premature_stop(self):
"""Test that reset() clears stale text and cancels timeout, preventing premature stop.

Reproduces the bug from issue #4053: after turn 1 completes and
reset() is called, a late transcription sets _text. If reset() is
called again at turn 2 start, the stale _text should be cleared
so no premature stop occurs on VAD stop.
"""
strategy = await self._create_strategy()

stop_count = 0

@strategy.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(strategy, params):
nonlocal stop_count
stop_count += 1

# === Turn 1: S-T-E ===
await strategy.process_frame(VADUserStartedSpeakingFrame())
await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp=""))
await strategy.process_frame(VADUserStoppedSpeakingFrame())
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
self.assertEqual(stop_count, 1)

# Reset after turn 1 (as controller would do at turn stop)
await strategy.reset()

# === Late transcription arrives between turns ===
await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp=""))

# Reset at turn 2 start (the fix: controller now resets stop strategies at turn start)
await strategy.reset()

# === Turn 2: S-T-E (transcription arrives during turn) ===
await strategy.process_frame(VADUserStartedSpeakingFrame())
await strategy.process_frame(
TranscriptionFrame(text="How are you?", user_id="cat", timestamp="")
)
await strategy.process_frame(VADUserStoppedSpeakingFrame())

# Wait for timeout — should get turn 2 stop with the real transcription
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
self.assertEqual(stop_count, 2)


class TestExternalUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase):
async def test_external_strategy(self):
Expand Down
Loading