From 5000b040ddd5aa10eac3239b87fbbded0362075e Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Tue, 17 Mar 2026 11:31:08 -0400 Subject: [PATCH 1/2] Fix stale state in user turn stop strategies between turns Reset stop strategies at turn start (not just turn stop) so that late transcriptions arriving between turns do not leave stale _text that causes premature stops on the next turn. Also cancel pending timeout tasks in reset() for both SpeechTimeout and TurnAnalyzer strategies. --- .../speech_timeout_user_turn_stop_strategy.py | 3 + .../turn_analyzer_user_turn_stop_strategy.py | 3 + src/pipecat/turns/user_turn_controller.py | 4 ++ tests/test_user_turn_controller.py | 68 +++++++++++++++++++ tests/test_user_turn_stop_strategy.py | 44 ++++++++++++ 5 files changed, 122 insertions(+) diff --git a/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py index 66d6fa703c..ec94d91760 100644 --- a/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/speech_timeout_user_turn_stop_strategy.py @@ -64,6 +64,9 @@ async def reset(self): self._vad_user_speaking = False self._transcript_finalized = False self._vad_stopped_time = None + if self._timeout_task: + await self.task_manager.cancel_task(self._timeout_task) + self._timeout_task = None async def setup(self, task_manager: BaseTaskManager): """Initialize the strategy with the given task manager. diff --git a/src/pipecat/turns/user_stop/turn_analyzer_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/turn_analyzer_user_turn_stop_strategy.py index f141a75b77..232bde2231 100644 --- a/src/pipecat/turns/user_stop/turn_analyzer_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/turn_analyzer_user_turn_stop_strategy.py @@ -68,6 +68,9 @@ async def reset(self): self._vad_user_speaking = False self._vad_stopped_time = None self._transcript_finalized = False + if self._timeout_task: + await self.task_manager.cancel_task(self._timeout_task) + self._timeout_task = None async def setup(self, task_manager: BaseTaskManager): """Initialize the strategy with the given task manager. diff --git a/src/pipecat/turns/user_turn_controller.py b/src/pipecat/turns/user_turn_controller.py index adaafd298f..a064fc47b3 100644 --- a/src/pipecat/turns/user_turn_controller.py +++ b/src/pipecat/turns/user_turn_controller.py @@ -256,6 +256,10 @@ async def _trigger_user_turn_start( for s in self._user_turn_strategies.start or []: await s.reset() + # Reset all user turn stop strategies to start fresh for the new turn. + for s in self._user_turn_strategies.stop or []: + await s.reset() + await self._call_event_handler("on_user_turn_started", strategy, params) async def _trigger_user_turn_stop( diff --git a/tests/test_user_turn_controller.py b/tests/test_user_turn_controller.py index 72a04a5199..2883d39bd1 100644 --- a/tests/test_user_turn_controller.py +++ b/tests/test_user_turn_controller.py @@ -15,6 +15,7 @@ VADUserStartedSpeakingFrame, VADUserStoppedSpeakingFrame, ) +from pipecat.turns.user_start import VADUserTurnStartStrategy from pipecat.turns.user_start.min_words_user_turn_start_strategy import ( MinWordsUserTurnStartStrategy, ) @@ -199,6 +200,73 @@ async def on_user_turn_stop_timeout(controller): self.assertTrue(should_stop) self.assertTrue(timeout) + async def test_late_transcription_between_turns_no_premature_stop(self): + """Test that a late transcription arriving between turns does not cause a premature stop. + + Reproduces the bug from issue #4053: after turn 1 completes and reset() + clears state, a late TranscriptionFrame sets _text to stale content. On + the next turn, that stale _text gates a premature turn stop via timeout(0) + before the current turn's transcript arrives. + + Uses only VADUserTurnStartStrategy (no TranscriptionUserTurnStartStrategy) + so the late transcription doesn't trigger a spurious turn start. + """ + controller = UserTurnController( + user_turn_strategies=UserTurnStrategies( + start=[VADUserTurnStartStrategy()], + stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)], + ), + user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT, + ) + + await controller.setup(self.task_manager) + + start_count = 0 + stop_count = 0 + + @controller.event_handler("on_user_turn_started") + async def on_user_turn_started(controller, strategy, params): + nonlocal start_count + start_count += 1 + + @controller.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(controller, strategy, params): + nonlocal stop_count + stop_count += 1 + + # === Turn 1: S-T-E === + await controller.process_frame(VADUserStartedSpeakingFrame()) + self.assertEqual(start_count, 1) + + await controller.process_frame( + TranscriptionFrame(text="Hello!", user_id="", timestamp="now") + ) + + await controller.process_frame(VADUserStoppedSpeakingFrame()) + await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) + self.assertEqual(stop_count, 1) + + # === Between turns: late transcription arrives === + # This sets _text on the stop strategy while _user_turn is False. + await controller.process_frame( + TranscriptionFrame(text="Hello!", user_id="", timestamp="now") + ) + + # === Turn 2: S-T-E (transcription arrives during turn) === + # The fix resets stop strategies at turn start, clearing stale _text. + await controller.process_frame(VADUserStartedSpeakingFrame()) + self.assertEqual(start_count, 2) + + await controller.process_frame( + TranscriptionFrame(text="How are you?", user_id="", timestamp="now") + ) + + await controller.process_frame(VADUserStoppedSpeakingFrame()) + + # Wait for user_speech_timeout to elapse — should get turn 2 stop + await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1) + self.assertEqual(stop_count, 2) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_user_turn_stop_strategy.py b/tests/test_user_turn_stop_strategy.py index 80fb98efce..85f9f2752d 100644 --- a/tests/test_user_turn_stop_strategy.py +++ b/tests/test_user_turn_stop_strategy.py @@ -493,6 +493,50 @@ async def on_user_turn_stopped(strategy, params): # Finalized transcript received after timeout, triggers immediately self.assertTrue(should_start) + async def test_reset_clears_stale_text_no_premature_stop(self): + """Test that reset() clears stale text and cancels timeout, preventing premature stop. + + Reproduces the bug from issue #4053: after turn 1 completes and + reset() is called, a late transcription sets _text. If reset() is + called again at turn 2 start, the stale _text should be cleared + so no premature stop occurs on VAD stop. + """ + strategy = await self._create_strategy() + + stop_count = 0 + + @strategy.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(strategy, params): + nonlocal stop_count + stop_count += 1 + + # === Turn 1: S-T-E === + await strategy.process_frame(VADUserStartedSpeakingFrame()) + await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp="")) + await strategy.process_frame(VADUserStoppedSpeakingFrame()) + await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1) + self.assertEqual(stop_count, 1) + + # Reset after turn 1 (as controller would do at turn stop) + await strategy.reset() + + # === Late transcription arrives between turns === + await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="cat", timestamp="")) + + # Reset at turn 2 start (the fix: controller now resets stop strategies at turn start) + await strategy.reset() + + # === Turn 2: S-T-E (transcription arrives during turn) === + await strategy.process_frame(VADUserStartedSpeakingFrame()) + await strategy.process_frame( + TranscriptionFrame(text="How are you?", user_id="cat", timestamp="") + ) + await strategy.process_frame(VADUserStoppedSpeakingFrame()) + + # Wait for timeout — should get turn 2 stop with the real transcription + await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1) + self.assertEqual(stop_count, 2) + class TestExternalUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase): async def test_external_strategy(self): From d70df1d8b0cd1a11ce524617dc2a25a9266c06e3 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Tue, 17 Mar 2026 11:35:38 -0400 Subject: [PATCH 2/2] Add changelog for #4057 --- changelog/4057.fixed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/4057.fixed.md diff --git a/changelog/4057.fixed.md b/changelog/4057.fixed.md new file mode 100644 index 0000000000..b63b8540e7 --- /dev/null +++ b/changelog/4057.fixed.md @@ -0,0 +1 @@ +- Fixed premature user turn stops caused by late transcriptions arriving between turns. A stale transcript from the previous turn could persist into the next turn and trigger a stop before the current turn's real transcript arrived. Stop strategies are now reset at both turn start and turn stop to prevent state from leaking across turn boundaries.