Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/3774.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Added `broadcasted` field to the base `Frame` class. This field is automatically set to `True` by `broadcast_frame()` and `broadcast_frame_instance()` to distinguish broadcasted frames from single-direction frames.
1 change: 1 addition & 0 deletions changelog/3774.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Fixed `RTVIObserver` not processing upstream-only frames. Previously, all upstream frames were filtered out to avoid duplicate messages from broadcasted frames. Now only upstream copies of broadcasted frames are skipped.
2 changes: 2 additions & 0 deletions src/pipecat/frames/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class Frame:
id: int = field(init=False)
name: str = field(init=False)
pts: Optional[int] = field(init=False)
broadcasted: bool = field(init=False)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@markbackman, after discussing this with @kompfner and @aconchillo, we agreed that instead we should store broadcasted_sibling_id, which will reference the frame ID of the other frame that has been broadcast. This will be useful for debugging in the future.

So, I will apply this change so we can include this PR in the next release.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

metadata: Dict[str, Any] = field(init=False)
transport_source: Optional[str] = field(init=False)
transport_destination: Optional[str] = field(init=False)
Expand All @@ -139,6 +140,7 @@ def __post_init__(self):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self.pts: Optional[int] = None
self.broadcasted: bool = False
self.metadata: Dict[str, Any] = {}
self.transport_source: Optional[str] = None
self.transport_destination: Optional[str] = None
Expand Down
10 changes: 8 additions & 2 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,8 +787,12 @@ async def broadcast_frame(self, frame_cls: Type[Frame], **kwargs):
frame_cls: The class of the frame to be broadcasted.
**kwargs: Keyword arguments to be passed to the frame's constructor.
"""
await self.push_frame(frame_cls(**kwargs))
await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM)
downstream_frame = frame_cls(**kwargs)
downstream_frame.broadcasted = True
await self.push_frame(downstream_frame)
upstream_frame = frame_cls(**kwargs)
upstream_frame.broadcasted = True
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)

async def broadcast_frame_instance(self, frame: Frame):
"""Broadcasts a frame instance upstream and downstream.
Expand All @@ -815,11 +819,13 @@ async def broadcast_frame_instance(self, frame: Frame):
new_frame = frame_cls(**init_fields)
for k, v in extra_fields.items():
setattr(new_frame, k, v)
new_frame.broadcasted = True
await self.push_frame(new_frame)

new_frame = frame_cls(**init_fields)
for k, v in extra_fields.items():
setattr(new_frame, k, v)
new_frame.broadcasted = True
await self.push_frame(new_frame, FrameDirection.UPSTREAM)

async def __start(self, frame: StartFrame):
Expand Down
7 changes: 3 additions & 4 deletions src/pipecat/processors/frameworks/rtvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1220,10 +1220,9 @@ async def on_push_frame(self, data: FramePushed):
frame = data.frame
direction = data.direction

# Only process downstream frames. Some frames are broadcast in both
# directions (e.g. UserStartedSpeakingFrame, FunctionCallResultFrame),
# and we only want to send one RTVI message per event.
if direction != FrameDirection.DOWNSTREAM:
# For broadcasted frames (pushed in both directions), only process
Comment thread
filipi87 marked this conversation as resolved.
Outdated
# the downstream copy to avoid sending duplicate RTVI messages.
if frame.broadcasted and direction != FrameDirection.DOWNSTREAM:
return

# If we have already seen this frame, let's skip it.
Expand Down