Conversation
- Fix speaker diarization: add field alias for speaker_label → speaker mapping in TurnMessage model
- Add warning for non-optimal min_end_of_turn_silence_when_confident values (recommends 100ms for best latency)
- Improve max_turn_silence override warning message clarity
- Update custom prompt warning (remove 88% accuracy claim)
- Add comprehensive logging for debugging:
  - Log final connection params after modifications
  - Log WebSocket URL and parsed parameters
  - Log speaker field in transcripts
  - Log text sent to LLM with speaker formatting
- Support dynamic configuration updates via STTUpdateSettingsFrame:
  - keyterms_prompt (when AssemblyAI API supports it)
  - prompt
  - max_turn_silence
  - min_end_of_turn_silence_when_confident
This generally looks good.
You might want to add another foundational example (07 series) showing how to set up AssemblyAISTTService using the u3-rt-pro model, where it acts as the user turn controller.
The key is that you set up the LLMContextAggregatorPair as:
```python
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
    context,
    user_params=LLMUserAggregatorParams(user_turn_strategies=ExternalUserTurnStrategies()),
)
```
Using ExternalUserTurnStrategies tells the aggregator to defer turn control to the STT (or external processor), which in this case is AssemblyAISTTService.
Reach out on Slack if you have any questions.
Also, don't forget two additional steps:
- Submit a changelog: https://github.com/pipecat-ai/pipecat/blob/main/CONTRIBUTING.md#changelog-entries
- Lint the code (`uv run scripts/fix-ruff.sh`) or install the pre-commit hook (`uv run pre-commit install`)
```diff
 import json
 from dataclasses import dataclass, field
-from typing import Any, AsyncGenerator, Dict, Optional
+from typing import Any, AsyncGenerator, Dict, Mapping, Optional
```

Nit: `Mapping` is unused, remove:

```diff
-from typing import Any, AsyncGenerator, Dict, Mapping, Optional
+from typing import Any, AsyncGenerator, Dict, Optional
```
```python
self._vad_speaking = False

# Log final connection params after any modifications
logger.info(f"{self} Final connection params being sent to AssemblyAI:")
```
Out of curiosity, why are lines L215-217 info logs?
Hey Mark! I'll remove these; I was using them for debugging. Sorry about that!
```python
logger.info(f"  max_turn_silence: {self._settings.connection_params.max_turn_silence}")

# Warn if min_end_of_turn_silence_when_confident is not 100ms
if self._settings.connection_params.min_end_of_turn_silence_when_confident != 100:
```
Should this ever be set to anything other than 100ms? If so, do you have docs you can link to in order to educate the user? (Maybe in docstrings?)
If it should never be set to anything other than 100ms, maybe remove?
Hi Mark! Yes, setting it higher than 100ms can improve accuracy for people who leave larger gaps in their speech. From our testing, 100ms is the optimal value, but we want to leave the parameter configurable in case anyone would like to change it for their use case.
```python
    - else → InterimTranscriptionFrame
    """
    # Log transcript details
    logger.info(f"{self} ===== TRANSCRIPT RECEIVED =====")
```
Remove the info logs. If users want this info, they can use an observer to access the TranscriptionFrame.
100% will remove!
```python
if is_final_turn:
    finalize_confirmed = bool(message.turn_is_formatted)
    if finalize_confirmed:
        self.confirm_finalize()
```
To use confirm_finalize(), you need to also finalize the audio being sent using the request_finalize() method.
This pattern is appropriate when you have a message you send to Assembly to tell the service to finalize, which is when you call request_finalize(). Then, you have metadata in the transcript data returned from Assembly that indicate that the audio from the finalize request is received, which is when you call confirm_finalize(). Closing the loop in this way gives Pipecat confidence that the user's audio has been fully transcribed and it can proceed to the next processing step. This process is particularly important for services that emit multiple finals from user audio.
Not all services work like this, though. Others guarantee that an audio input equals an audio output (e.g. ElevenLabs with their commit process), or stream tokens including an end token (e.g. Soniox, Speechmatics). Those services just finalize without the method calls (e.g. set TranscriptionFrame.finalized=True).
You know best how your service works, so please follow one of these patterns. Or, ask questions if you're still unclear.
Hey Mark, understood: request_finalize() is called when we force the end of transcription, and confirm_finalize() is called when the confirming transcript is received. I'll make that update.
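The handshake agreed on above can be sketched framework-free. `FinalizeTracker` is a hypothetical stand-in for the bookkeeping around Pipecat's `request_finalize()`/`confirm_finalize()` pattern, not the service's actual implementation:

```python
class FinalizeTracker:
    """Hypothetical stand-in for the STT service's finalize bookkeeping."""

    def __init__(self):
        self._requested = False
        self._confirmed = False

    def request_finalize(self):
        # Called when we send AssemblyAI a message (e.g. ForceEndpoint)
        # telling it to finalize the in-flight audio.
        self._requested = True
        self._confirmed = False

    def confirm_finalize(self):
        # Called when transcript metadata (e.g. turn_is_formatted) confirms
        # the service processed all audio up to the finalize request.
        if self._requested:
            self._confirmed = True

    @property
    def finalized(self):
        # Only proceed to the next processing step once the loop is closed.
        return self._requested and self._confirmed
```

Closing the loop this way matters most for services that can emit multiple finals from one stretch of user audio.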
```python
logger.debug(f"{self} Processing SpeechStarted in STT mode")
await self.start_processing_metrics()
await self.broadcast_frame(UserStartedSpeakingFrame)
```
In which scenarios is the SpeechStartedMessage message received?
Also, make sure there's only one broadcast of UserStartedSpeakingFrame and the paired UserStoppedSpeakingFrame for each case where Assembly is handling the role of "user turn controller" (e.g. emitting the User Speaking Frames).
SpeechStarted is received only in STT turn detection mode (vad_force_turn_endpoint=False) with u3-rt-pro. It arrives before any transcripts. The transcript-based fallback was for older streaming models, but since those models aren't supported in STT mode (validated in init), I removed it to ensure clean pairing of UserStarted/StoppedSpeakingFrame.
```python
logger.debug(f"{self} Transcript received in STT mode (_user_speaking={self._user_speaking})")
if not self._user_speaking:
    logger.warning(f"{self} Transcript arrived before SpeechStarted, broadcasting fallback UserStartedSpeakingFrame")
    await self.broadcast_frame(UserStartedSpeakingFrame)
```
I see UserStartedSpeakingFrame broadcasted again here. Is there any risk of double broadcasting UserStartedSpeakingFrame?
This should also come with:

```python
if self._should_interrupt:
    await self.push_interruption_task_frame_and_wait()
```

The role of the User Turn controller is to:
- On speech started:
  - Broadcast `UserStartedSpeakingFrame`
  - Call `push_interruption_task_frame_and_wait()`
- On speech stopped:
  - Broadcast `UserStoppedSpeakingFrame`
Since u3-rt-pro guarantees SpeechStarted arrives before transcripts, I removed the fallback entirely. Now the pairing is clean.
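The resulting pairing guarantee can be illustrated with a small framework-free state machine. The `TurnController` class and its string "frames" are illustrative, not Pipecat's actual frame objects or API:

```python
class TurnController:
    """Keeps UserStartedSpeakingFrame / UserStoppedSpeakingFrame strictly paired."""

    def __init__(self):
        self.user_speaking = False
        self.frames = []  # stands in for broadcast_frame() calls

    def on_speech_started(self):
        # Broadcast Started at most once per turn, even if the event repeats.
        if not self.user_speaking:
            self.user_speaking = True
            self.frames.append("UserStartedSpeakingFrame")

    def on_final_turn(self):
        # Stopped is only valid after a matching Started.
        if self.user_speaking:
            self.user_speaking = False
            self.frames.append("UserStoppedSpeakingFrame")
```

Guarding both transitions on `user_speaking` is what rules out double broadcasts regardless of event ordering.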
…zed flag in STT mode - Add request_finalize() before sending ForceEndpoint in Pipecat mode - Keep confirm_finalize() when receiving formatted finals in Pipecat mode - Remove confirm_finalize() from STT mode (use finalized=True instead) This follows Pipecat's two-step finalization pattern where request_finalize() is called when sending a finalize request to the STT service, and confirm_finalize() is called when receiving confirmation back.
u3-rt-pro guarantees SpeechStarted is always sent before transcripts, so the fallback UserStartedSpeakingFrame broadcast is never needed. This ensures clean pairing of UserStarted/StoppedSpeakingFrame: - Start: Always from _handle_speech_started - Stop: Always from _handle_transcription on final turn
@kompfner it could be worth you looking at this from the perspective of the STTSettings.
- Remove unused Mapping import - Remove info logs at initialization (connection params) - Remove info logs in _handle_transcription (transcript details, text sent to LLM) - Remove info logs in _build_ws_url (WebSocket URL and params) - Keep debug logs (less verbose, appropriate for development)
The request_finalize() method in STTService is synchronous (sets a flag), but was being called with await in the VAD turn endpoint handling code. This caused "object NoneType can't be used in 'await' expression" errors. Also includes automatic formatting improvements from ruff.
- 07o-interruptible-assemblyai.py: Basic example using Pipecat VAD mode - 07o-interruptible-assemblyai-stt.py: Advanced example using STT-controlled turn detection with comprehensive documentation on u3-rt-pro features (turn detection tuning, prompt-based enhancement, speaker diarization)
…nt to min_turn_silence - Add "beta feature" note to custom prompt warning - Rename min_end_of_turn_silence_when_confident parameter to min_turn_silence across all AssemblyAI code - Update documentation, examples, and test files to use new parameter name
- Update 13d-assemblyai-transcription.py to explicitly use u3-rt-pro model - Update 55d-update-settings-assemblyai-stt.py to demonstrate keyterms updates instead of language updates - Add helpful logging to show before/after keyterms boosting effect - Use difficult names (Xiomara, Saoirse, Krzystof) to demonstrate boosting effectiveness
… parameter - Keep old parameter name for backward compatibility - Add deprecation warning when old parameter is used - Automatically migrate old parameter value to new min_turn_silence parameter - Exclude deprecated parameter from WebSocket URL to avoid sending it to API - New parameter takes precedence if both are set
- Makes deprecation warning visible in logs without needing Python warning flags - Users will see the warning during normal operation
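A minimal sketch of the migration described above. The function name and signature are illustrative (the actual change routes the warning through the logger rather than the warnings module):

```python
import warnings


def resolve_min_turn_silence(
    min_turn_silence=None,
    min_end_of_turn_silence_when_confident=None,
    default=100,
):
    """Accept the deprecated parameter, warn, and prefer the new name."""
    if min_end_of_turn_silence_when_confident is not None:
        warnings.warn(
            "min_end_of_turn_silence_when_confident is deprecated; "
            "use min_turn_silence instead",
            DeprecationWarning,
        )
        # Migrate the old value only if the new parameter is unset,
        # so the new parameter takes precedence when both are given.
        if min_turn_silence is None:
            min_turn_silence = min_end_of_turn_silence_when_confident
    return default if min_turn_silence is None else min_turn_silence
```

The deprecated name is also excluded from the WebSocket URL, so it never reaches the API.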
markbackman left a comment:
Thanks for the update!
Two big things:
- I'm finding the code a bit hard to follow, and I think this is because the mapping of what each model supports is spread across different places in the codebase. It would be helpful if this definition were reinforced in code, or if the model and param combinations were documented in one place.
- The concept of calling one mode "STT mode" and the other not is confusing. It's an STT service, so having an STT mode doesn't really make much sense, at least to me. Perhaps we're really talking about turn detection, right? Maybe that's the terminology we need to be clear about.
I'll continue to review more but wanted to get this early feedback in.
```python
    Only applies to Mode 2 (STT turn detection). In Mode 1, VAD +
    smart turn analyzer handle interruptions via the aggregator.
    """
    logger.debug(f"{self} SpeechStarted ignored in Pipecat mode")
    return  # Mode 1: handled by aggregator

logger.debug(f"{self} Processing SpeechStarted in STT mode")
```
```python
if self._should_interrupt:
    await self.push_interruption_task_frame_and_wait()
self._user_speaking = True
logger.debug(f"{self} _user_speaking set to True")
```
```python
    await self._trace_transcription(transcript_text, True, language)
    await self.stop_processing_metrics()
else:
    logger.debug(f'{self} Interim transcript: "{transcript_text}"')
```
Remove or set to trace.
```python
context = LLMContext(messages)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
    context,
    user_params=LLMUserAggregatorParams(user_turn_strategies=ExternalUserTurnStrategies()),
```
Let's include the VADAnalyzer so we get TTFB measurements.
```python
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
    context,
    user_params=LLMUserAggregatorParams(
        user_turn_strategies=ExternalUserTurnStrategies(),
        vad_analyzer=SileroVADAnalyzer(),
    ),
)
```
```python
def _configure_manual_turn_mode(
    self, connection_params: AssemblyAIConnectionParams
```

```python
self._user_speaking = False
self._vad_speaking = False
```
```python
old_conn_params = changed.get("connection_params")

# Check each potentially changed parameter
if hasattr(conn_params, "keyterms_prompt"):
```
These attributes always exist, so this will evaluate to True.
Instead, I think you want a simple check:

```python
if (
    old_conn_params is None
    or conn_params.keyterms_prompt != old_conn_params.keyterms_prompt
):
    if conn_params.keyterms_prompt is not None:
        ...
```
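A framework-free version of this diff-the-snapshot check can be exercised directly. Here `ConnParams` is a hypothetical stand-in for `AssemblyAIConnectionParams`, and the helper name is illustrative:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ConnParams:
    # Hypothetical stand-in for AssemblyAIConnectionParams.
    keyterms_prompt: Optional[List[str]] = None
    prompt: Optional[str] = None


def changed_fields(new: ConnParams, old: Optional[ConnParams]) -> List[str]:
    """Return names of fields whose values differ from the previous snapshot.

    With no previous snapshot, any explicitly set field counts as changed.
    """
    names = ("keyterms_prompt", "prompt")
    if old is None:
        return [n for n in names if getattr(new, n) is not None]
    return [n for n in names if getattr(new, n) != getattr(old, n)]
```

Comparing values instead of using `hasattr` means only parameters the user actually changed trigger an `UpdateConfiguration` message.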
One more: I see `end_of_turn_confidence_threshold` flipped from 1.0 to 0.0 for universal-streaming.
Co-authored-by: Mark Backman <m.backman@gmail.com>
…yAI turn detection' - Rename 07o-interruptible-assemblyai-stt.py -> 07o-interruptible-assemblyai-turn-detection.py - Replace 'STT mode' with 'AssemblyAI turn detection mode' throughout codebase - Replace 'Mode 1'/'Mode 2' with descriptive 'Pipecat turn detection'/'AssemblyAI turn detection' - Update changelog to use 'built-in turn detection' terminology - Addresses PR feedback about confusing terminology
…sal-streaming - u3-rt-pro: Does not set parameter (not used) - universal-streaming models: Set to 1.0 to maintain fast response - This ensures fast response time matches previous implementation
Changes Made

- Code Improvements
- Terminology Improvements
- Bug Fix

Answers to Your Questions

Re: Terminology

> The concept of calling one STT mode and the other not is confusing. It's an STT service, so having an STT mode

Absolutely right! I've updated all references to use "turn detection mode" terminology.

Re: end_of_turn_confidence_threshold

> One more: I see end_of_turn_confidence_threshold flipped from 1.0 to 0.0 for universal-streaming. This was required

Good catch - this was actually a bug! I've corrected it. The parameter behavior is now:

So yes, with the corrected value of 1.0, universal-streaming models maintain the same fast response.

Re: Model/Parameter Mappings

> I'm finding the code a bit hard to follow and I think this is because the mapping of what each model supports is

Great point. I've created comprehensive documentation with a model comparison table that shows exactly what each model supports.
Summary
Add support for AssemblyAI's u3-rt-pro streaming model with enhanced features including two-mode turn detection, dynamic parameter updates, speaker diarization,
and comprehensive debugging capabilities.
Key Features
🎯 u3-rt-pro Model Support
- `u3-rt-pro` speech model option (set as default)

🔄 Two-Mode Turn Detection

Pipecat Mode (`vad_force_turn_endpoint=True`, default):
- `ForceEndpoint` message sent on VAD stop
- `max_turn_silence` synchronized with `min_end_of_turn_silence_when_confident` to avoid double turn detection

STT Mode (`vad_force_turn_endpoint=False`, u3-rt-pro only):
- `UserStartedSpeakingFrame`/`UserStoppedSpeakingFrame` from STT
- `SpeechStarted` events for fast barge-in

🔄 Dynamic Parameter Updates
Update configuration mid-stream without reconnection via `STTUpdateSettingsFrame`:
- `keyterms_prompt` - Boost specific words/names (when API supports it)
- `prompt` - Custom transcription prompts
- `max_turn_silence` - Maximum silence before forcing turn end
- `min_end_of_turn_silence_when_confident` - Silence threshold for confident turn endings

🎤 Speaker Diarization
- `speaker_labels=True` in connection params
- `speaker_format` parameter for custom formatting (e.g., `"<{speaker}>{text}</{speaker}>"`)

🌍 Language Detection
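As a quick illustration of how the `speaker_format` template under Speaker Diarization behaves, in plain Python (the helper function is hypothetical, not the service's API):

```python
def format_with_speaker(text, speaker, speaker_format="<{speaker}>{text}</{speaker}>"):
    # No speaker label (e.g. diarization disabled): pass the text through.
    if speaker is None:
        return text
    return speaker_format.format(speaker=speaker, text=text)
```

Any template with `{speaker}` and `{text}` placeholders works, so the tag style shown in the PR is just one option.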
Bug Fixes
- Field alias `speaker_label` → `speaker` in `TurnMessage` model
- Add `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` imports
- Fix `_update_settings` to properly send `UpdateConfiguration` messages to AssemblyAI

Improvements

Enhanced Warnings
- Warn if `min_end_of_turn_silence_when_confident` is not set to the optimal 100ms
- Warn if `max_turn_silence` is overridden in Pipecat mode

Comprehensive Logging
Documentation
Models Support
- `u3-rt-pro`
- `universal-streaming-english`
- `universal-streaming-multilingual`

Breaking Changes
None - all changes are backward compatible. Default behavior unchanged for existing users.
Testing
Extensively tested with a comprehensive 23-test suite covering:
Example Usage
Basic u3-rt-pro: