Skip to content

Commit 012ef41

Browse files
committed
Redesign UserIdleController to use BotStoppedSpeakingFrame
Replace the continuous heartbeat-based timer (UserSpeakingFrame/BotSpeakingFrame + asyncio.Event loop) with a simple one-shot timer that starts when BotStoppedSpeakingFrame is received and cancels on UserStartedSpeakingFrame or BotStartedSpeakingFrame. This eliminates false idle triggers caused by gaps between the user finishing speaking and the bot starting to speak (LLM/TTS latency). Guard the timer start with two conditions to prevent false triggers: - User turn in progress: during interruptions, BotStoppedSpeaking arrives while the user is still speaking mid-turn. - Function calls in progress: FunctionCallsStarted arrives before BotStoppedSpeaking because the bot speaks concurrently with the function call starting, so the timer must wait for the result and subsequent bot response.
1 parent 73cb96b commit 012ef41

5 files changed

Lines changed: 237 additions & 167 deletions

File tree

examples/foundational/17-detect-user-idle.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55
#
66

77

8+
import asyncio
89
import os
910

1011
from dotenv import load_dotenv
1112
from loguru import logger
1213

14+
from pipecat.adapters.schemas.function_schema import FunctionSchema
15+
from pipecat.adapters.schemas.tools_schema import ToolsSchema
1316
from pipecat.audio.vad.silero import SileroVADAnalyzer
1417
from pipecat.frames.frames import (
1518
EndTaskFrame,
@@ -30,6 +33,7 @@
3033
from pipecat.runner.utils import create_transport
3134
from pipecat.services.cartesia.tts import CartesiaTTSService
3235
from pipecat.services.deepgram.stt import DeepgramSTTService
36+
from pipecat.services.llm_service import FunctionCallParams
3337
from pipecat.services.openai.llm import OpenAILLMService
3438
from pipecat.transports.base_transport import BaseTransport, TransportParams
3539
from pipecat.transports.daily.transport import DailyParams
@@ -74,6 +78,17 @@ async def handle_idle(self, aggregator):
7478
await aggregator.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
7579

7680

81+
async def fetch_weather_from_api(params: FunctionCallParams):
82+
# Simulate a slow API call, waiting longer than the user idle timeout.
83+
await asyncio.sleep(3)
84+
await params.result_callback({"conditions": "nice", "temperature": "75"})
85+
86+
87+
async def fetch_restaurant_recommendation(params: FunctionCallParams):
88+
await asyncio.sleep(6)
89+
await params.result_callback({"name": "The Golden Dragon"})
90+
91+
7792
# We use lambdas to defer transport parameter creation until the transport
7893
# type is selected at runtime.
7994
transport_params = {
@@ -104,14 +119,50 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
104119

105120
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
106121

122+
llm.register_function("get_current_weather", fetch_weather_from_api)
123+
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
124+
125+
@llm.event_handler("on_function_calls_started")
126+
async def on_function_calls_started(service, function_calls):
127+
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
128+
129+
weather_function = FunctionSchema(
130+
name="get_current_weather",
131+
description="Get the current weather",
132+
properties={
133+
"location": {
134+
"type": "string",
135+
"description": "The city and state, e.g. San Francisco, CA",
136+
},
137+
"format": {
138+
"type": "string",
139+
"enum": ["celsius", "fahrenheit"],
140+
"description": "The temperature unit to use. Infer this from the user's location.",
141+
},
142+
},
143+
required=["location", "format"],
144+
)
145+
restaurant_function = FunctionSchema(
146+
name="get_restaurant_recommendation",
147+
description="Get a restaurant recommendation",
148+
properties={
149+
"location": {
150+
"type": "string",
151+
"description": "The city and state, e.g. San Francisco, CA",
152+
},
153+
},
154+
required=["location"],
155+
)
156+
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
157+
107158
messages = [
108159
{
109160
"role": "system",
110161
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
111162
},
112163
]
113164

114-
context = LLMContext(messages)
165+
context = LLMContext(messages, tools)
115166
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
116167
context,
117168
user_params=LLMUserAggregatorParams(
@@ -146,6 +197,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
146197

147198
@user_aggregator.event_handler("on_user_turn_idle")
148199
async def on_user_turn_idle(aggregator):
200+
logger.info(f"User turn idle")
149201
await idle_handler.handle_idle(aggregator)
150202

151203
@user_aggregator.event_handler("on_user_turn_started")

src/pipecat/processors/aggregators/llm_response_universal.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,9 @@ async def _on_user_turn_started(
689689
if params.enable_user_speaking_frames:
690690
await self.broadcast_frame(UserStartedSpeakingFrame)
691691

692+
if self._user_idle_controller:
693+
await self._user_idle_controller.process_frame(UserStartedSpeakingFrame())
694+
692695
if params.enable_interruptions and self._allow_interruptions:
693696
await self.push_interruption_task_frame_and_wait()
694697

@@ -705,6 +708,9 @@ async def _on_user_turn_stopped(
705708
if params.enable_user_speaking_frames:
706709
await self.broadcast_frame(UserStoppedSpeakingFrame)
707710

711+
if self._user_idle_controller:
712+
await self._user_idle_controller.process_frame(UserStoppedSpeakingFrame())
713+
708714
await self._maybe_emit_user_turn_stopped(strategy)
709715

710716
async def _on_user_turn_stop_timeout(self, controller):

src/pipecat/turns/user_idle_controller.py

Lines changed: 63 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@
1010
from typing import Optional
1111

1212
from pipecat.frames.frames import (
13-
BotSpeakingFrame,
13+
BotStartedSpeakingFrame,
14+
BotStoppedSpeakingFrame,
1415
Frame,
16+
FunctionCallCancelFrame,
1517
FunctionCallResultFrame,
1618
FunctionCallsStartedFrame,
17-
UserSpeakingFrame,
1819
UserStartedSpeakingFrame,
20+
UserStoppedSpeakingFrame,
1921
)
2022
from pipecat.utils.asyncio.task_manager import BaseTaskManager
2123
from pipecat.utils.base_object import BaseObject
@@ -25,14 +27,14 @@ class UserIdleController(BaseObject):
2527
"""Controller for managing user idle detection.
2628
2729
This class monitors user activity and triggers an event when the user has been
28-
idle (not speaking) for a configured timeout period. It only starts monitoring
29-
after the first conversation activity and does not trigger while the bot is
30-
speaking or function calls are in progress.
30+
idle (not speaking) for a configured timeout period after the bot finishes
31+
speaking. The timer starts when BotStoppedSpeakingFrame is received and is
32+
cancelled when someone starts speaking again (UserStartedSpeakingFrame or
33+
BotStartedSpeakingFrame).
3134
32-
The controller tracks activity using continuous frames (UserSpeakingFrame and
33-
BotSpeakingFrame) which are emitted repeatedly while speaking is happening, and
34-
state-based tracking for function calls (FunctionCallsStartedFrame and
35-
FunctionCallResultFrame) which are only sent at start and end.
35+
The timer is suppressed while a user turn is in progress to avoid false
36+
triggers during interruptions (where BotStoppedSpeakingFrame arrives while
37+
the user is still speaking).
3638
3739
Event handlers available:
3840
@@ -62,11 +64,9 @@ def __init__(
6264

6365
self._task_manager: Optional[BaseTaskManager] = None
6466

65-
self._conversation_started = False
66-
self._function_call_in_progress = False
67-
68-
self.user_idle_event = asyncio.Event()
69-
self.user_idle_task: Optional[asyncio.Task] = None
67+
self._user_turn_in_progress: bool = False
68+
self._function_calls_in_progress: int = 0
69+
self._idle_timer_task: Optional[asyncio.Task] = None
7070

7171
self._register_event_handler("on_user_turn_idle", sync=True)
7272

@@ -85,89 +85,63 @@ async def setup(self, task_manager: BaseTaskManager):
8585
"""
8686
self._task_manager = task_manager
8787

88-
if not self.user_idle_task:
89-
self.user_idle_task = self.task_manager.create_task(
90-
self.user_idle_task_handler(),
91-
f"{self}::user_idle_task_handler",
92-
)
93-
9488
async def cleanup(self):
9589
"""Cleanup the controller."""
9690
await super().cleanup()
97-
98-
if self.user_idle_task:
99-
await self.task_manager.cancel_task(self.user_idle_task)
100-
self.user_idle_task = None
91+
await self._cancel_idle_timer()
10192

10293
async def process_frame(self, frame: Frame):
10394
"""Process an incoming frame to track user activity state.
10495
10596
Args:
10697
frame: The frame to be processed.
10798
"""
108-
# Start monitoring on first conversation activity
109-
if not self._conversation_started:
110-
if isinstance(frame, (UserStartedSpeakingFrame, BotSpeakingFrame)):
111-
self._conversation_started = True
112-
self.user_idle_event.set()
113-
else:
114-
return
115-
116-
# Reset idle timer on continuous activity frames
117-
if isinstance(frame, (UserSpeakingFrame, BotSpeakingFrame)):
118-
await self._handle_activity(frame)
119-
# Track function call state (start/end frames, not continuous)
99+
if isinstance(frame, BotStoppedSpeakingFrame):
100+
# Only start the timer if the user isn't mid-turn and no function
101+
# calls are pending.
102+
#
103+
# Interruption case: the frame order is UserStartedSpeaking →
104+
# BotStoppedSpeaking → (user keeps talking) → UserStoppedSpeaking.
105+
# Without the user-turn guard the timer would start while the user
106+
# is still speaking.
107+
#
108+
# Function call case: normally FunctionCallsStarted arrives after
109+
# BotStoppedSpeaking and cancels the timer directly. But a race
110+
# condition can cause FunctionCallsStarted to arrive before
111+
# BotStoppedSpeaking when pushing a TTSSpeakFrame in the
112+
# on_function_calls_started event handler, so the counter guard
113+
# prevents the timer from starting while a function call is in progress.
114+
if not self._user_turn_in_progress and self._function_calls_in_progress == 0:
115+
await self._start_idle_timer()
116+
elif isinstance(frame, BotStartedSpeakingFrame):
117+
await self._cancel_idle_timer()
118+
elif isinstance(frame, UserStartedSpeakingFrame):
119+
self._user_turn_in_progress = True
120+
await self._cancel_idle_timer()
121+
elif isinstance(frame, UserStoppedSpeakingFrame):
122+
self._user_turn_in_progress = False
120123
elif isinstance(frame, FunctionCallsStartedFrame):
121-
await self._handle_function_calls_started(frame)
122-
elif isinstance(frame, FunctionCallResultFrame):
123-
await self._handle_function_call_result(frame)
124-
125-
async def _handle_activity(self, _: UserSpeakingFrame | BotSpeakingFrame):
126-
"""Handle continuous activity frames that should reset the idle timer.
127-
128-
These frames are emitted continuously while the user or bot is speaking,
129-
so we simply reset the timer whenever we receive them.
130-
131-
Args:
132-
frame: The activity frame to process.
133-
"""
134-
self.user_idle_event.set()
135-
136-
async def _handle_function_calls_started(self, _: FunctionCallsStartedFrame):
137-
"""Handle function calls started event.
138-
139-
Function calls can take longer than the timeout, so we track their state
140-
to prevent idle callbacks while they're in progress.
141-
142-
Args:
143-
frame: The FunctionCallsStartedFrame to process.
144-
"""
145-
self._function_call_in_progress = True
146-
self.user_idle_event.set()
147-
148-
async def _handle_function_call_result(self, _: FunctionCallResultFrame):
149-
"""Handle function call result event.
150-
151-
Args:
152-
frame: The FunctionCallResultFrame to process.
153-
"""
154-
self._function_call_in_progress = False
155-
self.user_idle_event.set()
156-
157-
async def user_idle_task_handler(self):
158-
"""Monitors for idle timeout and triggers events.
159-
160-
Runs in a loop until cancelled. The idle timer is reset whenever activity
161-
frames are received (UserSpeakingFrame or BotSpeakingFrame). Function calls
162-
are tracked via state since they only send start/end frames. If no activity
163-
is detected for the configured timeout period and no function call is in
164-
progress, the on_user_turn_idle event is triggered.
165-
"""
166-
while True:
167-
try:
168-
await asyncio.wait_for(self.user_idle_event.wait(), timeout=self._user_idle_timeout)
169-
self.user_idle_event.clear()
170-
except asyncio.TimeoutError:
171-
# Only trigger if conversation has started and no function call is in progress
172-
if self._conversation_started and not self._function_call_in_progress:
173-
await self._call_event_handler("on_user_turn_idle")
124+
self._function_calls_in_progress += len(frame.function_calls)
125+
await self._cancel_idle_timer()
126+
elif isinstance(frame, (FunctionCallResultFrame, FunctionCallCancelFrame)):
127+
self._function_calls_in_progress = max(0, self._function_calls_in_progress - 1)
128+
129+
async def _start_idle_timer(self):
130+
"""Start (or restart) the idle timer."""
131+
await self._cancel_idle_timer()
132+
self._idle_timer_task = self.task_manager.create_task(
133+
self._idle_timer_expired(),
134+
f"{self}::idle_timer",
135+
)
136+
137+
async def _cancel_idle_timer(self):
138+
"""Cancel the idle timer if running."""
139+
if self._idle_timer_task:
140+
await self.task_manager.cancel_task(self._idle_timer_task)
141+
self._idle_timer_task = None
142+
143+
async def _idle_timer_expired(self):
144+
"""Sleep for the timeout duration then fire the idle event."""
145+
await asyncio.sleep(self._user_idle_timeout)
146+
self._idle_timer_task = None
147+
await self._call_event_handler("on_user_turn_idle")

src/pipecat/turns/user_turn_processor.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ async def _on_user_turn_started(
189189
if params.enable_user_speaking_frames:
190190
await self.broadcast_frame(UserStartedSpeakingFrame)
191191

192+
if self._user_idle_controller:
193+
await self._user_idle_controller.process_frame(UserStartedSpeakingFrame())
194+
192195
if params.enable_interruptions and self._allow_interruptions:
193196
await self.push_interruption_task_frame_and_wait()
194197

@@ -205,6 +208,9 @@ async def _on_user_turn_stopped(
205208
if params.enable_user_speaking_frames:
206209
await self.broadcast_frame(UserStoppedSpeakingFrame)
207210

211+
if self._user_idle_controller:
212+
await self._user_idle_controller.process_frame(UserStoppedSpeakingFrame())
213+
208214
await self._call_event_handler("on_user_turn_stopped", strategy)
209215

210216
async def _on_user_turn_stop_timeout(self, controller):

0 commit comments

Comments
 (0)