From eacd2a4b71071dbd9108afa838ba637b0c39c350 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 21 Jan 2026 14:10:56 -0800 Subject: [PATCH 1/9] FrameProcessor: add broadcast_frame_instance() --- src/pipecat/processors/frame_processor.py | 34 +++++ src/pipecat/serializers/protobuf.py | 2 +- tests/test_frame_processor.py | 166 +++++++++++++++++++++- 3 files changed, 200 insertions(+), 2 deletions(-) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 70f44dfca5..10aafc5a86 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -12,7 +12,9 @@ """ import asyncio +import dataclasses import traceback +from copy import deepcopy from dataclasses import dataclass from enum import Enum from typing import ( @@ -782,6 +784,38 @@ async def broadcast_frame(self, frame_cls: Type[Frame], **kwargs): await self.push_frame(frame_cls(**kwargs)) await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM) + async def broadcast_frame_instance(self, frame: Frame): + """Broadcasts a frame instance upstream and downstream. + + This method extracts the class and init fields from the given frame + instance and creates two new instances to push upstream and downstream. + + Args: + frame: The frame instance to broadcast. + + Note: + Prefer using `broadcast_frame()` when possible, as it is more + efficient. This method should only be used when you are not the + creator of the frame and need to broadcast an existing instance. + """ + frame_cls = type(frame) + init_fields = {f.name: getattr(frame, f.name) for f in dataclasses.fields(frame) if f.init} + extra_fields = { + f.name: getattr(frame, f.name) + for f in dataclasses.fields(frame) + if not f.init and f.name not in ("id", "name") + } + + new_frame = frame_cls(**deepcopy(init_fields)) + for k, v in deepcopy(extra_fields).items(): + setattr(new_frame, k, v) + await self.push_frame(new_frame) + + new_frame = frame_cls(**deepcopy(init_fields)) + for k, v in deepcopy(extra_fields).items(): + setattr(new_frame, k, v) + await self.push_frame(new_frame, FrameDirection.UPSTREAM) + async def __start(self, frame: StartFrame): """Handle the start frame to initialize processor state. diff --git a/src/pipecat/serializers/protobuf.py b/src/pipecat/serializers/protobuf.py index 6d989c7dd5..912f20b42d 100644 --- a/src/pipecat/serializers/protobuf.py +++ b/src/pipecat/serializers/protobuf.py @@ -126,7 +126,7 @@ async def deserialize(self, data: str | bytes) -> Frame | None: if "pts" in args_dict: del args_dict["pts"] - # Special handling for MessageFrame -> OutputTransportMessageUrgentFrame + # Special handling for MessageFrame -> InputTransportMessageFrame if class_name == MessageFrame: try: msg = json.loads(args_dict["data"]) diff --git a/tests/test_frame_processor.py b/tests/test_frame_processor.py index b3a11ab431..3f8c8ae700 100644 --- a/tests/test_frame_processor.py +++ b/tests/test_frame_processor.py @@ -6,7 +6,8 @@ import asyncio import unittest -from dataclasses import dataclass +from dataclasses import dataclass, field +from typing import List from pipecat.frames.frames import ( DataFrame, @@ -24,6 +25,15 @@ from pipecat.tests.utils import SleepFrame, run_test +@dataclass +class BroadcastTestFrame(DataFrame): + """Test frame with init fields for broadcast testing.""" + + text: str = "" + value: int = 0 + items: List[str] = field(default_factory=list) + + class TestFrameProcessor(unittest.IsolatedAsyncioTestCase): async def test_before_after_events(self): identity = IdentityFilter() @@ -186,3 +196,157 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): frames_to_send=frames_to_send, expected_down_frames=expected_down_frames, ) + + async def test_broadcast_frame(self): + """Test that broadcast_frame creates two separate frames with fresh IDs.""" + downstream_frames: List[Frame] = [] + upstream_frames: List[Frame] = [] + + class BroadcastTestProcessor(FrameProcessor): + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, TextFrame): + await self.broadcast_frame( + BroadcastTestFrame, text="hello", value=42, items=["a", "b"] + ) + else: + await self.push_frame(frame, direction) + + class CaptureProcessor(FrameProcessor): + def __init__(self, capture_list: List[Frame], direction: FrameDirection): + super().__init__() + self._capture_list = capture_list + self._capture_direction = direction + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if direction == self._capture_direction and isinstance(frame, BroadcastTestFrame): + self._capture_list.append(frame) + await self.push_frame(frame, direction) + + up_capture = CaptureProcessor(upstream_frames, FrameDirection.UPSTREAM) + broadcaster = BroadcastTestProcessor() + down_capture = CaptureProcessor(downstream_frames, FrameDirection.DOWNSTREAM) + + pipeline = Pipeline([up_capture, broadcaster, down_capture]) + + frames_to_send = [TextFrame(text="trigger")] + expected_down_frames = [BroadcastTestFrame] + expected_up_frames = [BroadcastTestFrame] + + await run_test( + pipeline, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + expected_up_frames=expected_up_frames, + ) + + # Verify we got one frame in each direction + self.assertEqual(len(downstream_frames), 1) + self.assertEqual(len(upstream_frames), 1) + + down_frame = downstream_frames[0] + up_frame = upstream_frames[0] + + # Verify the frames have different IDs (they are separate instances) + self.assertNotEqual(down_frame.id, up_frame.id) + + # Verify the frames have the correct field values + self.assertEqual(down_frame.text, "hello") + self.assertEqual(down_frame.value, 42) + self.assertEqual(down_frame.items, ["a", "b"]) + self.assertEqual(up_frame.text, "hello") + self.assertEqual(up_frame.value, 42) + self.assertEqual(up_frame.items, ["a", "b"]) + + # Verify the items lists are separate instances (not shared references) + self.assertIsNot(down_frame.items, up_frame.items) + + async def test_broadcast_frame_instance(self): + """Test that broadcast_frame_instance copies all fields except id and name.""" + downstream_frames: List[Frame] = [] + upstream_frames: List[Frame] = [] + original_frame: List[Frame] = [] + + class BroadcastInstanceTestProcessor(FrameProcessor): + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, BroadcastTestFrame): + # Set some non-init fields on the frame + frame.pts = 12345 + frame.metadata = {"key": "value", "nested": {"a": 1}} + original_frame.append(frame) + await self.broadcast_frame_instance(frame) + else: + await self.push_frame(frame, direction) + + class CaptureProcessor(FrameProcessor): + def __init__(self, capture_list: List[Frame], direction: FrameDirection): + super().__init__() + self._capture_list = capture_list + self._capture_direction = direction + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if direction == self._capture_direction and isinstance(frame, BroadcastTestFrame): + self._capture_list.append(frame) + await self.push_frame(frame, direction) + + up_capture = CaptureProcessor(upstream_frames, FrameDirection.UPSTREAM) + broadcaster = BroadcastInstanceTestProcessor() + down_capture = CaptureProcessor(downstream_frames, FrameDirection.DOWNSTREAM) + + pipeline = Pipeline([up_capture, broadcaster, down_capture]) + + # Create a frame with mutable fields to test deep copying + test_frame = BroadcastTestFrame(text="test", value=99, items=["x", "y", "z"]) + + frames_to_send = [test_frame] + expected_down_frames = [BroadcastTestFrame] + expected_up_frames = [BroadcastTestFrame] + + await run_test( + pipeline, + frames_to_send=frames_to_send, + expected_down_frames=expected_down_frames, + expected_up_frames=expected_up_frames, + ) + + # Verify we got one frame in each direction + self.assertEqual(len(downstream_frames), 1) + self.assertEqual(len(upstream_frames), 1) + self.assertEqual(len(original_frame), 1) + + orig = original_frame[0] + down_frame = downstream_frames[0] + up_frame = upstream_frames[0] + + # Verify the frames have different IDs and names (fresh values) + self.assertNotEqual(down_frame.id, orig.id) + self.assertNotEqual(up_frame.id, orig.id) + self.assertNotEqual(down_frame.id, up_frame.id) + self.assertNotEqual(down_frame.name, orig.name) + self.assertNotEqual(up_frame.name, orig.name) + + # Verify init fields are copied correctly + self.assertEqual(down_frame.text, "test") + self.assertEqual(down_frame.value, 99) + self.assertEqual(down_frame.items, ["x", "y", "z"]) + self.assertEqual(up_frame.text, "test") + self.assertEqual(up_frame.value, 99) + self.assertEqual(up_frame.items, ["x", "y", "z"]) + + # Verify non-init fields (except id/name) are copied + self.assertEqual(down_frame.pts, 12345) + self.assertEqual(down_frame.metadata, {"key": "value", "nested": {"a": 1}}) + self.assertEqual(up_frame.pts, 12345) + self.assertEqual(up_frame.metadata, {"key": "value", "nested": {"a": 1}}) + + # Verify mutable fields are deep copied (not shared references) + self.assertIsNot(down_frame.items, orig.items) + self.assertIsNot(up_frame.items, orig.items) + self.assertIsNot(down_frame.items, up_frame.items) + self.assertIsNot(down_frame.metadata, orig.metadata) + self.assertIsNot(up_frame.metadata, orig.metadata) + self.assertIsNot(down_frame.metadata, up_frame.metadata) + self.assertIsNot(down_frame.metadata["nested"], up_frame.metadata["nested"]) From ba0ddb1832a62297c214e4f60fcf5235cf933a04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 21 Jan 2026 18:05:15 -0800 Subject: [PATCH 2/9] FrameProcessor: copy kwargs when broadcasting frame --- src/pipecat/processors/frame_processor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 10aafc5a86..1ad57e0d6c 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -781,14 +781,14 @@ async def broadcast_frame(self, frame_cls: Type[Frame], **kwargs): frame_cls: The class of the frame to be broadcasted. **kwargs: Keyword arguments to be passed to the frame's constructor. """ - await self.push_frame(frame_cls(**kwargs)) - await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM) + await self.push_frame(frame_cls(**deepcopy(kwargs))) + await self.push_frame(frame_cls(**deepcopy(kwargs)), FrameDirection.UPSTREAM) async def broadcast_frame_instance(self, frame: Frame): """Broadcasts a frame instance upstream and downstream. - This method extracts the class and init fields from the given frame - instance and creates two new instances to push upstream and downstream. + This method creates two new frame instances copying all fields from the + original frame except `id` and `name`, which get fresh values. Args: frame: The frame instance to broadcast. From 62f4708d43a4f393ff386e305ed37304ed48ab57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 21 Jan 2026 14:12:47 -0800 Subject: [PATCH 3/9] transports: broadcast InputTransportMessageFrame frames --- src/pipecat/transports/daily/transport.py | 5 +++-- src/pipecat/transports/smallwebrtc/transport.py | 3 +-- src/pipecat/transports/websocket/client.py | 3 +++ src/pipecat/transports/websocket/fastapi.py | 3 +++ src/pipecat/transports/websocket/server.py | 4 ++++ 5 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/pipecat/transports/daily/transport.py b/src/pipecat/transports/daily/transport.py index 3b652b5170..e220ab0d83 100644 --- a/src/pipecat/transports/daily/transport.py +++ b/src/pipecat/transports/daily/transport.py @@ -1728,8 +1728,9 @@ async def push_app_message(self, message: Any, sender: str): message: The message data to send. sender: ID of the message sender. """ - frame = DailyInputTransportMessageFrame(message=message, participant_id=sender) - await self.push_frame(frame) + await self.broadcast_frame_class( + DailyInputTransportMessageFrame, message=message, participant_id=sender + ) # # Audio in diff --git a/src/pipecat/transports/smallwebrtc/transport.py b/src/pipecat/transports/smallwebrtc/transport.py index e9a07cf406..bcc3c8b799 100644 --- a/src/pipecat/transports/smallwebrtc/transport.py +++ b/src/pipecat/transports/smallwebrtc/transport.py @@ -698,8 +698,7 @@ async def push_app_message(self, message: Any): message: The application message to process. """ logger.debug(f"Received app message inside SmallWebRTCInputTransport {message}") - frame = InputTransportMessageFrame(message=message) - await self.push_frame(frame) + await self.broadcast_frame_class(InputTransportMessageFrame, message=message) # Add this method similar to DailyInputTransport.request_participant_image async def request_participant_image(self, frame: UserImageRequestFrame): diff --git a/src/pipecat/transports/websocket/client.py b/src/pipecat/transports/websocket/client.py index 02c891c54b..f55b1f9198 100644 --- a/src/pipecat/transports/websocket/client.py +++ b/src/pipecat/transports/websocket/client.py @@ -27,6 +27,7 @@ EndFrame, Frame, InputAudioRawFrame, + InputTransportMessageFrame, OutputAudioRawFrame, OutputTransportMessageFrame, OutputTransportMessageUrgentFrame, @@ -298,6 +299,8 @@ async def on_message(self, websocket, message): return if isinstance(frame, InputAudioRawFrame) and self._params.audio_in_enabled: await self.push_audio_frame(frame) + elif isinstance(frame, InputTransportMessageFrame): + await self.broadcast_frame(frame) else: await self.push_frame(frame) diff --git a/src/pipecat/transports/websocket/fastapi.py b/src/pipecat/transports/websocket/fastapi.py index e1d02ac00a..a0d02ccec0 100644 --- a/src/pipecat/transports/websocket/fastapi.py +++ b/src/pipecat/transports/websocket/fastapi.py @@ -26,6 +26,7 @@ EndFrame, Frame, InputAudioRawFrame, + InputTransportMessageFrame, InterruptionFrame, OutputAudioRawFrame, OutputTransportMessageFrame, @@ -311,6 +312,8 @@ async def _receive_messages(self): if isinstance(frame, InputAudioRawFrame): await self.push_audio_frame(frame) + elif isinstance(frame, InputTransportMessageFrame): + await self.broadcast_frame(frame) else: await self.push_frame(frame) except Exception as e: diff --git a/src/pipecat/transports/websocket/server.py b/src/pipecat/transports/websocket/server.py index a31ac5487f..acd4faf922 100644 --- a/src/pipecat/transports/websocket/server.py +++ b/src/pipecat/transports/websocket/server.py @@ -25,6 +25,8 @@ EndFrame, Frame, InputAudioRawFrame, + InputTransportMessageFrame, + InputTransportMessageUrgentFrame, InterruptionFrame, OutputAudioRawFrame, OutputTransportMessageFrame, @@ -214,6 +216,8 @@ async def _client_handler(self, websocket: websockets.WebSocketServerProtocol): if isinstance(frame, InputAudioRawFrame): await self.push_audio_frame(frame) + elif isinstance(frame, InputTransportMessageFrame): + await self.broadcast_frame(frame) else: await self.push_frame(frame) except Exception as e: From cc61cdbba37ad191e0049de463485b7ccdb09b56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 21 Jan 2026 14:13:17 -0800 Subject: [PATCH 4/9] RTVIProcessor: add create_rtvi_observer() --- src/pipecat/processors/frameworks/rtvi.py | 12 +++++++++++ src/pipecat/services/google/rtvi.py | 25 ++++++++++++++++++++--- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index 7ea04ecf2b..ee25af839a 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -1413,6 +1413,18 @@ def register_service(self, service: RTVIService): self._registered_services[service.name] = service + def create_rtvi_observer(self, *, params: Optional[RTVIObserverParams] = None, **kwargs): + """Creates a new RTVI Observer. + + Args: + params: Settings to enable/disable specific messages. + **kwargs: Additional arguments passed to the observer. + + Returns: + A new RTVI observer. + """ + return RTVIObserver(self, params=params, **kwargs) + async def set_client_ready(self): """Mark the client as ready and trigger the ready event.""" self._client_ready = True diff --git a/src/pipecat/services/google/rtvi.py b/src/pipecat/services/google/rtvi.py index 1ef70d67da..738b0ab9d4 100644 --- a/src/pipecat/services/google/rtvi.py +++ b/src/pipecat/services/google/rtvi.py @@ -4,7 +4,7 @@ # SPDX-License-Identifier: BSD 2-Clause License # -"""Google RTVI integration models and observer implementation. +"""Google RTVI processor and observer implementation. This module provides integration with Google's services through the RTVI framework, including models for search responses and an observer for handling Google-specific @@ -16,7 +16,7 @@ from pydantic import BaseModel from pipecat.observers.base_observer import FramePushed -from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor +from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIObserverParams, RTVIProcessor from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame @@ -86,4 +86,23 @@ async def _handle_llm_search_response_frame(self, frame: LLMSearchResponseFrame) rendered_content=frame.rendered_content, ) ) - await self.push_transport_message_urgent(message) + await self.send_rtvi_message(message) + + +class GoogleRTVIProcessor(RTVIProcessor): + """RTVI processor for Google service integration. + + Creates a specific Google RTVI Observer. + """ + + def create_rtvi_observer(self, *, params: Optional[RTVIObserverParams] = None, **kwargs): + """Creates a new RTVI Observer. + + Args: + params: Settings to enable/disable specific messages. + **kwargs: Additional arguments passed to the observer. + + Returns: + A new RTVI observer. + """ + return GoogleRTVIObserver(self) From e85a00cc0ee435eff8eb931c11aa0bccd018bed4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 21 Jan 2026 14:14:43 -0800 Subject: [PATCH 5/9] PipelineTask: automatically add RTVI processor and RTVI observer If `enable_rtvi` is enabled (enabled by default) and RTVI processor will be added automatically to the pipeline. Also, and RTVI observer will be registered. --- src/pipecat/pipeline/task.py | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 6d4b4c039b..9a2f1d33df 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -49,6 +49,7 @@ from pipecat.pipeline.task_observer import TaskObserver from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup +from pipecat.processors.frameworks.rtvi import RTVIObserverParams, RTVIProcessor from pipecat.utils.asyncio.task_manager import BaseTaskManager, TaskManager, TaskManagerParams from pipecat.utils.tracing.setup import is_tracing_available from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver @@ -225,9 +226,12 @@ def __init__( conversation_id: Optional[str] = None, enable_tracing: bool = False, enable_turn_tracking: bool = True, + enable_rtvi: bool = True, idle_timeout_frames: Tuple[Type[Frame], ...] = (BotSpeakingFrame, UserSpeakingFrame), idle_timeout_secs: Optional[float] = IDLE_TIMEOUT_SECS, observers: Optional[List[BaseObserver]] = None, + rtvi_processor: Optional[RTVIProcessor] = None, + rtvi_observer_params: Optional[RTVIObserverParams] = None, task_manager: Optional[BaseTaskManager] = None, ): """Initialize the PipelineTask. @@ -244,6 +248,7 @@ def __init__( check_dangling_tasks: Whether to check for processors' tasks finishing properly. clock: Clock implementation for timing operations. conversation_id: Optional custom ID for the conversation. + enable_rtvi: Whether to automatically add RTVI support to the pipeline. enable_tracing: Whether to enable tracing. enable_turn_tracking: Whether to enable turn tracking. idle_timeout_frames: A tuple with the frames that should trigger an idle @@ -252,6 +257,8 @@ def __init__( None. If a pipeline is idle the pipeline task will be cancelled automatically. observers: List of observers for monitoring pipeline execution. + rtvi_observer_params: The RTVI observer parameter to use if RTVI is enabled. + rtvi_processor: The RTVI processor to add if RTVI is enabled. task_manager: Optional task manager for handling asyncio tasks. """ super().__init__() @@ -306,6 +313,16 @@ def __init__( self._heartbeat_push_task: Optional[asyncio.Task] = None self._heartbeat_monitor_task: Optional[asyncio.Task] = None + # RTVI support + self._rtvi = None + if enable_rtvi: + self._rtvi = rtvi_processor or RTVIProcessor() + observers.append(self._rtvi.create_rtvi_observer(params=rtvi_observer_params)) + + @self.rtvi.event_handler("on_client_ready") + async def on_client_ready(rtvi: RTVIProcessor): + await rtvi.set_bot_ready() + # This is the idle event. When selected frames are pushed from any # processor we consider the pipeline is not idle. We use an observer # which will be listening any part of the pipeline. @@ -335,7 +352,8 @@ def __init__( # allows us to receive and react to downstream frames. source = PipelineSource(self._source_push_frame, name=f"{self}::Source") sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink") - self._pipeline = Pipeline([pipeline], source=source, sink=sink) + processors = [self._rtvi, pipeline] if self._rtvi else [pipeline] + self._pipeline = Pipeline(processors, source=source, sink=sink) # The task observer acts as a proxy to the provided observers. This way, # we only need to pass a single observer (using the StartFrame) which @@ -398,6 +416,17 @@ def turn_trace_observer(self) -> Optional[TurnTraceObserver]: """ return self._turn_trace_observer + @property + def rtvi(self) -> RTVIProcessor: + """Get the RTVI processor if RTVI is enabled. + + Returns: + The RTVI processor added to the pipeline when RTVI is enabled. + """ + if not self._rtvi: + raise Exception(f"{self} RTVI is not enabled.") + return self._rtvi + def event_handler(self, event_name: str): """Decorator for registering event handlers. From 054e50486824a9b18a3c0faa0556d44cfb940d0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 21 Jan 2026 14:16:51 -0800 Subject: [PATCH 6/9] examples(foundational): remove RTVI (automatically added by PipelineTask) --- .../foundational/07zb-interruptible-inworld-http.py | 5 ----- examples/foundational/07zb-interruptible-inworld.py | 5 ----- examples/foundational/07ze-interruptible-hume.py | 9 --------- examples/foundational/37-mem0.py | 7 +------ examples/foundational/38b-smart-turn-local.py | 7 +------ examples/foundational/46-video-processing.py | 11 ++--------- 6 files changed, 4 insertions(+), 40 deletions(-) diff --git a/examples/foundational/07zb-interruptible-inworld-http.py b/examples/foundational/07zb-interruptible-inworld-http.py index b2bf056605..c67984f7c7 100644 --- a/examples/foundational/07zb-interruptible-inworld-http.py +++ b/examples/foundational/07zb-interruptible-inworld-http.py @@ -23,7 +23,6 @@ LLMContextAggregatorPair, LLMUserAggregatorParams, ) -from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.deepgram.stt import DeepgramSTTService @@ -93,12 +92,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ), ) - rtvi = RTVIProcessor() - pipeline = Pipeline( [ transport.input(), - rtvi, stt, user_aggregator, llm, @@ -115,7 +111,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): enable_usage_metrics=True, ), observers=[ - RTVIObserver(rtvi), DebugLogObserver( frame_types={ TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE), diff --git a/examples/foundational/07zb-interruptible-inworld.py b/examples/foundational/07zb-interruptible-inworld.py index 8d3d351a5d..46e07dc5f2 100644 --- a/examples/foundational/07zb-interruptible-inworld.py +++ b/examples/foundational/07zb-interruptible-inworld.py @@ -22,7 +22,6 @@ LLMContextAggregatorPair, LLMUserAggregatorParams, ) -from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.deepgram.stt import DeepgramSTTService @@ -88,12 +87,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ), ) - rtvi = RTVIProcessor(config=RTVIConfig(config=[])) - pipeline = Pipeline( [ transport.input(), - rtvi, stt, user_aggregator, llm, @@ -110,7 +106,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): enable_usage_metrics=True, ), observers=[ - RTVIObserver(rtvi), DebugLogObserver( frame_types={ TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE), diff --git a/examples/foundational/07ze-interruptible-hume.py b/examples/foundational/07ze-interruptible-hume.py index 33ea4d278a..e9a8e355c9 100644 --- a/examples/foundational/07ze-interruptible-hume.py +++ b/examples/foundational/07ze-interruptible-hume.py @@ -22,7 +22,6 @@ LLMContextAggregatorPair, LLMUserAggregatorParams, ) -from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.deepgram.stt import DeepgramSTTService @@ -90,12 +89,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ), ) - rtvi = RTVIProcessor(config=RTVIConfig(config=[])) - pipeline = Pipeline( [ transport.input(), # Transport user input - rtvi, stt, user_aggregator, # User responses llm, # LLM @@ -114,7 +110,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ), idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, observers=[ - RTVIObserver(rtvi), DebugLogObserver( frame_types={ TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE), @@ -123,10 +118,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ], ) - @rtvi.event_handler("on_client_ready") - async def on_client_ready(rtvi): - await rtvi.set_bot_ready() - @transport.event_handler("on_client_connected") async def on_client_connected(transport, client): logger.info(f"Client connected") diff --git a/examples/foundational/37-mem0.py b/examples/foundational/37-mem0.py index c82d39edcf..08a739c0ab 100644 --- a/examples/foundational/37-mem0.py +++ b/examples/foundational/37-mem0.py @@ -59,7 +59,6 @@ LLMContextAggregatorPair, LLMUserAggregatorParams, ) -from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.deepgram.stt import DeepgramSTTService @@ -255,12 +254,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ), ), ) - rtvi = RTVIProcessor(config=RTVIConfig(config=[])) pipeline = Pipeline( [ transport.input(), - rtvi, stt, user_aggregator, memory, @@ -278,12 +275,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): enable_usage_metrics=True, ), idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, - observers=[RTVIObserver(rtvi)], ) - @rtvi.event_handler("on_client_ready") + @task.rtvi.event_handler("on_client_ready") async def on_client_ready(rtvi): - await rtvi.set_bot_ready() # Get personalized greeting based on user memories. Can pass agent_id and run_id as per requirement of the application to manage short term memory or agent specific memory. greeting = await get_initial_greeting( memory_client=memory.memory_client, user_id=USER_ID, agent_id=None, run_id=None diff --git a/examples/foundational/38b-smart-turn-local.py b/examples/foundational/38b-smart-turn-local.py index 37112de4d6..85a3206310 100644 --- a/examples/foundational/38b-smart-turn-local.py +++ b/examples/foundational/38b-smart-turn-local.py @@ -22,7 +22,6 @@ LLMContextAggregatorPair, LLMUserAggregatorParams, ) -from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.cartesia.tts import CartesiaTTSService @@ -87,8 +86,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ), ) - rtvi = RTVIProcessor() - pipeline = Pipeline( [ transport.input(), # Transport user input @@ -108,13 +105,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): enable_metrics=True, enable_usage_metrics=True, ), - observers=[RTVIObserver(rtvi)], idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, ) - @rtvi.event_handler("on_client_ready") + @task.rtvi.event_handler("on_client_ready") async def on_client_ready(rtvi): - await rtvi.set_bot_ready() # Kick off the conversation messages.append({"role": "system", "content": "Please introduce yourself to the user."}) await task.queue_frames([LLMRunFrame()]) diff --git a/examples/foundational/46-video-processing.py b/examples/foundational/46-video-processing.py index c35843eab5..bb3d57f048 100644 --- a/examples/foundational/46-video-processing.py +++ b/examples/foundational/46-video-processing.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2025, Daily +# Copyright (c) 2024-2026, Daily # # SPDX-License-Identifier: BSD 2-Clause License # @@ -22,7 +22,6 @@ LLMUserAggregatorParams, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService @@ -125,14 +124,10 @@ async def run_bot(pipecat_transport): ), ) - # RTVI events for Pipecat client UI - rtvi = RTVIProcessor() - pipeline = Pipeline( [ pipecat_transport.input(), user_aggregator, - rtvi, llm, # LLM EdgeDetectionProcessor( pipecat_transport._params.video_out_width, @@ -149,13 +144,11 @@ async def run_bot(pipecat_transport): enable_metrics=True, enable_usage_metrics=True, ), - observers=[RTVIObserver(rtvi)], ) - @rtvi.event_handler("on_client_ready") + @task.rtvi.event_handler("on_client_ready") async def on_client_ready(rtvi): logger.info("Pipecat client ready.") - await rtvi.set_bot_ready() # Kick off the conversation. await task.queue_frames([LLMRunFrame()]) From 124a3c35afa0c52e1dedb543790de7d6f34919e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 21 Jan 2026 14:25:25 -0800 Subject: [PATCH 7/9] RTVIObserver: don't handle some frames direction --- src/pipecat/processors/frameworks/rtvi.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index ee25af839a..87ad556d9c 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -1100,13 +1100,11 @@ async def on_push_frame(self, data: FramePushed): if ( isinstance(frame, (UserStartedSpeakingFrame, UserStoppedSpeakingFrame)) - and (direction == FrameDirection.DOWNSTREAM) and self._params.user_speaking_enabled ): await self._handle_interruptions(frame) elif ( isinstance(frame, (BotStartedSpeakingFrame, BotStoppedSpeakingFrame)) - and (direction == FrameDirection.UPSTREAM) and self._params.bot_speaking_enabled ): await self._handle_bot_speaking(frame) From 0ee11ad3335743dfe9b01c4a1798aee89874fd68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 21 Jan 2026 14:31:04 -0800 Subject: [PATCH 8/9] tests: disable RTVI in tests by default --- src/pipecat/tests/utils.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/pipecat/tests/utils.py b/src/pipecat/tests/utils.py index ac5739829a..356fb85450 100644 --- a/src/pipecat/tests/utils.py +++ b/src/pipecat/tests/utils.py @@ -123,9 +123,10 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): async def run_test( processor: FrameProcessor, *, - frames_to_send: Sequence[Frame], + enable_rtvi: bool = False, expected_down_frames: Optional[Sequence[type]] = None, expected_up_frames: Optional[Sequence[type]] = None, + frames_to_send: Sequence[Frame], ignore_start: bool = True, observers: Optional[List[BaseObserver]] = None, pipeline_params: Optional[PipelineParams] = None, @@ -139,9 +140,10 @@ async def run_test( Args: processor: The frame processor to test. - frames_to_send: Sequence of frames to send through the processor. + enable_rtvi: Whether RTVI should be enabled in this test. expected_down_frames: Expected frame types flowing downstream (optional). expected_up_frames: Expected frame types flowing upstream (optional). + frames_to_send: Sequence of frames to send through the processor. ignore_start: Whether to ignore StartFrames in frame validation. observers: Optional list of observers to attach to the pipeline. pipeline_params: Optional pipeline parameters. @@ -173,9 +175,10 @@ async def run_test( task = PipelineTask( pipeline, - params=pipeline_params, - observers=observers, cancel_on_idle_timeout=False, + enable_rtvi=enable_rtvi, + observers=observers, + params=pipeline_params, ) async def push_frames(): From 9e8f8b45c682633ac354149f5c4ac6f93fc33925 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 21 Jan 2026 14:39:57 -0800 Subject: [PATCH 9/9] added changelog files for #3519 --- changelog/3519.added.2.md | 1 + changelog/3519.added.3.md | 1 + changelog/3519.added.md | 1 + changelog/3519.fixed.2.md | 1 + changelog/3519.fixed.md | 1 + 5 files changed, 5 insertions(+) create mode 100644 changelog/3519.added.2.md create mode 100644 changelog/3519.added.3.md create mode 100644 changelog/3519.added.md create mode 100644 changelog/3519.fixed.2.md create mode 100644 changelog/3519.fixed.md diff --git a/changelog/3519.added.2.md b/changelog/3519.added.2.md new file mode 100644 index 0000000000..03e2372ad9 --- /dev/null +++ b/changelog/3519.added.2.md @@ -0,0 +1 @@ +- Added `RTVIProcessor.create_rtvi_observer()` factory method for creating RTVI observers. diff --git a/changelog/3519.added.3.md b/changelog/3519.added.3.md new file mode 100644 index 0000000000..7ea1e638ce --- /dev/null +++ b/changelog/3519.added.3.md @@ -0,0 +1 @@ +- Added `FrameProcessor.broadcast_frame_instance(frame)` method to broadcast a frame instance by extracting its fields and creating new instances for each direction. diff --git a/changelog/3519.added.md b/changelog/3519.added.md new file mode 100644 index 0000000000..5ed2bd5221 --- /dev/null +++ b/changelog/3519.added.md @@ -0,0 +1 @@ +- `PipelineTask` now automatically adds `RTVIProcessor` and registers `RTVIObserver` when `enable_rtvi=True` (default), simplifying pipeline setup. diff --git a/changelog/3519.fixed.2.md b/changelog/3519.fixed.2.md new file mode 100644 index 0000000000..bcd384f7fb --- /dev/null +++ b/changelog/3519.fixed.2.md @@ -0,0 +1 @@ +- Fixed `FrameProcessor.broadcast_frame()` to deep copy kwargs, preventing shared mutable references between the downstream and upstream frame instances. diff --git a/changelog/3519.fixed.md b/changelog/3519.fixed.md new file mode 100644 index 0000000000..cabaa5a3e8 --- /dev/null +++ b/changelog/3519.fixed.md @@ -0,0 +1 @@ +- Transports now properly broadcast `InputTransportMessageFrame` frames both upstream and downstream instead of only pushing downstream.