Skip to content

Commit 432870c

Browse files
authored
Merge pull request #3729 from pipecat-ai/filipi/elevenlabs_issue
TTS services fixes.
2 parents e065907 + 9569625 commit 432870c

8 files changed

Lines changed: 118 additions & 30 deletions

File tree

changelog/3729.fixed.2.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Fixed context ID reuse issue in `ElevenLabsTTSService`, `InworldTTSService`, `RimeTTSService`, `CartesiaTTSService`, `AsyncAITTSService`, and `PlayHTTTSService`. Services now properly reuse the same context ID across multiple `run_tts()` invocations within a single LLM turn, preventing context tracking issues and incorrect lifecycle signaling.

changelog/3729.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Fixed word timestamp interleaving issue in `ElevenLabsTTSService` when processing multiple sentences within a single LLM turn.

src/pipecat/services/asyncai/tts.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import asyncio
1010
import base64
1111
import json
12+
import uuid
1213
from typing import AsyncGenerator, Optional
1314

1415
import aiohttp
@@ -270,6 +271,20 @@ def _get_websocket(self):
270271
return self._websocket
271272
raise Exception("Websocket not connected")
272273

274+
def create_context_id(self) -> str:
275+
"""Generate a unique context ID for a TTS request in case we don't have one already in progress.
276+
277+
Returns:
278+
A unique string identifier for the TTS context.
279+
"""
280+
# If a context ID does not exist, create a new one.
281+
# If an ID exists, continue using the current ID.
282+
# When interruptions happen, user speech results in
283+
# an interruption, which resets the context ID.
284+
if not self._context_id:
285+
return str(uuid.uuid4())
286+
return self._context_id
287+
273288
async def flush_audio(self):
274289
"""Flush any pending audio."""
275290
if not self._context_id or not self._websocket:
@@ -379,13 +394,14 @@ async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, Non
379394
await self._connect()
380395

381396
try:
382-
await self.start_ttfb_metrics()
383-
yield TTSStartedFrame(context_id=context_id)
384-
385397
if not self._context_id:
398+
await self.start_ttfb_metrics()
399+
yield TTSStartedFrame(context_id=context_id)
400+
386401
self._context_id = context_id
387-
if not self.audio_context_available(self._context_id):
388-
await self.create_audio_context(self._context_id)
402+
403+
if not self.audio_context_available(self._context_id):
404+
await self.create_audio_context(self._context_id)
389405

390406
msg = self._build_msg(text=text, force=True, context_id=self._context_id)
391407
await self._get_websocket().send(msg)

src/pipecat/services/cartesia/tts.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import base64
1010
import json
11+
import uuid
1112
import warnings
1213
from enum import Enum
1314
from typing import AsyncGenerator, List, Literal, Optional
@@ -539,6 +540,20 @@ async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameD
539540
await self._get_websocket().send(cancel_msg)
540541
self._context_id = None
541542

543+
def create_context_id(self) -> str:
544+
"""Generate a unique context ID for a TTS request in case we don't have one already in progress.
545+
546+
Returns:
547+
A unique string identifier for the TTS context.
548+
"""
549+
# If a context ID does not exist, create a new one.
550+
# If an ID exists, continue using the current ID.
551+
# When interruptions happen, user speech results in
552+
# an interruption, which resets the context ID.
553+
if not self._context_id:
554+
return str(uuid.uuid4())
555+
return self._context_id
556+
542557
async def flush_audio(self):
543558
"""Flush any pending audio and finalize the current context."""
544559
if not self._context_id or not self._websocket:

src/pipecat/services/elevenlabs/tts.py

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import asyncio
1414
import base64
1515
import json
16+
import uuid
1617
from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional, Tuple, Union
1718

1819
import aiohttp
@@ -680,6 +681,20 @@ async def _send_text(self, text: str):
680681
msg = {"text": text, "context_id": self._context_id}
681682
await self._websocket.send(json.dumps(msg))
682683

684+
def create_context_id(self) -> str:
685+
"""Generate a unique context ID for a TTS request in case we don't have one already in progress.
686+
687+
Returns:
688+
A unique string identifier for the TTS context.
689+
"""
690+
# If a context ID does not exist, create a new one.
691+
# If an ID exists, continue using the current ID.
692+
# When interruptions happen, user speech results in
693+
# an interruption, which resets the context ID.
694+
if not self._context_id:
695+
return str(uuid.uuid4())
696+
return self._context_id
697+
683698
@traced_tts
684699
async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]:
685700
"""Generate speech from text using ElevenLabs' streaming WebSocket API.
@@ -698,31 +713,28 @@ async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, Non
698713
await self._connect()
699714

700715
try:
701-
await self.start_ttfb_metrics()
702-
yield TTSStartedFrame(context_id=context_id)
703-
self._cumulative_time = 0
704-
self._partial_word = ""
705-
self._partial_word_start_time = 0.0
706-
# If a context ID does not exist, use the provided one.
707-
# If an ID exists, that means the Pipeline doesn't allow
708-
# user interruptions, so continue using the current ID.
709-
# When interruptions are allowed, user speech results in
710-
# an interruption, which resets the context ID.
711716
if not self._context_id:
717+
await self.start_ttfb_metrics()
718+
yield TTSStartedFrame(context_id=context_id)
712719
self._context_id = context_id
713-
if not self.audio_context_available(self._context_id):
714-
await self.create_audio_context(self._context_id)
720+
self._cumulative_time = 0
721+
self._partial_word = ""
722+
self._partial_word_start_time = 0.0
715723

716-
# Initialize context with voice settings and pronunciation dictionaries
717-
msg = {"text": " ", "context_id": self._context_id}
718-
if self._voice_settings:
719-
msg["voice_settings"] = self._voice_settings
720-
if self._pronunciation_dictionary_locators:
721-
msg["pronunciation_dictionary_locators"] = [
722-
locator.model_dump() for locator in self._pronunciation_dictionary_locators
723-
]
724-
await self._websocket.send(json.dumps(msg))
725-
logger.trace(f"Created new context {self._context_id}")
724+
if not self.audio_context_available(self._context_id):
725+
await self.create_audio_context(self._context_id)
726+
727+
# Initialize context with voice settings and pronunciation dictionaries
728+
msg = {"text": " ", "context_id": self._context_id}
729+
if self._voice_settings:
730+
msg["voice_settings"] = self._voice_settings
731+
if self._pronunciation_dictionary_locators:
732+
msg["pronunciation_dictionary_locators"] = [
733+
locator.model_dump()
734+
for locator in self._pronunciation_dictionary_locators
735+
]
736+
await self._websocket.send(json.dumps(msg))
737+
logger.trace(f"Created new context {self._context_id}")
726738

727739
await self._send_text(text)
728740
await self.start_tts_usage_metrics(text)

src/pipecat/services/inworld/tts.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -924,6 +924,20 @@ async def _send_close_context(self, context_id: str):
924924
msg = {"close_context": {}, "contextId": context_id}
925925
await self.send_with_retry(json.dumps(msg), self._report_error)
926926

927+
def create_context_id(self) -> str:
928+
"""Generate a unique context ID for a TTS request in case we don't have one already in progress.
929+
930+
Returns:
931+
A unique string identifier for the TTS context.
932+
"""
933+
# If a context ID does not exist, create a new one.
934+
# If an ID exists, continue using the current ID.
935+
# When interruptions happen, user speech results in
936+
# an interruption, which resets the context ID.
937+
if not self._context_id:
938+
return str(uuid.uuid4())
939+
return self._context_id
940+
927941
@traced_tts
928942
async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]:
929943
"""Generate TTS audio for the given text using the Inworld WebSocket TTS service.
@@ -942,10 +956,9 @@ async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, Non
942956
await self._connect()
943957

944958
try:
945-
await self.start_ttfb_metrics()
946-
yield TTSStartedFrame(context_id=context_id)
947-
948959
if not self._context_id:
960+
await self.start_ttfb_metrics()
961+
yield TTSStartedFrame(context_id=context_id)
949962
self._context_id = context_id
950963
logger.trace(f"{self}: Creating new context {self._context_id}")
951964
await self.create_audio_context(self._context_id)

src/pipecat/services/playht/tts.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io
1414
import json
1515
import struct
16+
import uuid
1617
import warnings
1718
from typing import AsyncGenerator, Optional
1819

@@ -323,6 +324,20 @@ def _get_websocket(self):
323324
return self._websocket
324325
raise Exception("Websocket not connected")
325326

327+
def create_context_id(self) -> str:
328+
"""Generate a unique context ID for a TTS request in case we don't have one already in progress.
329+
330+
Returns:
331+
A unique string identifier for the TTS context.
332+
"""
333+
# If a context ID does not exist, create a new one.
334+
# If an ID exists, continue using the current ID.
335+
# When interruptions happen, user speech results in
336+
# an interruption, which resets the context ID.
337+
if not self._context_id:
338+
return str(uuid.uuid4())
339+
return self._context_id
340+
326341
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
327342
"""Handle interruption by stopping metrics and clearing request ID."""
328343
await super()._handle_interruption(frame, direction)

src/pipecat/services/rime/tts.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import base64
1414
import json
15+
import uuid
1516
from typing import Any, AsyncGenerator, Mapping, Optional
1617

1718
import aiohttp
@@ -369,6 +370,20 @@ def _calculate_word_times(self, words: list, starts: list, ends: list) -> list:
369370

370371
return word_pairs
371372

373+
def create_context_id(self) -> str:
374+
"""Generate a unique context ID for a TTS request in case we don't have one already in progress.
375+
376+
Returns:
377+
A unique string identifier for the TTS context.
378+
"""
379+
# If a context ID does not exist, create a new one.
380+
# If an ID exists, continue using the current ID.
381+
# When interruptions happen, user speech results in
382+
# an interruption, which resets the context ID.
383+
if not self._context_id:
384+
return str(uuid.uuid4())
385+
return self._context_id
386+
372387
async def flush_audio(self):
373388
"""Flush any pending audio synthesis."""
374389
if not self._context_id or not self._websocket:

0 commit comments

Comments
 (0)