2 changes: 1 addition & 1 deletion changelog/3881.added.md
@@ -1 +1 @@
- Added `StartupTimingObserver` for measuring how long each processor's `start()` method takes during pipeline startup. Also measures transport readiness — the time from `StartFrame` to first client connection — via the `on_transport_readiness_measured` event. Useful for diagnosing cold start slowness and identifying initialization bottlenecks.
- Added `StartupTimingObserver` for measuring how long each processor's `start()` method takes during pipeline startup. Also measures transport readiness — the time from `StartFrame` to first client connection — via the `on_transport_timing_report` event.
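
For context, a minimal sketch of consuming the renamed event, reusing the `StartupTimingObserver` import that appears in the example diff below. The handler signature and the shape of the report payload are assumptions, not confirmed by this PR:

```python
from loguru import logger

from pipecat.observers.startup_timing_observer import StartupTimingObserver

startup_observer = StartupTimingObserver()

@startup_observer.event_handler("on_transport_timing_report")
async def on_transport_timing_report(observer, report):
    # Assumed payload: a report covering per-processor start() timings and
    # the StartFrame-to-first-connection transport readiness measurement.
    logger.info(f"Startup timing report: {report}")

# Then pass the observer to the pipeline task, as the example below does:
# PipelineTask(pipeline, observers=[startup_observer, ...])
```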
1 change: 1 addition & 0 deletions changelog/3885.added.2.md
@@ -0,0 +1 @@
- Added `on_first_bot_speech_latency` event to `UserBotLatencyObserver` measuring the time from client connection to first bot speech. An `on_latency_breakdown` event is also emitted for this first speech.
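
A sketch of subscribing to this event, mirroring the handler added to the example file later in this diff. The observer's import path and no-argument constructor are assumptions, since neither is shown here:

```python
from loguru import logger

# Assumed import path; not shown in this diff.
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver

latency_observer = UserBotLatencyObserver()

@latency_observer.event_handler("on_first_bot_speech_latency")
async def on_first_bot_speech_latency(observer, latency_seconds):
    logger.info(f"First bot speech: {latency_seconds:.3f}s after client connected")
```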
1 change: 1 addition & 0 deletions changelog/3885.added.md
@@ -0,0 +1 @@
- Added `on_latency_breakdown` event to `UserBotLatencyObserver` providing per-service TTFB, text aggregation, user turn duration, and function call latency metrics for each user-to-bot response cycle.
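
The example file below consumes the breakdown through `chronological_events()`; a minimal sketch, assuming the same `latency_observer` instance as the sketch above:

```python
@latency_observer.event_handler("on_latency_breakdown")
async def on_latency_breakdown(observer, breakdown):
    # Only chronological_events() is exercised in this PR's example; the
    # per-service TTFB and aggregation fields are not shown in this diff.
    for event in breakdown.chronological_events():
        logger.info(f"  {event}")
```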
58 changes: 57 additions & 1 deletion examples/foundational/29-turn-tracking-observer.py
@@ -5,11 +5,14 @@
#


import asyncio
import os

from dotenv import load_dotenv
from loguru import logger

from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.observers.startup_timing_observer import StartupTimingObserver
@@ -26,13 +29,25 @@
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

load_dotenv(override=True)


async def fetch_weather_from_api(params: FunctionCallParams):
await asyncio.sleep(0.25)
await params.result_callback({"conditions": "nice", "temperature": "75"})


async def fetch_restaurant_recommendation(params: FunctionCallParams):
await asyncio.sleep(0.1)
await params.result_callback({"name": "The Golden Dragon"})


# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
@@ -63,14 +78,46 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):

llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)

weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])

messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]

context = LLMContext(messages)
context = LLMContext(messages, tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
@@ -101,6 +148,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
observers=[latency_observer, startup_observer],
)

@latency_observer.event_handler("on_first_bot_speech_latency")
async def on_first_bot_speech_latency(observer, latency_seconds):
logger.info(f"First bot speech: {latency_seconds:.3f}s after client connected")

@latency_observer.event_handler("on_latency_measured")
async def on_latency_measured(observer, latency_seconds):
logger.info(f"⏱️ User-to-bot latency: {latency_seconds:.3f}s")
@@ -131,6 +182,11 @@ async def on_turn_ended(observer, turn_number, duration, was_interrupted):
else:
logger.info(f"🏁 Turn {turn_number} completed in {duration:.2f}s")

@latency_observer.event_handler("on_latency_breakdown")
async def on_latency_breakdown(observer, breakdown):
for event in breakdown.chronological_events():
logger.info(f" {event}")

@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")