Skip to content

Commit ad4c22c

Browse files
authored
Merge pull request pipecat-ai#3316 from pipecat-ai/aleix/llm-user-aggreagtor-enable-interruptions
turns(user): add support for enabling/disabling interruptions
2 parents e22a6c9 + 9fe99ed commit ad4c22c

15 files changed

Lines changed: 227 additions & 65 deletions

changelog/3297.deprecated.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,12 @@
1-
- `PipelineParams.allow_interruptions` is now deprecated, use `LLMUserAggregator`'s new parameter `user_mute_strategies` instead.
1+
- `PipelineParams.allow_interruptions` is now deprecated, use `LLMUserAggregator`'s new parameter `turn_start_strategies` instead. For example, to disable interruptions but still get user turns you can do:
2+
3+
```python
4+
context_aggregator = LLMContextAggregatorPair(
5+
context,
6+
user_params=LLMUserAggregatorParams(
7+
turn_start_strategies=TurnStartStrategies(
8+
user=[TranscriptionUserTurnStartStrategy(enable_interruptions=False)],
9+
),
10+
),
11+
)
12+
```

changelog/3316.added.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Added `enable_interruptions` constructor argument to all user turn strategies. This tells the `LLMUserAggregator` to push or not push an `InterruptionFrame`.

changelog/3316.other.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Added `52-live-transcription.py` foundational example demonstrating live transcription and translation from English to Spanish. In this example, the bot is not interruptible: as the user continues speaking, English transcriptions are queued, and the bot continuously translates and speaks each queued sentence in Spanish without being interrupted by new user speech.
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
#
2+
# Copyright (c) 2024–2025, Daily
3+
#
4+
# SPDX-License-Identifier: BSD 2-Clause License
5+
#
6+
7+
8+
import os
9+
10+
from dotenv import load_dotenv
11+
from loguru import logger
12+
13+
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
14+
from pipecat.audio.vad.silero import SileroVADAnalyzer
15+
from pipecat.audio.vad.vad_analyzer import VADParams
16+
from pipecat.pipeline.pipeline import Pipeline
17+
from pipecat.pipeline.runner import PipelineRunner
18+
from pipecat.pipeline.task import PipelineParams, PipelineTask
19+
from pipecat.processors.aggregators.llm_context import LLMContext
20+
from pipecat.processors.aggregators.llm_response_universal import (
21+
LLMContextAggregatorPair,
22+
LLMUserAggregatorParams,
23+
)
24+
from pipecat.runner.types import RunnerArguments
25+
from pipecat.runner.utils import create_transport
26+
from pipecat.services.cartesia.tts import CartesiaTTSService
27+
from pipecat.services.deepgram.stt import DeepgramSTTService
28+
from pipecat.services.openai.llm import OpenAILLMService
29+
from pipecat.transports.base_transport import BaseTransport, TransportParams
30+
from pipecat.transports.daily.transport import DailyParams
31+
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
32+
from pipecat.turns.bot import TurnAnalyzerBotTurnStartStrategy
33+
from pipecat.turns.turn_start_strategies import TurnStartStrategies
34+
from pipecat.turns.user import TranscriptionUserTurnStartStrategy
35+
36+
load_dotenv(override=True)
37+
38+
39+
# Transport parameter factories. Each entry is a zero-argument callable so
# that heavyweight objects (e.g. SileroVADAnalyzer) are only instantiated
# for the transport that is actually selected at runtime.
def _make_vad_analyzer():
    # All transports share the same VAD configuration: a short 0.2s stop
    # threshold so user speech segments are detected quickly.
    return SileroVADAnalyzer(params=VADParams(stop_secs=0.2))


transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=_make_vad_analyzer(),
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=_make_vad_analyzer(),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=_make_vad_analyzer(),
    ),
}
59+
60+
61+
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Run the live English-to-Spanish transcription/translation bot.

    Builds a pipeline of STT -> user context aggregation -> LLM -> TTS.
    Interruptions are disabled on the user turn strategy, so while the user
    keeps speaking, English transcriptions queue up and the bot continues
    translating and speaking each queued sentence in Spanish without being
    cut off by new user speech.

    Args:
        transport: Audio input/output transport (Daily, Twilio, or WebRTC).
        runner_args: Runner configuration (idle timeout, signal handling).
    """
    logger.info("Starting bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="d4db5fb9-f44b-4bd1-85fa-192e0f0d75f9",  # Spanish-speaking Lady
    )

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [
        {
            "role": "system",
            "content": "You are a live translation assistant. Your sole purpose is to translate English text into Spanish. When you receive English text from the user, immediately translate it into natural, fluent Spanish. Do not add explanations, commentary, or extra information—only provide the Spanish translation of the text you receive.",
        },
    ]

    context = LLMContext(messages)

    # We use the TranscriptionUserTurnStartStrategy to start a new user turn
    # every time a transcription is received. We disable interruptions, so the
    # user can continue speaking while the bot is transcribing, without
    # interrupting the bot.
    context_aggregator = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(
            turn_start_strategies=TurnStartStrategies(
                user=[TranscriptionUserTurnStartStrategy(enable_interruptions=False)],
                bot=[TurnAnalyzerBotTurnStartStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())],
            ),
        ),
    )

    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            stt,  # STT
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS (bot will speak the chosen language)
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        # Tear down the pipeline once the client leaves.
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

    await runner.run(task)
129+
130+
131+
async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud.

    Resolves a transport from the runner arguments and hands control to
    ``run_bot``.
    """
    selected_transport = await create_transport(runner_args, transport_params)
    await run_bot(selected_transport, runner_args)
135+
136+
137+
if __name__ == "__main__":
    # Import locally so the runner dependency is only loaded when this file
    # is executed as a script.
    from pipecat.runner.run import main

    main()

src/pipecat/pipeline/task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class PipelineParams(BaseModel):
107107
allow_interruptions: Whether to allow pipeline interruptions.
108108
109109
.. deprecated:: 0.0.99
110-
Use `LLMUserAggregator`'s new `user_mute_strategies` parameter instead.
110+
Use `LLMUserAggregator`'s new `turn_start_strategies` parameter instead.
111111
112112
audio_in_sample_rate: Input audio sample rate in Hz.
113113
audio_out_sample_rate: Output audio sample rate in Hz.

src/pipecat/processors/aggregators/llm_response_universal.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,8 +561,11 @@ async def _trigger_user_turn_start(
561561
await s.reset()
562562

563563
if params.enable_user_speaking_frames:
564-
# TODO(aleix): These frames should really come from the top of the pipeline.
564+
# TODO(aleix): This frame should really come from the top of the pipeline.
565565
await self.broadcast_frame(UserStartedSpeakingFrame)
566+
567+
if params.enable_interruptions:
568+
# TODO(aleix): This frame should really come from the top of the pipeline.
566569
await self.broadcast_frame(InterruptionFrame)
567570

568571
await self._call_event_handler("on_user_turn_started", strategy)

src/pipecat/turns/bot/external_bot_turn_start_strategy.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,15 @@ class ExternalBotTurnStartStrategy(BaseBotTurnStartStrategy):
2929
3030
"""
3131

32-
def __init__(self, *, timeout: float = 0.5):
32+
def __init__(self, *, timeout: float = 0.5, **kwargs):
3333
"""Initialize the external bot turn start strategy.
3434
3535
Args:
3636
timeout: A short delay used internally to handle consecutive or
3737
slightly delayed transcriptions.
38+
**kwargs: Additional keyword arguments.
3839
"""
39-
super().__init__(enable_user_speaking_frames=False)
40+
super().__init__(enable_user_speaking_frames=False, **kwargs)
4041
self._timeout = timeout
4142
self._text = ""
4243
self._user_speaking = False

src/pipecat/turns/bot/transcription_bot_turn_start_strategy.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,15 @@ class TranscriptionBotTurnStartStrategy(BaseBotTurnStartStrategy):
2828
multiple or delayed transcription frames gracefully.
2929
"""
3030

31-
def __init__(self, *, timeout: float = 0.5):
31+
def __init__(self, *, timeout: float = 0.5, **kwargs):
3232
"""Initialize the transcription-based bot turn start strategy.
3333
3434
Args:
3535
timeout: A short delay used internally to handle consecutive or
3636
slightly delayed transcriptions.
37+
**kwargs: Additional keyword arguments.
3738
"""
38-
super().__init__()
39+
super().__init__(**kwargs)
3940
self._timeout = timeout
4041
self._text = ""
4142
self._vad_user_speaking = False

src/pipecat/turns/bot/turn_analyzer_bot_turn_start_strategy.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,15 @@ class TurnAnalyzerBotTurnStartStrategy(BaseBotTurnStartStrategy):
3535
3636
"""
3737

38-
def __init__(self, *, turn_analyzer: BaseTurnAnalyzer, timeout: float = 0.5):
38+
def __init__(self, *, turn_analyzer: BaseTurnAnalyzer, timeout: float = 0.5, **kwargs):
3939
"""Initialize the bot turn start strategy.
4040
4141
Args:
4242
turn_analyzer: The turn detection analyzer instance to detect end of user turn.
4343
timeout: Short delay used internally to handle frame timing and event triggering.
44+
**kwargs: Additional keyword arguments.
4445
"""
45-
super().__init__()
46+
super().__init__(**kwargs)
4647
self._turn_analyzer = turn_analyzer
4748
self._timeout = timeout
4849
self._text = ""

src/pipecat/turns/user/base_user_turn_start_strategy.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class UserTurnStartedParams:
3232
3333
"""
3434

35+
enable_interruptions: bool
3536
enable_user_speaking_frames: bool
3637

3738

@@ -49,18 +50,27 @@ class BaseUserTurnStartStrategy(BaseObject):
4950
- `on_user_turn_started`: Signals that a user turn has started.
5051
"""
5152

52-
def __init__(self, *, enable_user_speaking_frames: bool = True, **kwargs):
53+
def __init__(
54+
self,
55+
*,
56+
enable_interruptions: bool = True,
57+
enable_user_speaking_frames: bool = True,
58+
**kwargs,
59+
):
5360
"""Initialize the base user turn start strategy.
5461
5562
Args:
56-
enable_user_speaking_frames: If True, the aggregator will emit frames
57-
indicating when the user starts speaking, as well as interruption
58-
frames. This is enabled by default, but you may want to disable it
59-
if another component (e.g., an STT service) is already generating
60-
these frames.
63+
enable_interruptions: If True, the user aggregator will emit an
64+
interruption frame when the user turn starts.
65+
enable_user_speaking_frames: If True, the user aggregator will emit
66+
frames indicating when the user starts speaking, as well as
67+
interruption frames. This is enabled by default, but you may want
68+
to disable it if another component (e.g., an STT service) is
69+
already generating these frames.
6170
**kwargs: Additional keyword arguments.
6271
"""
6372
super().__init__(**kwargs)
73+
self._enable_interruptions = enable_interruptions
6474
self._enable_user_speaking_frames = enable_user_speaking_frames
6575
self._task_manager: Optional[BaseTaskManager] = None
6676
self._register_event_handler("on_push_frame", sync=True)
@@ -123,5 +133,8 @@ async def trigger_user_turn_started(self):
123133
"""Trigger the `on_user_turn_started` event."""
124134
await self._call_event_handler(
125135
"on_user_turn_started",
126-
UserTurnStartedParams(enable_user_speaking_frames=self._enable_user_speaking_frames),
136+
UserTurnStartedParams(
137+
enable_interruptions=self._enable_interruptions,
138+
enable_user_speaking_frames=self._enable_user_speaking_frames,
139+
),
127140
)

0 commit comments

Comments
 (0)