Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/3729.fixed.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Fixed context ID reuse issue in `ElevenLabsTTSService`, `InworldTTSService`, `RimeTTSService`, `CartesiaTTSService`, `AsyncAITTSService`, and `PlayHTTTSService`. Services now properly reuse the same context ID across multiple `run_tts()` invocations within a single LLM turn, preventing context tracking issues and incorrect lifecycle signaling.
1 change: 1 addition & 0 deletions changelog/3729.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Fixed word timestamp interleaving issue in `ElevenLabsTTSService` when processing multiple sentences within a single LLM turn.
26 changes: 21 additions & 5 deletions src/pipecat/services/asyncai/tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import asyncio
import base64
import json
import uuid
from typing import AsyncGenerator, Optional

import aiohttp
Expand Down Expand Up @@ -270,6 +271,20 @@ def _get_websocket(self):
return self._websocket
raise Exception("Websocket not connected")

def create_context_id(self) -> str:
"""Generate a unique context ID for a TTS request in case we don't have one already in progress.

Returns:
A unique string identifier for the TTS context.
"""
# If a context ID does not exist, create a new one.
# If an ID exists, continue using the current ID.
# When interruptions happen, user speech results in
# an interruption, which resets the context ID.
if not self._context_id:
return str(uuid.uuid4())
return self._context_id

async def flush_audio(self):
"""Flush any pending audio."""
if not self._context_id or not self._websocket:
Expand Down Expand Up @@ -379,13 +394,14 @@ async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, Non
await self._connect()

try:
await self.start_ttfb_metrics()
yield TTSStartedFrame(context_id=context_id)

if not self._context_id:
await self.start_ttfb_metrics()
yield TTSStartedFrame(context_id=context_id)

self._context_id = context_id
if not self.audio_context_available(self._context_id):
await self.create_audio_context(self._context_id)

if not self.audio_context_available(self._context_id):
await self.create_audio_context(self._context_id)

msg = self._build_msg(text=text, force=True, context_id=self._context_id)
await self._get_websocket().send(msg)
Expand Down
15 changes: 15 additions & 0 deletions src/pipecat/services/cartesia/tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import base64
import json
import uuid
import warnings
from enum import Enum
from typing import AsyncGenerator, List, Literal, Optional
Expand Down Expand Up @@ -539,6 +540,20 @@ async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameD
await self._get_websocket().send(cancel_msg)
self._context_id = None

def create_context_id(self) -> str:
"""Generate a unique context ID for a TTS request in case we don't have one already in progress.

Returns:
A unique string identifier for the TTS context.
"""
# If a context ID does not exist, create a new one.
# If an ID exists, continue using the current ID.
# When interruptions happen, user speech results in
# an interruption, which resets the context ID.
if not self._context_id:
return str(uuid.uuid4())
return self._context_id

async def flush_audio(self):
"""Flush any pending audio and finalize the current context."""
if not self._context_id or not self._websocket:
Expand Down
56 changes: 34 additions & 22 deletions src/pipecat/services/elevenlabs/tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import asyncio
import base64
import json
import uuid
from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional, Tuple, Union

import aiohttp
Expand Down Expand Up @@ -680,6 +681,20 @@ async def _send_text(self, text: str):
msg = {"text": text, "context_id": self._context_id}
await self._websocket.send(json.dumps(msg))

def create_context_id(self) -> str:
"""Generate a unique context ID for a TTS request in case we don't have one already in progress.

Returns:
A unique string identifier for the TTS context.
"""
# If a context ID does not exist, create a new one.
# If an ID exists, continue using the current ID.
# When interruptions happens, user speech results in
# an interruption, which resets the context ID.
if not self._context_id:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just move this logic to the base class?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think we should for now. Most of the TTS services don’t save the context_id, and we already have a reference to it in each TTS service.

So maybe, in a follow up refactor, we can check which services are currently saving the context_id and refactor them. But for now, I would keep it like this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@markbackman, I’ve added this to our 1.0 wishlist, along with the other TTS improvements, so we can review how the classes are handling self._context_id. 👍

return str(uuid.uuid4())
return self._context_id

@traced_tts
async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using ElevenLabs' streaming WebSocket API.
Expand All @@ -698,31 +713,28 @@ async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, Non
await self._connect()

try:
await self.start_ttfb_metrics()
yield TTSStartedFrame(context_id=context_id)
self._cumulative_time = 0
self._partial_word = ""
self._partial_word_start_time = 0.0
# If a context ID does not exist, use the provided one.
# If an ID exists, that means the Pipeline doesn't allow
# user interruptions, so continue using the current ID.
# When interruptions are allowed, user speech results in
# an interruption, which resets the context ID.
if not self._context_id:
await self.start_ttfb_metrics()
yield TTSStartedFrame(context_id=context_id)
self._context_id = context_id
if not self.audio_context_available(self._context_id):
await self.create_audio_context(self._context_id)
self._cumulative_time = 0
self._partial_word = ""
self._partial_word_start_time = 0.0

# Initialize context with voice settings and pronunciation dictionaries
msg = {"text": " ", "context_id": self._context_id}
if self._voice_settings:
msg["voice_settings"] = self._voice_settings
if self._pronunciation_dictionary_locators:
msg["pronunciation_dictionary_locators"] = [
locator.model_dump() for locator in self._pronunciation_dictionary_locators
]
await self._websocket.send(json.dumps(msg))
logger.trace(f"Created new context {self._context_id}")
if not self.audio_context_available(self._context_id):
await self.create_audio_context(self._context_id)

# Initialize context with voice settings and pronunciation dictionaries
msg = {"text": " ", "context_id": self._context_id}
if self._voice_settings:
msg["voice_settings"] = self._voice_settings
if self._pronunciation_dictionary_locators:
msg["pronunciation_dictionary_locators"] = [
locator.model_dump()
for locator in self._pronunciation_dictionary_locators
]
await self._websocket.send(json.dumps(msg))
logger.trace(f"Created new context {self._context_id}")

await self._send_text(text)
await self.start_tts_usage_metrics(text)
Expand Down
19 changes: 16 additions & 3 deletions src/pipecat/services/inworld/tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,20 @@ async def _send_close_context(self, context_id: str):
msg = {"close_context": {}, "contextId": context_id}
await self.send_with_retry(json.dumps(msg), self._report_error)

def create_context_id(self) -> str:
"""Generate a unique context ID for a TTS request in case we don't have one already in progress.

Returns:
A unique string identifier for the TTS context.
"""
# If a context ID does not exist, create a new one.
# If an ID exists, continue using the current ID.
# When interruptions happen, user speech results in
# an interruption, which resets the context ID.
if not self._context_id:
return str(uuid.uuid4())
return self._context_id

@traced_tts
async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]:
"""Generate TTS audio for the given text using the Inworld WebSocket TTS service.
Expand All @@ -942,10 +956,9 @@ async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, Non
await self._connect()

try:
await self.start_ttfb_metrics()
yield TTSStartedFrame(context_id=context_id)

if not self._context_id:
await self.start_ttfb_metrics()
yield TTSStartedFrame(context_id=context_id)
self._context_id = context_id
logger.trace(f"{self}: Creating new context {self._context_id}")
await self.create_audio_context(self._context_id)
Expand Down
15 changes: 15 additions & 0 deletions src/pipecat/services/playht/tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io
import json
import struct
import uuid
import warnings
from typing import AsyncGenerator, Optional

Expand Down Expand Up @@ -323,6 +324,20 @@ def _get_websocket(self):
return self._websocket
raise Exception("Websocket not connected")

def create_context_id(self) -> str:
"""Generate a unique context ID for a TTS request in case we don't have one already in progress.

Returns:
A unique string identifier for the TTS context.
"""
# If a context ID does not exist, create a new one.
# If an ID exists, continue using the current ID.
# When interruptions happen, user speech results in
# an interruption, which resets the context ID.
if not self._context_id:
return str(uuid.uuid4())
return self._context_id

async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
"""Handle interruption by stopping metrics and clearing request ID."""
await super()._handle_interruption(frame, direction)
Expand Down
15 changes: 15 additions & 0 deletions src/pipecat/services/rime/tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import base64
import json
import uuid
from typing import Any, AsyncGenerator, Mapping, Optional

import aiohttp
Expand Down Expand Up @@ -369,6 +370,20 @@ def _calculate_word_times(self, words: list, starts: list, ends: list) -> list:

return word_pairs

def create_context_id(self) -> str:
"""Generate a unique context ID for a TTS request in case we don't have one already in progress.

Returns:
A unique string identifier for the TTS context.
"""
# If a context ID does not exist, create a new one.
# If an ID exists, continue using the current ID.
# When interruptions happen, user speech results in
# an interruption, which resets the context ID.
if not self._context_id:
return str(uuid.uuid4())
return self._context_id

async def flush_audio(self):
"""Flush any pending audio synthesis."""
if not self._context_id or not self._websocket:
Expand Down