1515import dataclasses
1616import time
1717import uuid
18+ from collections .abc import Coroutine
1819from dataclasses import dataclass
19- from typing import Coroutine , Optional , Union
2020
2121from loguru import logger
2222from pipecat .frames .frames import (
@@ -83,7 +83,7 @@ class AgentActivationArgs:
8383 metadata: Optional structured data passed during activation.
8484 """
8585
86- metadata : Optional [ dict ] = None
86+ metadata : dict | None = None
8787
8888 @classmethod
8989 def from_dict (cls , data : dict ) -> "AgentActivationArgs" :
@@ -118,7 +118,7 @@ def __init__(
118118 agent : "BaseAgent" ,
119119 direction : FrameDirection ,
120120 bridges : tuple [str , ...] = (),
121- exclude_frames : Optional [ tuple [type [Frame ], ...]] = None ,
121+ exclude_frames : tuple [type [Frame ], ...] | None = None ,
122122 ** kwargs ,
123123 ):
124124 super ().__init__ (** kwargs )
@@ -240,8 +240,8 @@ def __init__(
240240 * ,
241241 bus : AgentBus ,
242242 active : bool = True ,
243- bridged : Optional [ tuple [str , ...]] = None ,
244- exclude_frames : Optional [ tuple [type [Frame ], ...]] = None ,
243+ bridged : tuple [str , ...] | None = None ,
244+ exclude_frames : tuple [type [Frame ], ...] | None = None ,
245245 ):
246246 """Initialize the BaseAgent.
247247
@@ -268,21 +268,21 @@ def __init__(
268268 # starts, then on_activated fires.
269269 self ._active = active
270270 self ._pending_activation = active
271- self ._activation_args : Optional [ dict ] = None
271+ self ._activation_args : dict | None = None
272272
273273 # Agent lifecycle. Parent/children form a tree. The pipeline task
274274 # runs the agent's pipeline. Finished is set when the agent stops.
275- self ._parent : Optional [ str ] = None
276- self ._children : list [" BaseAgent" ] = []
277- self ._pipeline_task : Optional [ PipelineTask ] = None
275+ self ._parent : str | None = None
276+ self ._children : list [BaseAgent ] = []
277+ self ._pipeline_task : PipelineTask | None = None
278278 self ._pipeline_started = False
279- self ._started_at : Optional [ float ] = None
279+ self ._started_at : float | None = None
280280 self ._finished : asyncio .Event = asyncio .Event ()
281281
282282 # Shared infrastructure, set by the runner via set_registry()
283283 # and set_task_manager().
284- self ._registry : Optional [ AgentRegistry ] = None
285- self ._task_manager : Optional [ TaskManager ] = None
284+ self ._registry : AgentRegistry | None = None
285+ self ._task_manager : TaskManager | None = None
286286
287287 # Task coordination. Worker state tracks active task requests
288288 # keyed by task_id, supporting multiple tasks in flight
@@ -332,17 +332,17 @@ def active(self) -> bool:
332332 return self ._active
333333
334334 @property
335- def activation_args (self ) -> Optional [ dict ] :
335+ def activation_args (self ) -> dict | None :
336336 """The arguments from the most recent activation, or None if inactive."""
337337 return self ._activation_args
338338
339339 @property
340- def parent (self ) -> Optional [ str ] :
340+ def parent (self ) -> str | None :
341341 """The name of the parent agent, or None if this is a root agent."""
342342 return self ._parent
343343
344344 @property
345- def registry (self ) -> Optional [ AgentRegistry ] :
345+ def registry (self ) -> AgentRegistry | None :
346346 """The shared agent registry, if set by a runner."""
347347 return self ._registry
348348
@@ -357,7 +357,7 @@ def ready(self) -> bool:
357357 return self ._pipeline_started
358358
359359 @property
360- def started_at (self ) -> Optional [ float ] :
360+ def started_at (self ) -> float | None :
361361 """Unix timestamp when this agent became ready, or None if not yet started."""
362362 return self ._started_at
363363
@@ -396,7 +396,7 @@ def set_registry(self, registry: AgentRegistry) -> None:
396396 self ._registry = registry
397397
398398 @property
399- def task_manager (self ) -> Optional [ TaskManager ] :
399+ def task_manager (self ) -> TaskManager | None :
400400 """The shared task manager for asyncio task creation."""
401401 return self ._task_manager
402402
@@ -466,7 +466,7 @@ async def on_error(self, error: str, fatal: bool) -> None:
466466 """
467467 pass
468468
469- async def on_activated (self , args : Optional [ dict ] ) -> None :
469+ async def on_activated (self , args : dict | None ) -> None :
470470 """Called when this agent is activated.
471471
472472 Override in subclasses to react to activation.
@@ -716,7 +716,7 @@ async def on_pipeline_finished(task, frame):
716716
717717 return task
718718
719- async def end (self , * , reason : Optional [ str ] = None ) -> None :
719+ async def end (self , * , reason : str | None = None ) -> None :
720720 """Request a graceful end of the session.
721721
722722 Args:
@@ -802,7 +802,7 @@ async def activate_agent(
802802 self ,
803803 agent_name : str ,
804804 * ,
805- args : Optional [ AgentActivationArgs ] = None ,
805+ args : AgentActivationArgs | None = None ,
806806 ) -> None :
807807 """Activate an agent by name.
808808
@@ -834,7 +834,7 @@ async def handoff_to(
834834 self ,
835835 agent_name : str ,
836836 * ,
837- activation_args : Optional [ AgentActivationArgs ] = None ,
837+ activation_args : AgentActivationArgs | None = None ,
838838 ) -> None :
839839 """Hand off to another agent.
840840
@@ -868,9 +868,9 @@ async def request_task(
868868 self ,
869869 agent_name : str ,
870870 * ,
871- name : Optional [ str ] = None ,
872- payload : Optional [ dict ] = None ,
873- timeout : Optional [ float ] = None ,
871+ name : str | None = None ,
872+ payload : dict | None = None ,
873+ timeout : float | None = None ,
874874 ) -> str :
875875 """Send a task request to a single agent (fire-and-forget).
876876
@@ -903,9 +903,9 @@ def task(
903903 self ,
904904 agent_name : str ,
905905 * ,
906- name : Optional [ str ] = None ,
907- payload : Optional [ dict ] = None ,
908- timeout : Optional [ float ] = None ,
906+ name : str | None = None ,
907+ payload : dict | None = None ,
908+ timeout : float | None = None ,
909909 ) -> TaskContext :
910910 """Create a single-agent task context manager.
911911
@@ -944,7 +944,7 @@ def task(
944944 timeout = timeout ,
945945 )
946946
947- async def cancel_task (self , task_id : str , * , reason : Optional [ str ] = None ) -> None :
947+ async def cancel_task (self , task_id : str , * , reason : str | None = None ) -> None :
948948 """Cancel a running task group.
949949
950950 Args:
@@ -966,9 +966,9 @@ async def cancel_task(self, task_id: str, *, reason: Optional[str] = None) -> No
966966 async def request_task_group (
967967 self ,
968968 * agent_names : str ,
969- name : Optional [ str ] = None ,
970- payload : Optional [ dict ] = None ,
971- timeout : Optional [ float ] = None ,
969+ name : str | None = None ,
970+ payload : dict | None = None ,
971+ timeout : float | None = None ,
972972 cancel_on_error : bool = True ,
973973 ) -> str :
974974 """Send a task request to multiple agents (fire-and-forget).
@@ -1009,9 +1009,9 @@ async def request_task_group(
10091009 def task_group (
10101010 self ,
10111011 * agent_names : str ,
1012- name : Optional [ str ] = None ,
1013- payload : Optional [ dict ] = None ,
1014- timeout : Optional [ float ] = None ,
1012+ name : str | None = None ,
1013+ payload : dict | None = None ,
1014+ timeout : float | None = None ,
10151015 cancel_on_error : bool = True ,
10161016 ) -> TaskGroupContext :
10171017 """Create a task group context manager.
@@ -1076,7 +1076,7 @@ async def request_task_update(self, task_id: str, agent_name: str) -> None:
10761076 async def send_task_response (
10771077 self ,
10781078 task_id : str ,
1079- response : Optional [ dict ] = None ,
1079+ response : dict | None = None ,
10801080 * ,
10811081 status : TaskStatus = TaskStatus .COMPLETED ,
10821082 urgent : bool = False ,
@@ -1111,7 +1111,7 @@ async def send_task_response(
11111111 self ._active_tasks .pop (task_id , None )
11121112
11131113 async def send_task_update (
1114- self , task_id : str , update : Optional [ dict ] = None , * , urgent : bool = False
1114+ self , task_id : str , update : dict | None = None , * , urgent : bool = False
11151115 ) -> None :
11161116 """Send a progress update to the requester.
11171117
@@ -1137,7 +1137,7 @@ async def send_task_update(
11371137 )
11381138 )
11391139
1140- async def send_task_stream_start (self , task_id : str , data : Optional [ dict ] = None ) -> None :
1140+ async def send_task_stream_start (self , task_id : str , data : dict | None = None ) -> None :
11411141 """Begin streaming task results back to the requester.
11421142
11431143 Args:
@@ -1159,7 +1159,7 @@ async def send_task_stream_start(self, task_id: str, data: Optional[dict] = None
11591159 )
11601160 )
11611161
1162- async def send_task_stream_data (self , task_id : str , data : Optional [ dict ] = None ) -> None :
1162+ async def send_task_stream_data (self , task_id : str , data : dict | None = None ) -> None :
11631163 """Send a streaming chunk to the requester.
11641164
11651165 Args:
@@ -1181,7 +1181,7 @@ async def send_task_stream_data(self, task_id: str, data: Optional[dict] = None)
11811181 )
11821182 )
11831183
1184- async def send_task_stream_end (self , task_id : str , data : Optional [ dict ] = None ) -> None :
1184+ async def send_task_stream_end (self , task_id : str , data : dict | None = None ) -> None :
11851185 """End the current stream and mark this agent's task as complete.
11861186
11871187 Args:
@@ -1282,7 +1282,7 @@ def _create_task_group(
12821282 self ,
12831283 agent_names : list [str ],
12841284 * ,
1285- timeout : Optional [ float ] = None ,
1285+ timeout : float | None = None ,
12861286 cancel_on_error : bool = True ,
12871287 ) -> TaskGroup :
12881288 task_id = str (uuid .uuid4 ())
@@ -1326,9 +1326,9 @@ async def create_task_group_and_request_task(
13261326 self ,
13271327 agent_names : list [str ],
13281328 * ,
1329- name : Optional [ str ] = None ,
1330- payload : Optional [ dict ] = None ,
1331- timeout : Optional [ float ] = None ,
1329+ name : str | None = None ,
1330+ payload : dict | None = None ,
1331+ timeout : float | None = None ,
13321332 cancel_on_error : bool = True ,
13331333 ) -> TaskGroup :
13341334 """Wait for agents to be ready, create a task group, and send requests.
@@ -1356,7 +1356,7 @@ async def create_task_group_and_request_task(
13561356 all_ready = await self ._wait_agents_ready (agent_names )
13571357 try :
13581358 await asyncio .wait_for (all_ready , timeout = timeout )
1359- except asyncio . TimeoutError :
1359+ except TimeoutError :
13601360 raise TaskGroupError ("agents not ready within timeout" )
13611361
13621362 group = self ._create_task_group (
@@ -1374,8 +1374,8 @@ async def _send_task_request(
13741374 self ,
13751375 agent_name : str ,
13761376 task_id : str ,
1377- task_name : Optional [ str ] = None ,
1378- payload : Optional [ dict ] = None ,
1377+ task_name : str | None = None ,
1378+ payload : dict | None = None ,
13791379 ) -> None :
13801380 await self .send_message (
13811381 BusTaskRequestMessage (
@@ -1395,7 +1395,7 @@ async def _task_timeout(self, task_id: str, timeout: float) -> None:
13951395 await self .cancel_task (task_id , reason = "timeout" )
13961396
13971397 async def _handle_agent_error (
1398- self , message : Union [ BusAgentErrorMessage , BusAgentLocalErrorMessage ]
1398+ self , message : BusAgentErrorMessage | BusAgentLocalErrorMessage
13991399 ) -> None :
14001400 """Handle an error reported by a child or remote agent."""
14011401 child_names = {child .name for child in self ._children }
@@ -1499,7 +1499,7 @@ async def _run_task_handler(self, task_id: str, handler, message) -> None:
14991499 self ._task_handler_tasks .pop (task_id , None )
15001500
15011501 async def _handle_task_response (
1502- self , message : Union [ BusTaskResponseMessage , BusTaskResponseUrgentMessage ]
1502+ self , message : BusTaskResponseMessage | BusTaskResponseUrgentMessage
15031503 ) -> None :
15041504 """Handle a task response and track group completion."""
15051505 await self .on_task_response (message )
@@ -1518,7 +1518,7 @@ async def _handle_task_response(
15181518 await self ._track_task_group_response (message .task_id , message .source , message .response )
15191519
15201520 async def _handle_task_update (
1521- self , message : Union [ BusTaskUpdateMessage , BusTaskUpdateUrgentMessage ]
1521+ self , message : BusTaskUpdateMessage | BusTaskUpdateUrgentMessage
15221522 ) -> None :
15231523 """Handle a task progress update."""
15241524 await self .on_task_update (message )
@@ -1583,7 +1583,7 @@ def _push_task_group_event(self, task_id: str, event: TaskGroupEvent) -> None:
15831583 group .event_queue .put_nowait (event )
15841584
15851585 async def _track_task_group_response (
1586- self , task_id : str , source : str , response : Optional [ dict ]
1586+ self , task_id : str , source : str , response : dict | None
15871587 ) -> None :
15881588 """Record a task agent's response and fire completion when all have responded."""
15891589 group = self ._task_groups .get (task_id )
0 commit comments