Skip to content
1 change: 1 addition & 0 deletions changelog/3519.added.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Added `RTVIProcessor.create_rtvi_observer()` factory method for creating RTVI observers.
1 change: 1 addition & 0 deletions changelog/3519.added.3.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Added `FrameProcessor.broadcast_frame_instance(frame)` method to broadcast a frame instance by extracting its fields and creating new instances for each direction.
1 change: 1 addition & 0 deletions changelog/3519.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- `PipelineTask` now automatically adds `RTVIProcessor` and registers `RTVIObserver` when `enable_rtvi=True` (default), simplifying pipeline setup.
1 change: 1 addition & 0 deletions changelog/3519.fixed.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Fixed `FrameProcessor.broadcast_frame()` to deep copy kwargs, preventing shared mutable references between the downstream and upstream frame instances.
1 change: 1 addition & 0 deletions changelog/3519.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Transports now properly broadcast `InputTransportMessageFrame` frames both upstream and downstream instead of only pushing downstream.
5 changes: 0 additions & 5 deletions examples/foundational/07zb-interruptible-inworld-http.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
Expand Down Expand Up @@ -93,12 +92,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
)

rtvi = RTVIProcessor()

pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
user_aggregator,
llm,
Expand All @@ -115,7 +111,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_usage_metrics=True,
),
observers=[
RTVIObserver(rtvi),
DebugLogObserver(
frame_types={
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),
Expand Down
5 changes: 0 additions & 5 deletions examples/foundational/07zb-interruptible-inworld.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
Expand Down Expand Up @@ -88,12 +87,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
)

rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
user_aggregator,
llm,
Expand All @@ -110,7 +106,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_usage_metrics=True,
),
observers=[
RTVIObserver(rtvi),
DebugLogObserver(
frame_types={
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),
Expand Down
9 changes: 0 additions & 9 deletions examples/foundational/07ze-interruptible-hume.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
Expand Down Expand Up @@ -90,12 +89,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
)

rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

pipeline = Pipeline(
[
transport.input(), # Transport user input
rtvi,
stt,
user_aggregator, # User responses
llm, # LLM
Expand All @@ -114,7 +110,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[
RTVIObserver(rtvi),
DebugLogObserver(
frame_types={
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),
Expand All @@ -123,10 +118,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
],
)

@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()

@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
Expand Down
7 changes: 1 addition & 6 deletions examples/foundational/37-mem0.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
Expand Down Expand Up @@ -255,12 +254,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
),
)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
user_aggregator,
memory,
Expand All @@ -278,12 +275,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[RTVIObserver(rtvi)],
)

@rtvi.event_handler("on_client_ready")
@task.rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
# Get personalized greeting based on user memories. Can pass agent_id and run_id as per requirement of the application to manage short term memory or agent specific memory.
greeting = await get_initial_greeting(
memory_client=memory.memory_client, user_id=USER_ID, agent_id=None, run_id=None
Expand Down
7 changes: 1 addition & 6 deletions examples/foundational/38b-smart-turn-local.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
Expand Down Expand Up @@ -87,8 +86,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
)

rtvi = RTVIProcessor()

pipeline = Pipeline(
[
transport.input(), # Transport user input
Expand All @@ -108,13 +105,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)

@rtvi.event_handler("on_client_ready")
@task.rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
# Kick off the conversation
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
Expand Down
11 changes: 2 additions & 9 deletions examples/foundational/46-video-processing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2025, Daily
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
Expand All @@ -22,7 +22,6 @@
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
Expand Down Expand Up @@ -125,14 +124,10 @@ async def run_bot(pipecat_transport):
),
)

# RTVI events for Pipecat client UI
rtvi = RTVIProcessor()

pipeline = Pipeline(
[
pipecat_transport.input(),
user_aggregator,
rtvi,
llm, # LLM
EdgeDetectionProcessor(
pipecat_transport._params.video_out_width,
Expand All @@ -149,13 +144,11 @@ async def run_bot(pipecat_transport):
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)

@rtvi.event_handler("on_client_ready")
@task.rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
logger.info("Pipecat client ready.")
await rtvi.set_bot_ready()
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])

Expand Down
31 changes: 30 additions & 1 deletion src/pipecat/pipeline/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.processors.frameworks.rtvi import RTVIObserverParams, RTVIProcessor
from pipecat.utils.asyncio.task_manager import BaseTaskManager, TaskManager, TaskManagerParams
from pipecat.utils.tracing.setup import is_tracing_available
from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver
Expand Down Expand Up @@ -225,9 +226,12 @@ def __init__(
conversation_id: Optional[str] = None,
enable_tracing: bool = False,
enable_turn_tracking: bool = True,
enable_rtvi: bool = True,
idle_timeout_frames: Tuple[Type[Frame], ...] = (BotSpeakingFrame, UserSpeakingFrame),
idle_timeout_secs: Optional[float] = IDLE_TIMEOUT_SECS,
observers: Optional[List[BaseObserver]] = None,
rtvi_processor: Optional[RTVIProcessor] = None,
rtvi_observer_params: Optional[RTVIObserverParams] = None,
task_manager: Optional[BaseTaskManager] = None,
):
"""Initialize the PipelineTask.
Expand All @@ -244,6 +248,7 @@ def __init__(
check_dangling_tasks: Whether to check for processors' tasks finishing properly.
clock: Clock implementation for timing operations.
conversation_id: Optional custom ID for the conversation.
enable_rtvi: Whether to automatically add RTVI support to the pipeline.
enable_tracing: Whether to enable tracing.
enable_turn_tracking: Whether to enable turn tracking.
idle_timeout_frames: A tuple with the frames that should trigger an idle
Expand All @@ -252,6 +257,8 @@ def __init__(
None. If a pipeline is idle the pipeline task will be cancelled
automatically.
observers: List of observers for monitoring pipeline execution.
rtvi_observer_params: The RTVI observer parameters to use if RTVI is enabled.
rtvi_processor: The RTVI processor to add if RTVI is enabled.
task_manager: Optional task manager for handling asyncio tasks.
"""
super().__init__()
Expand Down Expand Up @@ -306,6 +313,16 @@ def __init__(
self._heartbeat_push_task: Optional[asyncio.Task] = None
self._heartbeat_monitor_task: Optional[asyncio.Task] = None

# RTVI support
self._rtvi = None
if enable_rtvi:
self._rtvi = rtvi_processor or RTVIProcessor()
observers.append(self._rtvi.create_rtvi_observer(params=rtvi_observer_params))

@self.rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi: RTVIProcessor):
await rtvi.set_bot_ready()

# This is the idle event. When selected frames are pushed from any
# processor we consider the pipeline is not idle. We use an observer
# which will be listening any part of the pipeline.
Expand Down Expand Up @@ -335,7 +352,8 @@ def __init__(
# allows us to receive and react to downstream frames.
source = PipelineSource(self._source_push_frame, name=f"{self}::Source")
sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink")
self._pipeline = Pipeline([pipeline], source=source, sink=sink)
processors = [self._rtvi, pipeline] if self._rtvi else [pipeline]
self._pipeline = Pipeline(processors, source=source, sink=sink)

# The task observer acts as a proxy to the provided observers. This way,
# we only need to pass a single observer (using the StartFrame) which
Expand Down Expand Up @@ -398,6 +416,17 @@ def turn_trace_observer(self) -> Optional[TurnTraceObserver]:
"""
return self._turn_trace_observer

@property
def rtvi(self) -> RTVIProcessor:
"""Get the RTVI processor if RTVI is enabled.

Returns:
The RTVI processor added to the pipeline when RTVI is enabled.
"""
if not self._rtvi:
raise Exception(f"{self} RTVI is not enabled.")
return self._rtvi

def event_handler(self, event_name: str):
"""Decorator for registering event handlers.

Expand Down
38 changes: 36 additions & 2 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
"""

import asyncio
import dataclasses
import traceback
from copy import deepcopy
from dataclasses import dataclass
from enum import Enum
from typing import (
Expand Down Expand Up @@ -779,8 +781,40 @@ async def broadcast_frame(self, frame_cls: Type[Frame], **kwargs):
frame_cls: The class of the frame to be broadcasted.
**kwargs: Keyword arguments to be passed to the frame's constructor.
"""
await self.push_frame(frame_cls(**kwargs))
await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM)
await self.push_frame(frame_cls(**deepcopy(kwargs)))
await self.push_frame(frame_cls(**deepcopy(kwargs)), FrameDirection.UPSTREAM)

async def broadcast_frame_instance(self, frame: Frame):
"""Broadcasts a frame instance upstream and downstream.

This method creates two new frame instances, copying all fields from the
original frame except `id` and `name`, which get fresh values.

Args:
frame: The frame instance to broadcast.

Note:
Prefer using `broadcast_frame()` when possible, as it is more
efficient. This method should only be used when you are not the
creator of the frame and need to broadcast an existing instance.
"""
frame_cls = type(frame)
init_fields = {f.name: getattr(frame, f.name) for f in dataclasses.fields(frame) if f.init}
extra_fields = {
f.name: getattr(frame, f.name)
for f in dataclasses.fields(frame)
if not f.init and f.name not in ("id", "name")
}

new_frame = frame_cls(**deepcopy(init_fields))
for k, v in deepcopy(extra_fields).items():
setattr(new_frame, k, v)
await self.push_frame(new_frame)

new_frame = frame_cls(**deepcopy(init_fields))
for k, v in deepcopy(extra_fields).items():
setattr(new_frame, k, v)
await self.push_frame(new_frame, FrameDirection.UPSTREAM)

async def __start(self, frame: StartFrame):
"""Handle the start frame to initialize processor state.
Expand Down
14 changes: 12 additions & 2 deletions src/pipecat/processors/frameworks/rtvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1100,13 +1100,11 @@ async def on_push_frame(self, data: FramePushed):

if (
isinstance(frame, (UserStartedSpeakingFrame, UserStoppedSpeakingFrame))
and (direction == FrameDirection.DOWNSTREAM)
and self._params.user_speaking_enabled
):
await self._handle_interruptions(frame)
elif (
isinstance(frame, (BotStartedSpeakingFrame, BotStoppedSpeakingFrame))
and (direction == FrameDirection.UPSTREAM)
and self._params.bot_speaking_enabled
):
await self._handle_bot_speaking(frame)
Expand Down Expand Up @@ -1413,6 +1411,18 @@ def register_service(self, service: RTVIService):

self._registered_services[service.name] = service

def create_rtvi_observer(self, *, params: Optional[RTVIObserverParams] = None, **kwargs):
"""Creates a new RTVI Observer.

Args:
params: Settings to enable/disable specific messages.
**kwargs: Additional arguments passed to the observer.

Returns:
A new RTVI observer.
"""
return RTVIObserver(self, params=params, **kwargs)

async def set_client_ready(self):
"""Mark the client as ready and trigger the ready event."""
self._client_ready = True
Expand Down
2 changes: 1 addition & 1 deletion src/pipecat/serializers/protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ async def deserialize(self, data: str | bytes) -> Frame | None:
if "pts" in args_dict:
del args_dict["pts"]

# Special handling for MessageFrame -> OutputTransportMessageUrgentFrame
# Special handling for MessageFrame -> InputTransportMessageFrame
if class_name == MessageFrame:
try:
msg = json.loads(args_dict["data"])
Expand Down
Loading