Commit aa2589d

Update examples to use transcription events from context aggregators
1 parent d0f2271 commit aa2589d

6 files changed: 69 additions & 76 deletions
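
In short, each example drops the standalone TranscriptProcessor and its single "on_transcript_update" event in favor of the per-role turn events that the context aggregators themselves emit. A condensed before/after sketch of the pattern, taken from the diffs below (handler bodies elided):

# Before: a separate processor in the pipeline, one combined event
transcript = TranscriptProcessor()

@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
    ...  # iterate frame.messages and filter for TranscriptionMessage

# After: per-role events straight from the context aggregator pair
user_aggregator = context_aggregator.user()
assistant_aggregator = context_aggregator.assistant()

@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
    ...

@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
    ...

Note the asymmetry: the user handler receives an extra strategy argument that the assistant handler does not, and both message types carry .content plus an optional .timestamp.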

examples/foundational/19-openai-realtime.py

Lines changed: 23 additions & 17 deletions
@@ -15,14 +15,17 @@
 from pipecat.adapters.schemas.function_schema import FunctionSchema
 from pipecat.adapters.schemas.tools_schema import ToolsSchema
 from pipecat.audio.vad.silero import SileroVADAnalyzer
-from pipecat.frames.frames import LLMRunFrame, LLMSetToolsFrame, TranscriptionMessage
+from pipecat.frames.frames import LLMRunFrame, LLMSetToolsFrame
 from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
 from pipecat.pipeline.pipeline import Pipeline
 from pipecat.pipeline.runner import PipelineRunner
 from pipecat.pipeline.task import PipelineParams, PipelineTask
 from pipecat.processors.aggregators.llm_context import LLMContext
-from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
-from pipecat.processors.transcript_processor import TranscriptProcessor
+from pipecat.processors.aggregators.llm_response_universal import (
+    AssistantTurnStoppedMessage,
+    LLMContextAggregatorPair,
+    UserTurnStoppedMessage,
+)
 from pipecat.runner.types import RunnerArguments
 from pipecat.runner.utils import create_transport
 from pipecat.services.llm_service import FunctionCallParams
@@ -177,8 +180,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
     llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
     llm.register_function("get_news", get_news)
 
-    transcript = TranscriptProcessor()
-
     # Create a standard OpenAI LLM context object using the normal messages format. The
     # OpenAIRealtimeLLMService will convert this internally to messages that the
     # openai WebSocket API can understand.
@@ -189,15 +190,16 @@
 
     context_aggregator = LLMContextAggregatorPair(context)
 
+    user_aggregator = context_aggregator.user()
+    assistant_aggregator = context_aggregator.assistant()
+
     pipeline = Pipeline(
         [
             transport.input(),  # Transport user input
-            context_aggregator.user(),
-            transcript.user(),  # LLM pushes TranscriptionFrames upstream
+            user_aggregator,
             llm,  # LLM
             transport.output(),  # Transport bot output
-            transcript.assistant(),  # After the transcript output, to time with the audio output
-            context_aggregator.assistant(),
+            assistant_aggregator,
         ]
     )
 
@@ -238,14 +240,18 @@ async def on_client_disconnected(transport, client):
         logger.info(f"Client disconnected")
         await task.cancel()
 
-    # Register event handler for transcript updates
-    @transcript.event_handler("on_transcript_update")
-    async def on_transcript_update(processor, frame):
-        for msg in frame.messages:
-            if isinstance(msg, TranscriptionMessage):
-                timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
-                line = f"{timestamp}{msg.role}: {msg.content}"
-                logger.info(f"Transcript: {line}")
+    # Log transcript updates
+    @user_aggregator.event_handler("on_user_turn_stopped")
+    async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
+        timestamp = f"[{message.timestamp}] " if message.timestamp else ""
+        line = f"{timestamp}user: {message.content}"
+        logger.info(f"Transcript: {line}")
+
+    @assistant_aggregator.event_handler("on_assistant_turn_stopped")
+    async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
+        timestamp = f"[{message.timestamp}] " if message.timestamp else ""
+        line = f"{timestamp}assistant: {message.content}"
+        logger.info(f"Transcript: {line}")
 
     runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
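
The new handlers above only log. The same two events could feed a persistent transcript instead; a minimal sketch under the same signatures (the transcript_lines list is introduced here purely for illustration):

    # Hypothetical variant: accumulate turns in memory rather than logging them.
    transcript_lines: list[str] = []

    @user_aggregator.event_handler("on_user_turn_stopped")
    async def collect_user_turn(aggregator, strategy, message: UserTurnStoppedMessage):
        transcript_lines.append(f"user: {message.content}")

    @assistant_aggregator.event_handler("on_assistant_turn_stopped")
    async def collect_assistant_turn(aggregator, message: AssistantTurnStoppedMessage):
        transcript_lines.append(f"assistant: {message.content}")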

examples/foundational/19b-openai-realtime-beta-text.py

Lines changed: 1 addition & 15 deletions
@@ -14,12 +14,11 @@
 from pipecat.adapters.schemas.function_schema import FunctionSchema
 from pipecat.adapters.schemas.tools_schema import ToolsSchema
 from pipecat.audio.vad.silero import SileroVADAnalyzer
-from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage
+from pipecat.frames.frames import LLMRunFrame
 from pipecat.pipeline.pipeline import Pipeline
 from pipecat.pipeline.runner import PipelineRunner
 from pipecat.pipeline.task import PipelineParams, PipelineTask
 from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
-from pipecat.processors.transcript_processor import TranscriptProcessor
 from pipecat.runner.types import RunnerArguments
 from pipecat.runner.utils import create_transport
 from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -157,8 +156,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
     llm.register_function("get_current_weather", fetch_weather_from_api)
     llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
 
-    transcript = TranscriptProcessor()
-
     # Create a standard OpenAI LLM context object using the normal messages format. The
     # OpenAIRealtimeBetaLLMService will convert this internally to messages that the
     # openai WebSocket API can understand.
@@ -175,9 +172,7 @@
             context_aggregator.user(),
             llm,  # LLM
             tts,  # TTS
-            transcript.user(),  # Placed after the LLM, as LLM pushes TranscriptionFrames downstream
             transport.output(),  # Transport bot output
-            transcript.assistant(),  # After the transcript output, to time with the audio output
             context_aggregator.assistant(),
         ]
     )
@@ -202,15 +197,6 @@ async def on_client_disconnected(transport, client):
         logger.info(f"Client disconnected")
         await task.cancel()
 
-    # Register event handler for transcript updates
-    @transcript.event_handler("on_transcript_update")
-    async def on_transcript_update(processor, frame):
-        for msg in frame.messages:
-            if isinstance(msg, TranscriptionMessage):
-                timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
-                line = f"{timestamp}{msg.role}: {msg.content}"
-                logger.info(f"Transcript: {line}")
-
     runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
 
     await runner.run(task)

examples/foundational/19b-openai-realtime-text.py

Lines changed: 1 addition & 15 deletions
@@ -14,13 +14,12 @@
 from pipecat.adapters.schemas.function_schema import FunctionSchema
 from pipecat.adapters.schemas.tools_schema import ToolsSchema
 from pipecat.audio.vad.silero import SileroVADAnalyzer
-from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage
+from pipecat.frames.frames import LLMRunFrame
 from pipecat.pipeline.pipeline import Pipeline
 from pipecat.pipeline.runner import PipelineRunner
 from pipecat.pipeline.task import PipelineParams, PipelineTask
 from pipecat.processors.aggregators.llm_context import LLMContext
 from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
-from pipecat.processors.transcript_processor import TranscriptProcessor
 from pipecat.runner.types import RunnerArguments
 from pipecat.runner.utils import create_transport
 from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -164,8 +163,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
     llm.register_function("get_current_weather", fetch_weather_from_api)
     llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
 
-    transcript = TranscriptProcessor()
-
     # Create a standard OpenAI LLM context object using the normal messages format. The
     # OpenAIRealtimeLLMService will convert this internally to messages that the
     # openai WebSocket API can understand.
@@ -180,11 +177,9 @@
         [
             transport.input(),  # Transport user input
             context_aggregator.user(),
-            transcript.user(),  # LLM pushes TranscriptionFrames upstream
             llm,  # LLM
             tts,  # TTS
             transport.output(),  # Transport bot output
-            transcript.assistant(),  # After the transcript output, to time with the audio output
             context_aggregator.assistant(),
         ]
     )
@@ -209,15 +204,6 @@ async def on_client_disconnected(transport, client):
         logger.info(f"Client disconnected")
         await task.cancel()
 
-    # Register event handler for transcript updates
-    @transcript.event_handler("on_transcript_update")
-    async def on_transcript_update(processor, frame):
-        for msg in frame.messages:
-            if isinstance(msg, TranscriptionMessage):
-                timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
-                line = f"{timestamp}{msg.role}: {msg.content}"
-                logger.info(f"Transcript: {line}")
-
     runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
 
     await runner.run(task)
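
Note that in both text examples the transcript handlers are removed outright rather than migrated. If lightweight transcript logging is still wanted, the observer approach that 42-interruption-config.py adopts below would apply here as well; a sketch (assuming the task is otherwise constructed as in these examples):

from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver

task = PipelineTask(
    pipeline,
    observers=[TranscriptionLogObserver()],  # logs transcriptions without a pipeline element
)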

examples/foundational/40-aws-nova-sonic.py

Lines changed: 22 additions & 3 deletions
@@ -21,7 +21,11 @@
 from pipecat.pipeline.runner import PipelineRunner
 from pipecat.pipeline.task import PipelineParams, PipelineTask
 from pipecat.processors.aggregators.llm_context import LLMContext
-from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
+from pipecat.processors.aggregators.llm_response_universal import (
+    AssistantTurnStoppedMessage,
+    LLMContextAggregatorPair,
+    UserTurnStoppedMessage,
+)
 from pipecat.runner.types import RunnerArguments
 from pipecat.runner.utils import create_transport
 from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
@@ -154,14 +158,17 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
     )
     context_aggregator = LLMContextAggregatorPair(context)
 
+    user_aggregator = context_aggregator.user()
+    assistant_aggregator = context_aggregator.assistant()
+
     # Build the pipeline
     pipeline = Pipeline(
         [
             transport.input(),
-            context_aggregator.user(),
+            user_aggregator,
             llm,
             transport.output(),
-            context_aggregator.assistant(),
+            assistant_aggregator,
         ]
     )
 
@@ -192,6 +199,18 @@ async def on_client_disconnected(transport, client):
         logger.info(f"Client disconnected")
         await task.cancel()
 
+    @user_aggregator.event_handler("on_user_turn_stopped")
+    async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
+        timestamp = f"[{message.timestamp}] " if message.timestamp else ""
+        line = f"{timestamp}user: {message.content}"
+        logger.info(f"Transcript: {line}")
+
+    @assistant_aggregator.event_handler("on_assistant_turn_stopped")
+    async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
+        timestamp = f"[{message.timestamp}] " if message.timestamp else ""
+        line = f"{timestamp}assistant: {message.content}"
+        logger.info(f"Transcript: {line}")
+
     # Run the pipeline
     runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
     await runner.run(task)

examples/foundational/42-interruption-config.py

Lines changed: 3 additions & 10 deletions
@@ -13,6 +13,7 @@
 from pipecat.audio.vad.silero import SileroVADAnalyzer
 from pipecat.audio.vad.vad_analyzer import VADParams
 from pipecat.frames.frames import LLMRunFrame
+from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
 from pipecat.pipeline.pipeline import Pipeline
 from pipecat.pipeline.runner import PipelineRunner
 from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -21,7 +22,6 @@
     LLMContextAggregatorPair,
     LLMUserAggregatorParams,
 )
-from pipecat.processors.transcript_processor import TranscriptProcessor
 from pipecat.runner.types import RunnerArguments
 from pipecat.runner.utils import create_transport
 from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -36,6 +36,7 @@
 
 load_dotenv(override=True)
 
+
 # We store functions so objects (e.g. SileroVADAnalyzer) don't get
 # instantiated. The function will be called when the desired transport gets
 # selected.
@@ -70,8 +71,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
 
     llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
 
-    transcript = TranscriptProcessor()
-
     messages = [
         {
             "role": "system",
@@ -94,7 +93,6 @@
         [
             transport.input(),  # Transport user input
             stt,
-            transcript.user(),  # User transcripts
             context_aggregator.user(),  # User responses
             llm,  # LLM
             tts,  # TTS
@@ -110,6 +108,7 @@
             enable_usage_metrics=True,
         ),
         idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
+        observers=[TranscriptionLogObserver()],
     )
 
     @transport.event_handler("on_client_connected")
@@ -124,12 +123,6 @@ async def on_client_disconnected(transport, client):
         logger.info(f"Client disconnected")
         await task.cancel()
 
-    # Register event handler for transcript updates
-    @transcript.event_handler("on_transcript_update")
-    async def on_transcript_update(processor, frame):
-        for message in frame.messages:
-            logger.info(f"Transcription [{message.role}]: {message.content}")
-
     runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
 
     await runner.run(task)

examples/foundational/51-grok-realtime.py

Lines changed: 19 additions & 16 deletions
@@ -36,7 +36,7 @@
 
 # Note: Grok has built-in server-side VAD, so we don't need local VAD
 # from pipecat.audio.vad.silero import SileroVADAnalyzer
-from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage
+from pipecat.frames.frames import LLMRunFrame
 from pipecat.observers.loggers.transcription_log_observer import (
     TranscriptionLogObserver,
 )
@@ -45,9 +45,10 @@
 from pipecat.pipeline.task import PipelineParams, PipelineTask
 from pipecat.processors.aggregators.llm_context import LLMContext
 from pipecat.processors.aggregators.llm_response_universal import (
+    AssistantTurnStoppedMessage,
     LLMContextAggregatorPair,
+    UserTurnStoppedMessage,
 )
-from pipecat.processors.transcript_processor import TranscriptProcessor
 from pipecat.runner.types import RunnerArguments
 from pipecat.runner.utils import create_transport
 from pipecat.services.grok.realtime.events import (
@@ -208,9 +209,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
     llm.register_function("get_current_time", get_current_time)
     llm.register_function("get_restaurant_recommendation", get_restaurant_recommendation)
 
-    # Create transcript processor for logging
-    transcript = TranscriptProcessor()
-
     # Create context with initial message and tools
     context = LLMContext(
         [{"role": "user", "content": "Say hello and introduce yourself!"}],
@@ -219,18 +217,19 @@
 
     context_aggregator = LLMContextAggregatorPair(context)
 
+    user_aggregator = context_aggregator.user()
+    assistant_aggregator = context_aggregator.assistant()
+
     # Build the pipeline
     # Note: In realtime mode, transcription comes from Grok (upstream),
     # so transcript.user() goes BEFORE llm
     pipeline = Pipeline(
         [
             transport.input(),  # Transport user input (audio)
-            context_aggregator.user(),
-            transcript.user(),  # Transcription from Grok goes upstream
+            user_aggregator,
             llm,  # Grok Realtime LLM (handles STT + LLM + TTS)
             transport.output(),  # Transport bot output (audio)
-            transcript.assistant(),  # Log assistant speech
-            context_aggregator.assistant(),
+            assistant_aggregator,
         ]
     )
 
@@ -256,13 +255,17 @@ async def on_client_disconnected(transport, client):
         await task.cancel()
 
     # Log transcript updates
-    @transcript.event_handler("on_transcript_update")
-    async def on_transcript_update(processor, frame):
-        for msg in frame.messages:
-            if isinstance(msg, TranscriptionMessage):
-                timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
-                line = f"{timestamp}{msg.role}: {msg.content}"
-                logger.info(f"Transcript: {line}")
+    @user_aggregator.event_handler("on_user_turn_stopped")
+    async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
+        timestamp = f"[{message.timestamp}] " if message.timestamp else ""
+        line = f"{timestamp}user: {message.content}"
+        logger.info(f"Transcript: {line}")
+
+    @assistant_aggregator.event_handler("on_assistant_turn_stopped")
+    async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
+        timestamp = f"[{message.timestamp}] " if message.timestamp else ""
+        line = f"{timestamp}assistant: {message.content}"
+        logger.info(f"Transcript: {line}")
 
     runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
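
With either realtime example, the migrated handlers emit one log record per completed turn. Illustrative output, showing only the message portion of each record (the timestamps here are invented; the real format depends on what the messages carry):

Transcript: [2025-01-15T10:30:02] user: What's the weather in San Francisco?
Transcript: [2025-01-15T10:30:05] assistant: Let me check that for you.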
