Skip to content

Commit 903dc6c

Browse files
authored
Merge pull request #3883 from pipecat-ai/aleix/queue-frame-direction
Add direction parameter to PipelineTask.queue_frame() and queue_frames()
2 parents dee94b3 + 94a59de commit 903dc6c

5 files changed

Lines changed: 775 additions & 573 deletions

File tree

changelog/3883.added.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Added optional `direction` parameter to `PipelineTask.queue_frame()` and `PipelineTask.queue_frames()`, allowing frames to be pushed upstream from the end of the pipeline.

src/pipecat/pipeline/task.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -389,12 +389,12 @@ async def on_client_ready(rtvi: RTVIProcessor):
389389
# source allows us to receive and react to upstream frames, and the sink
390390
# allows us to receive and react to downstream frames.
391391
source = PipelineSource(self._source_push_frame, name=f"{self}::Source")
392-
sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink")
392+
self._sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink")
393393
# Only prepend the RTVIProcessor if we created it ourselves. When the
394394
# user already placed it inside their pipeline we must not insert it
395395
# again or it will appear twice in the frame chain.
396396
processors = [self._rtvi, pipeline] if prepend_rtvi else [pipeline]
397-
self._pipeline = Pipeline(processors, source=source, sink=sink)
397+
self._pipeline = Pipeline(processors, source=source, sink=self._sink)
398398

399399
# The task observer acts as a proxy to the provided observers. This way,
400400
# we only need to pass a single observer (using the StartFrame) which
@@ -625,26 +625,43 @@ async def run(self, params: PipelineTaskParams):
625625
self._finished = True
626626
logger.debug(f"Pipeline task {self} has finished")
627627

628-
async def queue_frame(self, frame: Frame):
629-
"""Queue a single frame to be pushed down the pipeline.
628+
async def queue_frame(
629+
self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
630+
):
631+
"""Queue a single frame to be pushed through the pipeline.
632+
633+
Downstream frames are pushed from the beginning of the pipeline.
634+
Upstream frames are pushed from the end of the pipeline.
630635
631636
Args:
632637
frame: The frame to be processed.
638+
direction: The direction to push the frame. Defaults to downstream.
633639
"""
634-
await self._push_queue.put(frame)
640+
if direction == FrameDirection.DOWNSTREAM:
641+
await self._push_queue.put(frame)
642+
else:
643+
await self._sink.queue_frame(frame, direction)
644+
645+
async def queue_frames(
646+
self,
647+
frames: Iterable[Frame] | AsyncIterable[Frame],
648+
direction: FrameDirection = FrameDirection.DOWNSTREAM,
649+
):
650+
"""Queue multiple frames to be pushed through the pipeline.
635651
636-
async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
637-
"""Queues multiple frames to be pushed down the pipeline.
652+
Downstream frames are pushed from the beginning of the pipeline.
653+
Upstream frames are pushed from the end of the pipeline.
638654
639655
Args:
640656
frames: An iterable or async iterable of frames to be processed.
657+
direction: The direction to push the frames. Defaults to downstream.
641658
"""
642659
if isinstance(frames, AsyncIterable):
643660
async for frame in frames:
644-
await self.queue_frame(frame)
661+
await self.queue_frame(frame, direction)
645662
elif isinstance(frames, Iterable):
646663
for frame in frames:
647-
await self.queue_frame(frame)
664+
await self.queue_frame(frame, direction)
648665

649666
async def _cancel(self, *, reason: Optional[str] = None):
650667
"""Internal cancellation logic for the pipeline task.

src/pipecat/processors/frame_processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -965,7 +965,7 @@ async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
965965
try:
966966
timestamp = self._clock.get_time() if self._clock else 0
967967
if direction == FrameDirection.DOWNSTREAM and self._next:
968-
logger.trace(f"Pushing {frame} from {self} to {self._next}")
968+
logger.trace(f"Pushing {frame} downstream from {self} to {self._next}")
969969

970970
if self._observer:
971971
data = FramePushed(

tests/test_pipeline.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,63 @@ async def on_frame_reached_downstream(task, frame):
292292
assert upstream_received
293293
assert downstream_received
294294

295+
async def test_task_queue_frame_upstream(self):
296+
upstream_received = False
297+
298+
pipeline = Pipeline([IdentityFilter()])
299+
task = PipelineTask(pipeline, cancel_on_idle_timeout=False)
300+
task.set_reached_upstream_filter((TextFrame,))
301+
302+
@task.event_handler("on_frame_reached_upstream")
303+
async def on_frame_reached_upstream(task, frame):
304+
nonlocal upstream_received
305+
if isinstance(frame, TextFrame) and frame.text == "Hello Upstream!":
306+
upstream_received = True
307+
308+
@task.event_handler("on_pipeline_started")
309+
async def on_pipeline_started(task, frame):
310+
await task.queue_frame(TextFrame(text="Hello Upstream!"), FrameDirection.UPSTREAM)
311+
312+
try:
313+
await asyncio.wait_for(
314+
task.run(PipelineTaskParams(loop=asyncio.get_event_loop())),
315+
timeout=1.0,
316+
)
317+
except asyncio.TimeoutError:
318+
pass
319+
320+
assert upstream_received
321+
322+
async def test_task_queue_frames_upstream(self):
323+
upstream_texts = []
324+
325+
pipeline = Pipeline([IdentityFilter()])
326+
task = PipelineTask(pipeline, cancel_on_idle_timeout=False)
327+
task.set_reached_upstream_filter((TextFrame,))
328+
329+
@task.event_handler("on_frame_reached_upstream")
330+
async def on_frame_reached_upstream(task, frame):
331+
if isinstance(frame, TextFrame):
332+
upstream_texts.append(frame.text)
333+
334+
@task.event_handler("on_pipeline_started")
335+
async def on_pipeline_started(task, frame):
336+
await task.queue_frames(
337+
[TextFrame(text="First"), TextFrame(text="Second")],
338+
FrameDirection.UPSTREAM,
339+
)
340+
341+
try:
342+
await asyncio.wait_for(
343+
task.run(PipelineTaskParams(loop=asyncio.get_event_loop())),
344+
timeout=1.0,
345+
)
346+
except asyncio.TimeoutError:
347+
pass
348+
349+
assert "First" in upstream_texts
350+
assert "Second" in upstream_texts
351+
295352
async def test_task_heartbeats(self):
296353
heartbeats_counter = 0
297354

0 commit comments

Comments
 (0)