1+ #
2+ # Copyright (c) 2024-2026, Daily
3+ #
4+ # SPDX-License-Identifier: BSD 2-Clause License
5+ #
6+
17"""Observer for tracking user-to-bot response latency.
28
39This module provides an observer that monitors the time between when a user
410stops speaking and when the bot starts speaking, emitting events when latency
5- is measured.
11+ is measured. Optionally collects per-service latency breakdown metrics
12+ (TTFB, text aggregation) when ``enable_metrics=True``.
613"""
714
815import time
9- from typing import Optional , Set
16+ from collections import deque
17+ from dataclasses import dataclass , field
18+ from typing import List , Optional
1019
1120from pipecat .frames .frames import (
1221 BotStartedSpeakingFrame ,
22+ InterruptionFrame ,
23+ MetricsFrame ,
24+ UserStoppedSpeakingFrame ,
1325 VADUserStartedSpeakingFrame ,
1426 VADUserStoppedSpeakingFrame ,
1527)
28+ from pipecat .metrics .metrics import (
29+ TextAggregationMetricsData ,
30+ TTFBMetricsData ,
31+ )
1632from pipecat .observers .base_observer import BaseObserver , FramePushed
1733from pipecat .processors .frame_processor import FrameDirection
1834
1935
@dataclass
class LatencyBreakdown:
    """Snapshot of the metrics gathered during one user-to-bot cycle.

    An instance is built by :class:`UserBotLatencyObserver` when the bot
    starts speaking and is handed to ``on_latency_breakdown`` handlers.

    Parameters:
        ttfb: The time-to-first-byte metrics accumulated for the cycle.
            Only entries with a positive ``value`` are collected.
        text_aggregation: The first text aggregation metric seen during the
            cycle, or ``None`` when none was reported.
        user_turn_secs: Seconds elapsed between the moment the user actually
            stopped speaking and the arrival of ``UserStoppedSpeakingFrame``.
            ``None`` when no ``UserStoppedSpeakingFrame`` was observed.
    """

    # Copied from the observer's accumulator, so each breakdown owns its list.
    ttfb: List[TTFBMetricsData] = field(default_factory=list)
    # Later text aggregation measurements in the same cycle are ignored.
    text_aggregation: Optional[TextAggregationMetricsData] = None
    # Wall-clock duration in seconds; see the class docstring for the bounds.
    user_turn_secs: Optional[float] = None
59+
60+
2061class UserBotLatencyObserver (BaseObserver ):
2162 """Observer that tracks user-to-bot response latency.
2263
@@ -25,34 +66,54 @@ class UserBotLatencyObserver(BaseObserver):
2566 latency is measured, allowing consumers to log, trace, or otherwise process
2667 the latency data.
2768
69+ When ``enable_metrics=True`` in pipeline params, also collects per-service
70+ latency breakdown (TTFB, text aggregation) and emits an
71+ ``on_latency_breakdown`` event alongside the existing latency measurement.
72+
2873 This observer follows the composition pattern used by TurnTrackingObserver,
2974 acting as a reusable component for latency measurement.
3075
3176 Events:
32- on_latency_measured(observer, latency_seconds): Emitted when user-to-bot
33- latency is calculated. Includes the latency value in seconds as a float.
77+ on_latency_measured(observer, latency_seconds): Emitted when
78+ time-to-first-bot-speech is calculated. Measures the time from
79+ when the user stopped speaking to when the bot starts speaking.
80+ on_latency_breakdown(observer, breakdown): Emitted at each
81+ ``BotStartedSpeakingFrame`` with a :class:`LatencyBreakdown`
82+ containing per-service metrics collected during the user→bot cycle.
3483 """
3584
def __init__(self, *, max_frames=100, **kwargs):
    """Create the observer and register its events.

    Args:
        max_frames: Maximum number of frame IDs remembered for duplicate
            detection. Defaults to 100.
        **kwargs: Additional arguments forwarded to the parent class.
    """
    super().__init__(**kwargs)

    # Timing state: when the user stopped speaking, and how long the user
    # turn took to be released. Both are None while nothing is measured.
    self._user_stopped_time: Optional[float] = None
    self._user_turn: Optional[float] = None

    # Duplicate detection: the deque bounds how many IDs are remembered,
    # the set provides fast membership checks over those same IDs.
    self._frame_history: deque = deque(maxlen=max_frames)
    self._processed_frames: set = set()

    # Metrics accumulated for the cycle currently being measured.
    self._text_aggregation: Optional[TextAggregationMetricsData] = None
    self._ttfb: List[TTFBMetricsData] = []

    for event_name in ("on_latency_measured", "on_latency_breakdown"):
        self._register_event_handler(event_name)
50110
async def on_push_frame(self, data: FramePushed):
    """Process frames to track speech timing and calculate latency.

    Tracks VAD events and bot speaking events to measure the time between
    user stopping speech and bot starting speech. Also accumulates metrics
    from MetricsFrame for the latency breakdown.

    Each frame ID is handled at most once while it is still held in the
    bounded ID history; upstream pushes are ignored entirely.

    Args:
        data: Frame push event containing the frame and direction information.
    """
    # Only process downstream frames
    if data.direction != FrameDirection.DOWNSTREAM:
        return

    frame = data.frame
    frame_id = frame.id

    # Skip already processed frames (bounded deque + set)
    if frame_id in self._processed_frames:
        return

    history = self._frame_history
    if history and len(history) == history.maxlen:
        # The deque is full, so the append below drops its oldest ID.
        # Forget that single ID in the set too, instead of rebuilding the
        # whole set from the deque on every frame.
        self._processed_frames.discard(history[0])
    history.append(frame_id)
    if history.maxlen != 0:
        # With maxlen=0 the deque retains nothing, so the set must not
        # retain anything either (otherwise it would grow without bound).
        self._processed_frames.add(frame_id)

    # Track speech and pipeline events for latency
    if isinstance(frame, VADUserStartedSpeakingFrame):
        # Reset when user starts speaking. _reset_accumulators() also
        # clears the user turn duration.
        self._user_stopped_time = None
        self._reset_accumulators()
    elif isinstance(frame, VADUserStoppedSpeakingFrame):
        # Record the actual time the user stopped speaking, which is
        # the VAD determination time minus the stop_secs silence duration
        # that had to elapse before the VAD confirmed speech ended.
        self._user_stopped_time = frame.timestamp - frame.stop_secs
    elif isinstance(frame, UserStoppedSpeakingFrame):
        # Measure the user turn duration: from actual user silence to
        # turn release. Includes VAD silence detection, STT finalization,
        # and any turn analyzer wait.
        if self._user_stopped_time is not None:
            self._user_turn = time.time() - self._user_stopped_time
    elif isinstance(frame, InterruptionFrame):
        # Discard stale metrics from cancelled LLM/TTS cycles
        self._reset_accumulators()
    elif isinstance(frame, MetricsFrame):
        self._handle_metrics_frame(frame)
    elif isinstance(frame, BotStartedSpeakingFrame):
        await self._handle_bot_started_speaking()
159+
160+ async def _handle_bot_started_speaking (self ):
161+ """Handle BotStartedSpeakingFrame to emit latency and breakdown."""
162+ if self ._user_stopped_time is None :
163+ return
164+
165+ latency = time .time () - self ._user_stopped_time
166+ self ._user_stopped_time = None
167+ await self ._call_event_handler ("on_latency_measured" , latency )
168+
169+ breakdown = LatencyBreakdown (
170+ ttfb = list (self ._ttfb ),
171+ text_aggregation = self ._text_aggregation ,
172+ user_turn_secs = self ._user_turn ,
173+ )
174+ await self ._call_event_handler ("on_latency_breakdown" , breakdown )
175+ self ._reset_accumulators ()
176+
177+ def _handle_metrics_frame (self , frame : MetricsFrame ):
178+ """Extract latency metrics from a MetricsFrame.
179+
180+ Only accumulates metrics when a user→bot measurement is in progress
181+ (after ``VADUserStoppedSpeakingFrame``).
182+ """
183+ if self ._user_stopped_time is None :
184+ return
185+
186+ for metrics_data in frame .data :
187+ if isinstance (metrics_data , TTFBMetricsData ) and metrics_data .value > 0 :
188+ self ._ttfb .append (metrics_data )
189+ elif isinstance (metrics_data , TextAggregationMetricsData ):
190+ # Only keep the first measurement — it's the one that
191+ # impacts the initial speaking latency.
192+ if self ._text_aggregation is None :
193+ self ._text_aggregation = metrics_data
194+
195+ def _reset_accumulators (self ):
196+ """Clear per-cycle metric accumulators."""
197+ self ._ttfb = []
198+ self ._text_aggregation = None
199+ self ._user_turn = None
0 commit comments