Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/3774.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Added `broadcast_sibling_id` field to the base `Frame` class. This field is automatically set by `broadcast_frame()` and `broadcast_frame_instance()` to the ID of the paired frame pushed in the opposite direction, allowing receivers to identify broadcast pairs.
1 change: 1 addition & 0 deletions changelog/3774.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Fixed `RTVIObserver` not processing upstream-only frames. Previously, all upstream frames were filtered out to avoid duplicate messages from broadcasted frames. Now only upstream copies of broadcasted frames are skipped.
5 changes: 5 additions & 0 deletions src/pipecat/frames/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ class Frame:
id: Unique identifier for the frame instance.
name: Human-readable name combining class name and instance count.
pts: Presentation timestamp in nanoseconds.
broadcast_sibling_id: ID of the paired frame when this frame was
broadcast in both directions. Set automatically by
``broadcast_frame()`` and ``broadcast_frame_instance()``.
metadata: Dictionary for arbitrary frame metadata.
transport_source: Name of the transport source that created this frame.
transport_destination: Name of the transport destination for this frame.
Expand All @@ -131,6 +134,7 @@ class Frame:
id: int = field(init=False)
name: str = field(init=False)
pts: Optional[int] = field(init=False)
broadcast_sibling_id: Optional[int] = field(init=False)
metadata: Dict[str, Any] = field(init=False)
transport_source: Optional[str] = field(init=False)
transport_destination: Optional[str] = field(init=False)
Expand All @@ -139,6 +143,7 @@ def __post_init__(self):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self.pts: Optional[int] = None
self.broadcast_sibling_id: Optional[int] = None
self.metadata: Dict[str, Any] = {}
self.transport_source: Optional[str] = None
self.transport_destination: Optional[str] = None
Expand Down
23 changes: 15 additions & 8 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,8 +787,12 @@ async def broadcast_frame(self, frame_cls: Type[Frame], **kwargs):
frame_cls: The class of the frame to be broadcasted.
**kwargs: Keyword arguments to be passed to the frame's constructor.
"""
await self.push_frame(frame_cls(**kwargs))
await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM)
downstream_frame = frame_cls(**kwargs)
upstream_frame = frame_cls(**kwargs)
downstream_frame.broadcast_sibling_id = upstream_frame.id
upstream_frame.broadcast_sibling_id = downstream_frame.id
await self.push_frame(downstream_frame)
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)

async def broadcast_frame_instance(self, frame: Frame):
"""Broadcasts a frame instance upstream and downstream.
Expand All @@ -812,15 +816,18 @@ async def broadcast_frame_instance(self, frame: Frame):
if not f.init and f.name not in ("id", "name")
}

new_frame = frame_cls(**init_fields)
downstream_frame = frame_cls(**init_fields)
for k, v in extra_fields.items():
setattr(new_frame, k, v)
await self.push_frame(new_frame)
setattr(downstream_frame, k, v)

new_frame = frame_cls(**init_fields)
upstream_frame = frame_cls(**init_fields)
for k, v in extra_fields.items():
setattr(new_frame, k, v)
await self.push_frame(new_frame, FrameDirection.UPSTREAM)
setattr(upstream_frame, k, v)

downstream_frame.broadcast_sibling_id = upstream_frame.id
upstream_frame.broadcast_sibling_id = downstream_frame.id
await self.push_frame(downstream_frame)
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)

async def __start(self, frame: StartFrame):
"""Handle the start frame to initialize processor state.
Expand Down
7 changes: 3 additions & 4 deletions src/pipecat/processors/frameworks/rtvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1220,10 +1220,9 @@ async def on_push_frame(self, data: FramePushed):
frame = data.frame
direction = data.direction

# Only process downstream frames. Some frames are broadcast in both
# directions (e.g. UserStartedSpeakingFrame, FunctionCallResultFrame),
# and we only want to send one RTVI message per event.
if direction != FrameDirection.DOWNSTREAM:
# For broadcast frames (pushed in both directions), only process
# the downstream copy to avoid sending duplicate RTVI messages.
if frame.broadcast_sibling_id is not None and direction != FrameDirection.DOWNSTREAM:
return

# If we have already seen this frame, let's skip it.
Expand Down
10 changes: 10 additions & 0 deletions src/pipecat/transports/base_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,11 @@ async def _bot_started_speaking(self):
downstream_frame.transport_destination = self._destination
upstream_frame = BotStartedSpeakingFrame()
upstream_frame.transport_destination = self._destination

# Setting the siblings id
upstream_frame.broadcast_sibling_id = downstream_frame.id
downstream_frame.broadcast_sibling_id = upstream_frame.id

await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)

Expand All @@ -635,6 +640,11 @@ async def _bot_stopped_speaking(self):
downstream_frame.transport_destination = self._destination
upstream_frame = BotStoppedSpeakingFrame()
upstream_frame.transport_destination = self._destination

# Setting the siblings id
upstream_frame.broadcast_sibling_id = downstream_frame.id
downstream_frame.broadcast_sibling_id = upstream_frame.id

await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)

Expand Down