Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/3774.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Added `broadcast_sibling_id` field to the base `Frame` class. This field is automatically set by `broadcast_frame()` and `broadcast_frame_instance()` to the ID of the paired frame pushed in the opposite direction, allowing receivers to identify broadcast pairs.
1 change: 1 addition & 0 deletions changelog/3774.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Fixed `RTVIObserver` not processing upstream-only frames. Previously, all upstream frames were filtered out to avoid duplicate messages from broadcasted frames. Now only upstream copies of broadcasted frames are skipped.
5 changes: 5 additions & 0 deletions src/pipecat/frames/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ class Frame:
id: Unique identifier for the frame instance.
name: Human-readable name combining class name and instance count.
pts: Presentation timestamp in nanoseconds.
broadcast_sibling_id: ID of the paired frame when this frame was
broadcast in both directions. Set automatically by
``broadcast_frame()`` and ``broadcast_frame_instance()``.
metadata: Dictionary for arbitrary frame metadata.
transport_source: Name of the transport source that created this frame.
transport_destination: Name of the transport destination for this frame.
Expand All @@ -131,6 +134,7 @@ class Frame:
id: int = field(init=False)
name: str = field(init=False)
pts: Optional[int] = field(init=False)
broadcast_sibling_id: Optional[int] = field(init=False)
metadata: Dict[str, Any] = field(init=False)
transport_source: Optional[str] = field(init=False)
transport_destination: Optional[str] = field(init=False)
Expand All @@ -139,6 +143,7 @@ def __post_init__(self):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self.pts: Optional[int] = None
self.broadcast_sibling_id: Optional[int] = None
self.metadata: Dict[str, Any] = {}
self.transport_source: Optional[str] = None
self.transport_destination: Optional[str] = None
Expand Down
23 changes: 15 additions & 8 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,8 +787,12 @@ async def broadcast_frame(self, frame_cls: Type[Frame], **kwargs):
frame_cls: The class of the frame to be broadcasted.
**kwargs: Keyword arguments to be passed to the frame's constructor.
"""
await self.push_frame(frame_cls(**kwargs))
await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM)
downstream_frame = frame_cls(**kwargs)
upstream_frame = frame_cls(**kwargs)
downstream_frame.broadcast_sibling_id = upstream_frame.id
upstream_frame.broadcast_sibling_id = downstream_frame.id
await self.push_frame(downstream_frame)
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)

async def broadcast_frame_instance(self, frame: Frame):
"""Broadcasts a frame instance upstream and downstream.
Expand All @@ -812,15 +816,18 @@ async def broadcast_frame_instance(self, frame: Frame):
if not f.init and f.name not in ("id", "name")
}

new_frame = frame_cls(**init_fields)
downstream_frame = frame_cls(**init_fields)
for k, v in extra_fields.items():
setattr(new_frame, k, v)
await self.push_frame(new_frame)
setattr(downstream_frame, k, v)

new_frame = frame_cls(**init_fields)
upstream_frame = frame_cls(**init_fields)
for k, v in extra_fields.items():
setattr(new_frame, k, v)
await self.push_frame(new_frame, FrameDirection.UPSTREAM)
setattr(upstream_frame, k, v)

downstream_frame.broadcast_sibling_id = upstream_frame.id
upstream_frame.broadcast_sibling_id = downstream_frame.id
await self.push_frame(downstream_frame)
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)

async def __start(self, frame: StartFrame):
"""Handle the start frame to initialize processor state.
Expand Down
7 changes: 3 additions & 4 deletions src/pipecat/processors/frameworks/rtvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1220,10 +1220,9 @@ async def on_push_frame(self, data: FramePushed):
frame = data.frame
direction = data.direction

# Only process downstream frames. Some frames are broadcast in both
# directions (e.g. UserStartedSpeakingFrame, FunctionCallResultFrame),
# and we only want to send one RTVI message per event.
if direction != FrameDirection.DOWNSTREAM:
# For broadcasted frames (pushed in both directions), only process
Comment thread
filipi87 marked this conversation as resolved.
Outdated
# the downstream copy to avoid sending duplicate RTVI messages.
if frame.broadcast_sibling_id is not None and direction != FrameDirection.DOWNSTREAM:
return

# If we have already seen this frame, let's skip it.
Expand Down
10 changes: 10 additions & 0 deletions src/pipecat/transports/base_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,11 @@ async def _bot_started_speaking(self):
downstream_frame.transport_destination = self._destination
upstream_frame = BotStartedSpeakingFrame()
upstream_frame.transport_destination = self._destination

# Setting the siblings id
upstream_frame.broadcast_sibling_id = downstream_frame.id
downstream_frame.broadcast_sibling_id = upstream_frame.id

await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)

Expand All @@ -635,6 +640,11 @@ async def _bot_stopped_speaking(self):
downstream_frame.transport_destination = self._destination
upstream_frame = BotStoppedSpeakingFrame()
upstream_frame.transport_destination = self._destination

# Setting the siblings id
upstream_frame.broadcast_sibling_id = downstream_frame.id
downstream_frame.broadcast_sibling_id = upstream_frame.id

await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)

Expand Down