Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/3774.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Added `broadcasted_sibling_id` field to the base `Frame` class. This field is automatically set by `broadcast_frame()` and `broadcast_frame_instance()` to the ID of the paired frame pushed in the opposite direction, allowing receivers to identify broadcast pairs.
1 change: 1 addition & 0 deletions changelog/3774.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Fixed `RTVIObserver` not processing upstream-only frames. Previously, all upstream frames were filtered out to avoid duplicate messages from broadcasted frames. Now only upstream copies of broadcasted frames are skipped.
5 changes: 5 additions & 0 deletions src/pipecat/frames/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ class Frame:
id: Unique identifier for the frame instance.
name: Human-readable name combining class name and instance count.
pts: Presentation timestamp in nanoseconds.
broadcasted_sibling_id: ID of the paired frame when this frame was
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Searching for the correct past tense, it seems broadcast is more common that broadcasted.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, wondering if this should be a string instead. 🤔 . I'm just thinking mainly for Whisker. The id is hard to track, but not the name. For example, we could have id 1376434 but frame name UserSpeakingFrame#56. So, it would be broadcast_sibling_name, WDYT? I'm not sure though.

Copy link
Copy Markdown
Contributor

@filipi87 filipi87 Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that right now this is mostly for debugging, so if we think that using the name would make things easier to debug, I can do that.

However, the id still feels more natural for referencing something than the name. Maybe Whisker could simply create a link between the frames using the ID and retrieve any other information as needed ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have renamed to broadcast_sibling_id, but let me know in case you think we should use the name instead.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let's use id for now.

broadcast in both directions. Set automatically by
``broadcast_frame()`` and ``broadcast_frame_instance()``.
metadata: Dictionary for arbitrary frame metadata.
transport_source: Name of the transport source that created this frame.
transport_destination: Name of the transport destination for this frame.
Expand All @@ -131,6 +134,7 @@ class Frame:
id: int = field(init=False)
name: str = field(init=False)
pts: Optional[int] = field(init=False)
broadcasted_sibling_id: Optional[int] = field(init=False)
metadata: Dict[str, Any] = field(init=False)
transport_source: Optional[str] = field(init=False)
transport_destination: Optional[str] = field(init=False)
Expand All @@ -139,6 +143,7 @@ def __post_init__(self):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self.pts: Optional[int] = None
self.broadcasted_sibling_id: Optional[int] = None
self.metadata: Dict[str, Any] = {}
self.transport_source: Optional[str] = None
self.transport_destination: Optional[str] = None
Expand Down
23 changes: 15 additions & 8 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,8 +787,12 @@ async def broadcast_frame(self, frame_cls: Type[Frame], **kwargs):
frame_cls: The class of the frame to be broadcasted.
**kwargs: Keyword arguments to be passed to the frame's constructor.
"""
await self.push_frame(frame_cls(**kwargs))
await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM)
downstream_frame = frame_cls(**kwargs)
upstream_frame = frame_cls(**kwargs)
downstream_frame.broadcasted_sibling_id = upstream_frame.id
upstream_frame.broadcasted_sibling_id = downstream_frame.id
await self.push_frame(downstream_frame)
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)

async def broadcast_frame_instance(self, frame: Frame):
"""Broadcasts a frame instance upstream and downstream.
Expand All @@ -812,15 +816,18 @@ async def broadcast_frame_instance(self, frame: Frame):
if not f.init and f.name not in ("id", "name")
}

new_frame = frame_cls(**init_fields)
downstream_frame = frame_cls(**init_fields)
for k, v in extra_fields.items():
setattr(new_frame, k, v)
await self.push_frame(new_frame)
setattr(downstream_frame, k, v)

new_frame = frame_cls(**init_fields)
upstream_frame = frame_cls(**init_fields)
for k, v in extra_fields.items():
setattr(new_frame, k, v)
await self.push_frame(new_frame, FrameDirection.UPSTREAM)
setattr(upstream_frame, k, v)

downstream_frame.broadcasted_sibling_id = upstream_frame.id
upstream_frame.broadcasted_sibling_id = downstream_frame.id
await self.push_frame(downstream_frame)
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)

async def __start(self, frame: StartFrame):
"""Handle the start frame to initialize processor state.
Expand Down
7 changes: 3 additions & 4 deletions src/pipecat/processors/frameworks/rtvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1220,10 +1220,9 @@ async def on_push_frame(self, data: FramePushed):
frame = data.frame
direction = data.direction

# Only process downstream frames. Some frames are broadcast in both
# directions (e.g. UserStartedSpeakingFrame, FunctionCallResultFrame),
# and we only want to send one RTVI message per event.
if direction != FrameDirection.DOWNSTREAM:
# For broadcasted frames (pushed in both directions), only process
Comment thread
filipi87 marked this conversation as resolved.
Outdated
# the downstream copy to avoid sending duplicate RTVI messages.
if frame.broadcasted_sibling_id is not None and direction != FrameDirection.DOWNSTREAM:
return

# If we have already seen this frame, let's skip it.
Expand Down
10 changes: 10 additions & 0 deletions src/pipecat/transports/base_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,11 @@ async def _bot_started_speaking(self):
downstream_frame.transport_destination = self._destination
upstream_frame = BotStartedSpeakingFrame()
upstream_frame.transport_destination = self._destination

# Setting the siblings id
upstream_frame.broadcasted_sibling_id = downstream_frame.id
downstream_frame.broadcasted_sibling_id = upstream_frame.id

await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)

Expand All @@ -635,6 +640,11 @@ async def _bot_stopped_speaking(self):
downstream_frame.transport_destination = self._destination
upstream_frame = BotStoppedSpeakingFrame()
upstream_frame.transport_destination = self._destination

# Setting the siblings id
upstream_frame.broadcasted_sibling_id = downstream_frame.id
downstream_frame.broadcasted_sibling_id = upstream_frame.id

await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)

Expand Down