Skip to content

Commit 51488ce

Browse files
authored
Merge pull request #133 from pipecat-ai/support-waiting-until-user-speaks-attempt-2
Support waiting until user speaks attempt 2
2 parents 0386044 + 62ec945 commit 51488ce

6 files changed

Lines changed: 71 additions & 17 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313

1414
### Changed
1515

16+
- Added `respond_immediately` to `NodeConfig`. Setting it to `False` has the effect of making the
17+
bot wait, after the node is activated, for the user to speak before responding.
18+
1619
- Bumped the minimum required `pipecat-ai` version to 0.0.67 to align with AWS
1720
Bedrock additions in Pipecat. This also adds support for `FunctionCallParams`
1821
which were added in 0.0.66.

examples/dynamic/restaurant_reservation.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
from pipecat_flows import FlowArgs, FlowManager, FlowResult, FlowsFunctionSchema, NodeConfig
2727

2828
sys.path.append(str(Path(__file__).parent.parent))
29+
import argparse
30+
2931
from runner import configure
3032

3133
load_dotenv(override=True)
@@ -166,7 +168,7 @@ async def handle_end(_: Dict, result: FlowResult, flow_manager: FlowManager):
166168

167169

168170
# Node configurations
169-
def create_initial_node() -> NodeConfig:
171+
def create_initial_node(wait_for_user: bool) -> NodeConfig:
170172
"""Create initial node for party size collection."""
171173
return {
172174
"role_messages": [
@@ -178,10 +180,11 @@ def create_initial_node() -> NodeConfig:
178180
"task_messages": [
179181
{
180182
"role": "system",
181-
"content": "Warmly greet the customer and ask how many people are in their party.",
183+
"content": "Warmly greet the customer and ask how many people are in their party. This is your only job for now; if the customer asks for something else, politely remind them you can't do it.",
182184
}
183185
],
184186
"functions": [party_size_schema],
187+
"respond_immediately": not wait_for_user,
185188
}
186189

187190

@@ -245,7 +248,7 @@ def create_end_node() -> NodeConfig:
245248

246249

247250
# Main setup
248-
async def main():
251+
async def main(wait_for_user: bool):
249252
async with aiohttp.ClientSession() as session:
250253
(room_url, _) = await configure(session)
251254

@@ -297,11 +300,19 @@ async def on_first_participant_joined(transport, participant):
297300
logger.debug("Initializing flow manager")
298301
await flow_manager.initialize()
299302
logger.debug("Setting initial node")
300-
await flow_manager.set_node("initial", create_initial_node())
303+
await flow_manager.set_node("initial", create_initial_node(wait_for_user))
301304

302305
runner = PipelineRunner()
303306
await runner.run(task)
304307

305308

306309
if __name__ == "__main__":
307-
asyncio.run(main())
310+
parser = argparse.ArgumentParser(description="Restaurant reservation bot")
311+
parser.add_argument(
312+
"--wait-for-user",
313+
action="store_true",
314+
help="If set, the bot will wait for the user to speak first",
315+
)
316+
args = parser.parse_args()
317+
318+
asyncio.run(main(args.wait_for_user))

examples/runner.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@
88
import os
99

1010
import aiohttp
11-
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyMeetingTokenParams, DailyMeetingTokenProperties
11+
from pipecat.transports.services.helpers.daily_rest import (
12+
DailyMeetingTokenParams,
13+
DailyMeetingTokenProperties,
14+
DailyRESTHelper,
15+
)
1216

1317

1418
async def configure(aiohttp_session: aiohttp.ClientSession):
@@ -58,11 +62,9 @@ async def configure_with_args(
5862
expiry_time: float = 60 * 60
5963

6064
token = await daily_rest_helper.get_token(
61-
url,
62-
expiry_time,
63-
params=DailyMeetingTokenParams(properties=DailyMeetingTokenProperties(
64-
user_id="bot"
65-
))
65+
url,
66+
expiry_time,
67+
params=DailyMeetingTokenParams(properties=DailyMeetingTokenProperties(user_id="bot")),
6668
)
6769

6870
return (url, token, args)

src/pipecat_flows/actions.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,18 @@
2222
"""
2323

2424
import asyncio
25-
from dataclasses import dataclass
2625
import inspect
26+
from dataclasses import dataclass
2727
from typing import Callable, Dict, List, Optional
2828

2929
from loguru import logger
3030
from pipecat.frames.frames import (
31+
BotStoppedSpeakingFrame,
32+
ControlFrame,
3133
EndFrame,
3234
TTSSpeakFrame,
3335
)
3436
from pipecat.pipeline.task import PipelineTask
35-
from pipecat.frames.frames import ControlFrame
3637

3738
from .exceptions import ActionError
3839
from .types import ActionConfig, FlowActionHandler
@@ -73,19 +74,23 @@ def __init__(self, task: PipelineTask, flow_manager: "FlowManager", tts=None):
7374
self._flow_manager = flow_manager
7475
self.tts = tts
7576
self.function_finished_event = asyncio.Event()
77+
self._deferred_post_actions: List[ActionConfig] = []
7678

7779
# Register built-in actions
7880
self._register_action("tts_say", self._handle_tts_action)
7981
self._register_action("end_conversation", self._handle_end_action)
8082
self._register_action("function", self._handle_function_action)
8183

8284
# Wire up function actions
83-
task.set_reached_downstream_filter((FunctionActionFrame,))
85+
task.set_reached_downstream_filter((FunctionActionFrame, BotStoppedSpeakingFrame))
86+
8487
@task.event_handler("on_frame_reached_downstream")
8588
async def on_frame_reached_downstream(task, frame):
8689
if isinstance(frame, FunctionActionFrame):
8790
await frame.function(frame.action, flow_manager)
8891
self.function_finished_event.set()
92+
elif isinstance(frame, BotStoppedSpeakingFrame):
93+
await self._execute_deferred_post_actions()
8994

9095
def _register_action(self, action_type: str, handler: Callable) -> None:
9196
"""Register a handler for a specific action type.
@@ -159,6 +164,25 @@ async def execute_actions(self, actions: Optional[List[ActionConfig]]) -> None:
159164
except Exception as e:
160165
raise ActionError(f"Failed to execute action {action_type}: {str(e)}") from e
161166

167+
def schedule_deferred_post_actions(self, post_actions: List[ActionConfig]) -> None:
168+
"""Schedule "deferred" post-actions to be executed after next LLM completion.
169+
170+
Args:
171+
post_actions: List of actions to execute
172+
"""
173+
self._deferred_post_actions = post_actions
174+
175+
def clear_deferred_post_actions(self) -> None:
176+
"""Clear any scheduled deferred post-actions."""
177+
self._deferred_post_actions = []
178+
179+
async def _execute_deferred_post_actions(self) -> None:
180+
"""Execute deferred post-actions."""
181+
actions = self._deferred_post_actions
182+
self._deferred_post_actions = []
183+
if actions:
184+
await self.execute_actions(actions)
185+
162186
async def _handle_tts_action(self, action: dict) -> None:
163187
"""Built-in handler for TTS actions.
164188
@@ -209,7 +233,7 @@ async def _handle_function_action(self, action: dict) -> None:
209233
if not handler:
210234
logger.error("Function action missing 'handler' field")
211235
return
212-
# the reason we're queueing a frame here is to ensure it happens after bot turn is over in
236+
# the reason we're queueing a frame here is to ensure it happens after bot turn is over in
213237
# post_actions
214238
await self.task.queue_frame(FunctionActionFrame(action=action, function=handler))
215239
await self.function_finished_event.wait()

src/pipecat_flows/manager.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,9 @@ async def set_node(self, node_id: str, node_config: NodeConfig) -> None:
482482
self._validate_node_config(node_id, node_config)
483483
logger.debug(f"Setting node: {node_id}")
484484

485+
# Clear any deferred post-actions from previous node
486+
self.action_manager.clear_deferred_post_actions()
487+
485488
# Register action handlers from config
486489
for action_list in [
487490
node_config.get("pre_actions", []),
@@ -558,19 +561,28 @@ async def register_function_schema(schema):
558561
self.current_functions = new_functions
559562

560563
# Trigger completion with new context
561-
if self._context_aggregator:
564+
respond_immediately = node_config.get("respond_immediately", True)
565+
if self._context_aggregator and respond_immediately:
562566
await self.task.queue_frames([self._context_aggregator.user().get_context_frame()])
563567

564568
# Execute post-actions if any
565569
if post_actions := node_config.get("post_actions"):
566-
await self._execute_actions(post_actions=post_actions)
570+
if respond_immediately:
571+
await self._execute_actions(post_actions=post_actions)
572+
else:
573+
# Schedule post-actions for execution after first LLM response in this node
574+
print("[pk] Scheduling post-actions for execution after LLM response")
575+
self._schedule_deferred_post_actions(post_actions=post_actions)
567576

568577
logger.debug(f"Successfully set node: {node_id}")
569578

570579
except Exception as e:
571580
logger.error(f"Error setting node {node_id}: {str(e)}")
572581
raise FlowError(f"Failed to set node {node_id}: {str(e)}") from e
573582

583+
def _schedule_deferred_post_actions(self, post_actions: List[ActionConfig]) -> None:
584+
self.action_manager.schedule_deferred_post_actions(post_actions=post_actions)
585+
574586
async def _create_conversation_summary(
575587
self, summary_prompt: str, messages: List[dict]
576588
) -> Optional[str]:

src/pipecat_flows/types.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ class NodeConfig(NodeConfigRequired, total=False):
228228
pre_actions: Actions to execute before LLM inference
229229
post_actions: Actions to execute after LLM inference
230230
context_strategy: Strategy for updating context during transitions
231+
respond_immediately: Whether to run LLM inference as soon as the node is set (default: True)
231232
232233
Example:
233234
{
@@ -254,6 +255,7 @@ class NodeConfig(NodeConfigRequired, total=False):
254255
pre_actions: List[ActionConfig]
255256
post_actions: List[ActionConfig]
256257
context_strategy: ContextStrategyConfig
258+
respond_immediately: bool
257259

258260

259261
class FlowConfig(TypedDict):

0 commit comments

Comments
 (0)