Skip to content

Commit e065907

Browse files
authored
Merge pull request #3718 from pipecat-ai/filipi/bot_started_speaking
Fixing an issue in RTVI where we were sometimes receiving bot output messages before the bot started speaking.
2 parents b7a5ca3 + ed7fde3 commit e065907

2 files changed

Lines changed: 21 additions & 3 deletions

File tree

changelog/3718.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1 @@
1+
- Fixed a race condition in `RTVIObserver` where bot output messages could be sent before the bot-started-speaking event.

src/pipecat/processors/frameworks/rtvi.py

Lines changed: 20 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1116,6 +1116,10 @@ def __init__(
11161116
self._last_user_audio_level = 0
11171117
self._last_bot_audio_level = 0
11181118

1119+
# Track bot speaking state for queuing aggregated text frames
1120+
self._bot_is_speaking = False
1121+
self._queued_aggregated_text_frames: List[AggregatedTextFrame] = []
1122+
11191123
if self._params.system_logs_enabled:
11201124
self._system_logger_id = logger.add(self._logger_sink)
11211125

@@ -1384,17 +1388,30 @@ async def _handle_user_mute(self, frame: Frame):
13841388

13851389
async def _handle_bot_speaking(self, frame: Frame):
13861390
"""Handle bot speaking event frames."""
1387-
message = None
13881391
if isinstance(frame, BotStartedSpeakingFrame):
13891392
message = RTVIBotStartedSpeakingMessage()
1393+
await self.send_rtvi_message(message)
1394+
# Flush any queued aggregated text frames
1395+
for queued_frame in self._queued_aggregated_text_frames:
1396+
await self._send_aggregated_llm_text(queued_frame)
1397+
self._queued_aggregated_text_frames.clear()
1398+
self._bot_is_speaking = True
13901399
elif isinstance(frame, BotStoppedSpeakingFrame):
13911400
message = RTVIBotStoppedSpeakingMessage()
1392-
1393-
if message:
13941401
await self.send_rtvi_message(message)
1402+
self._bot_is_speaking = False
13951403

13961404
async def _handle_aggregated_llm_text(self, frame: AggregatedTextFrame):
13971405
"""Handle aggregated LLM text output frames."""
1406+
if self._bot_is_speaking:
1407+
# Bot has already started speaking, send directly
1408+
await self._send_aggregated_llm_text(frame)
1409+
else:
1410+
# Bot hasn't started speaking yet, queue the frame
1411+
self._queued_aggregated_text_frames.append(frame)
1412+
1413+
async def _send_aggregated_llm_text(self, frame: AggregatedTextFrame):
1414+
"""Send aggregated LLM text messages."""
13981415
# Skip certain aggregator types if configured to do so.
13991416
if (
14001417
self._params.skip_aggregator_types

0 commit comments

Comments
 (0)