Skip to content

Commit f6ed923

Browse files
committed
Add first-bot-speech latency to UserBotLatencyObserver
Measure time from ClientConnectedFrame to first BotStartedSpeakingFrame, emitting a one-time on_first_bot_speech_latency event with breakdown.
1 parent 05dcad9 commit f6ed923

5 files changed

Lines changed: 174 additions & 16 deletions

File tree

changelog/3885.added.2.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Added `on_first_bot_speech_latency` event to `UserBotLatencyObserver` measuring the time from client connection to first bot speech. An `on_latency_breakdown` event with the per-service metrics collected during that window is emitted alongside it.

changelog/3885.added.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
- Added `LatencyBreakdown` dataclass and `on_latency_breakdown` event to `UserBotLatencyObserver` for per-service latency metrics (TTFB, text aggregation, user turn duration) collected during each user-to-bot response cycle.
1+
- Added `on_latency_breakdown` event to `UserBotLatencyObserver` providing per-service TTFB, text aggregation, and user turn duration metrics for each user-to-bot response cycle.

examples/foundational/29-turn-tracking-observer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
101101
observers=[latency_observer, startup_observer],
102102
)
103103

104+
@latency_observer.event_handler("on_first_bot_speech_latency")
105+
async def on_first_bot_speech_latency(observer, latency_seconds):
106+
logger.info(f"First bot speech: {latency_seconds:.3f}s after client connected")
107+
104108
@latency_observer.event_handler("on_latency_measured")
105109
async def on_latency_measured(observer, latency_seconds):
106110
logger.info(f"⏱️ User-to-bot latency: {latency_seconds:.3f}s")

src/pipecat/observers/user_bot_latency_observer.py

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from pipecat.frames.frames import (
2121
BotStartedSpeakingFrame,
22+
ClientConnectedFrame,
2223
InterruptionFrame,
2324
MetricsFrame,
2425
UserStoppedSpeakingFrame,
@@ -80,6 +81,10 @@ class UserBotLatencyObserver(BaseObserver):
8081
on_latency_breakdown(observer, breakdown): Emitted at each
8182
``BotStartedSpeakingFrame`` with a :class:`LatencyBreakdown`
8283
containing per-service metrics collected during the user→bot cycle or, for the first bot speech, since ``ClientConnectedFrame``.
84+
on_first_bot_speech_latency(observer, latency_seconds): Emitted once,
85+
the first time ``BotStartedSpeakingFrame`` arrives after
86+
``ClientConnectedFrame``. Measures the time from client connection
87+
to the first bot speech. Not emitted if the user starts speaking first.
8388
"""
8489

8590
def __init__(self, *, max_frames=100, **kwargs):
@@ -97,6 +102,10 @@ def __init__(self, *, max_frames=100, **kwargs):
97102
self._user_stopped_time: Optional[float] = None
98103
self._user_turn: Optional[float] = None
99104

105+
# First bot speech tracking
106+
self._client_connected_time: Optional[float] = None
107+
self._first_bot_speech_measured: bool = False
108+
100109
# Frame deduplication (bounded deque + set pattern)
101110
self._processed_frames: set = set()
102111
self._frame_history: deque = deque(maxlen=max_frames)
@@ -107,6 +116,7 @@ def __init__(self, *, max_frames=100, **kwargs):
107116

108117
self._register_event_handler("on_latency_measured")
109118
self._register_event_handler("on_latency_breakdown")
119+
self._register_event_handler("on_first_bot_speech_latency")
110120

111121
async def on_push_frame(self, data: FramePushed):
112122
"""Process frames to track speech timing and calculate latency.
@@ -132,12 +142,21 @@ async def on_push_frame(self, data: FramePushed):
132142
if len(self._processed_frames) > len(self._frame_history):
133143
self._processed_frames = set(self._frame_history)
134144

145+
# Track client connection (first occurrence only)
146+
if isinstance(data.frame, ClientConnectedFrame):
147+
if self._client_connected_time is None:
148+
self._client_connected_time = time.time()
149+
return
150+
135151
# Track speech and pipeline events for latency
136152
if isinstance(data.frame, VADUserStartedSpeakingFrame):
137153
# Reset when user starts speaking
138154
self._user_stopped_time = None
139155
self._user_turn = None
140156
self._reset_accumulators()
157+
# If user speaks before the bot's first speech, abandon the
158+
# first-bot-speech measurement — it's only meaningful for greetings.
159+
self._first_bot_speech_measured = True
141160
elif isinstance(data.frame, VADUserStoppedSpeakingFrame):
142161
# Record the actual time the user stopped speaking, which is
143162
# the VAD determination time minus the stop_secs silence duration
@@ -159,28 +178,41 @@ async def on_push_frame(self, data: FramePushed):
159178

160179
async def _handle_bot_started_speaking(self):
161180
"""Handle BotStartedSpeakingFrame to emit latency and breakdown."""
162-
if self._user_stopped_time is None:
163-
return
181+
emit_breakdown = False
164182

165-
latency = time.time() - self._user_stopped_time
166-
self._user_stopped_time = None
167-
await self._call_event_handler("on_latency_measured", latency)
183+
# One-time first bot speech measurement (client connect → first speech)
184+
if self._client_connected_time is not None and not self._first_bot_speech_measured:
185+
self._first_bot_speech_measured = True
186+
latency = time.time() - self._client_connected_time
187+
await self._call_event_handler("on_first_bot_speech_latency", latency)
188+
emit_breakdown = True
168189

169-
breakdown = LatencyBreakdown(
170-
ttfb=list(self._ttfb),
171-
text_aggregation=self._text_aggregation,
172-
user_turn_secs=self._user_turn,
173-
)
174-
await self._call_event_handler("on_latency_breakdown", breakdown)
175-
self._reset_accumulators()
190+
if self._user_stopped_time is not None:
191+
latency = time.time() - self._user_stopped_time
192+
self._user_stopped_time = None
193+
await self._call_event_handler("on_latency_measured", latency)
194+
emit_breakdown = True
195+
196+
if emit_breakdown:
197+
breakdown = LatencyBreakdown(
198+
ttfb=list(self._ttfb),
199+
text_aggregation=self._text_aggregation,
200+
user_turn_secs=self._user_turn,
201+
)
202+
await self._call_event_handler("on_latency_breakdown", breakdown)
203+
self._reset_accumulators()
176204

177205
def _handle_metrics_frame(self, frame: MetricsFrame):
178206
"""Extract latency metrics from a MetricsFrame.
179207
180-
Only accumulates metrics when a user→bot measurement is in progress
181-
(after ``VADUserStoppedSpeakingFrame``).
208+
Accumulates metrics when a measurement is in progress: either a
209+
user→bot cycle (after ``VADUserStoppedSpeakingFrame``) or the
210+
first-bot-speech window (after ``ClientConnectedFrame``).
182211
"""
183-
if self._user_stopped_time is None:
212+
waiting_for_first_speech = (
213+
self._client_connected_time is not None and not self._first_bot_speech_measured
214+
)
215+
if self._user_stopped_time is None and not waiting_for_first_speech:
184216
return
185217

186218
for metrics_data in frame.data:

tests/test_user_bot_latency_observer.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from pipecat.frames.frames import (
44
BotStartedSpeakingFrame,
5+
ClientConnectedFrame,
56
InterruptionFrame,
67
MetricsFrame,
78
UserStoppedSpeakingFrame,
@@ -342,6 +343,126 @@ async def on_breakdown(obs, breakdown):
342343
self.assertEqual(len(latencies), 0)
343344
self.assertEqual(len(breakdowns), 0)
344345

346+
async def test_first_bot_speech_latency(self):
347+
"""Test first bot speech latency and breakdown from ClientConnected to BotStartedSpeaking."""
348+
observer = UserBotLatencyObserver()
349+
processor = IdentityFilter()
350+
351+
first_speech_latencies = []
352+
breakdowns = []
353+
354+
@observer.event_handler("on_first_bot_speech_latency")
355+
async def on_first_bot_speech(obs, latency_seconds):
356+
first_speech_latencies.append(latency_seconds)
357+
358+
@observer.event_handler("on_latency_breakdown")
359+
async def on_breakdown(obs, breakdown):
360+
breakdowns.append(breakdown)
361+
362+
llm_ttfb = TTFBMetricsData(processor="OpenAILLMService#0", value=0.250)
363+
tts_ttfb = TTFBMetricsData(processor="CartesiaTTSService#0", value=0.070)
364+
365+
frames_to_send = [
366+
ClientConnectedFrame(),
367+
MetricsFrame(data=[llm_ttfb]),
368+
MetricsFrame(data=[tts_ttfb]),
369+
BotStartedSpeakingFrame(),
370+
]
371+
372+
expected_down_frames = [
373+
ClientConnectedFrame,
374+
MetricsFrame,
375+
MetricsFrame,
376+
BotStartedSpeakingFrame,
377+
]
378+
379+
await run_test(
380+
processor,
381+
frames_to_send=frames_to_send,
382+
expected_down_frames=expected_down_frames,
383+
observers=[observer],
384+
)
385+
386+
self.assertEqual(len(first_speech_latencies), 1)
387+
self.assertGreater(first_speech_latencies[0], 0)
388+
self.assertLess(first_speech_latencies[0], 1.0)
389+
390+
# Breakdown should also be emitted with the accumulated metrics
391+
self.assertEqual(len(breakdowns), 1)
392+
self.assertEqual(len(breakdowns[0].ttfb), 2)
393+
self.assertEqual(breakdowns[0].ttfb[0].processor, "OpenAILLMService#0")
394+
self.assertEqual(breakdowns[0].ttfb[1].processor, "CartesiaTTSService#0")
395+
396+
async def test_first_bot_speech_only_once(self):
397+
"""Test that first bot speech latency is only emitted once."""
398+
observer = UserBotLatencyObserver()
399+
processor = IdentityFilter()
400+
401+
first_speech_latencies = []
402+
403+
@observer.event_handler("on_first_bot_speech_latency")
404+
async def on_first_bot_speech(obs, latency_seconds):
405+
first_speech_latencies.append(latency_seconds)
406+
407+
frames_to_send = [
408+
ClientConnectedFrame(),
409+
BotStartedSpeakingFrame(),
410+
# Second bot speech should not trigger the event again
411+
VADUserStoppedSpeakingFrame(),
412+
BotStartedSpeakingFrame(),
413+
]
414+
415+
expected_down_frames = [
416+
ClientConnectedFrame,
417+
BotStartedSpeakingFrame,
418+
VADUserStoppedSpeakingFrame,
419+
BotStartedSpeakingFrame,
420+
]
421+
422+
await run_test(
423+
processor,
424+
frames_to_send=frames_to_send,
425+
expected_down_frames=expected_down_frames,
426+
observers=[observer],
427+
)
428+
429+
self.assertEqual(len(first_speech_latencies), 1)
430+
431+
async def test_first_bot_speech_skipped_when_user_speaks_first(self):
432+
"""Test that first bot speech event is not emitted when user speaks before the bot."""
433+
observer = UserBotLatencyObserver()
434+
processor = IdentityFilter()
435+
436+
first_speech_latencies = []
437+
438+
@observer.event_handler("on_first_bot_speech_latency")
439+
async def on_first_bot_speech(obs, latency_seconds):
440+
first_speech_latencies.append(latency_seconds)
441+
442+
frames_to_send = [
443+
ClientConnectedFrame(),
444+
# User speaks before bot has a chance to greet
445+
VADUserStartedSpeakingFrame(),
446+
VADUserStoppedSpeakingFrame(),
447+
BotStartedSpeakingFrame(),
448+
]
449+
450+
expected_down_frames = [
451+
ClientConnectedFrame,
452+
VADUserStartedSpeakingFrame,
453+
VADUserStoppedSpeakingFrame,
454+
BotStartedSpeakingFrame,
455+
]
456+
457+
await run_test(
458+
processor,
459+
frames_to_send=frames_to_send,
460+
expected_down_frames=expected_down_frames,
461+
observers=[observer],
462+
)
463+
464+
self.assertEqual(len(first_speech_latencies), 0)
465+
345466

346467
if __name__ == "__main__":
347468
unittest.main()

0 commit comments

Comments
 (0)