Skip to content

Commit 024809b

Browse files
authored
Merge pull request pipecat-ai#3503 from pipecat-ai/aleix/ai-service-start-end-cancel
AIService: handle StartFrame/EndFrame/CancelFrame exceptions
2 parents 778dacc + 6cf0d53 commit 024809b

2 files changed

Lines changed: 23 additions & 4 deletions

File tree

changelog/3503.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Fixed an issue in `AIService` where unhandled exceptions in `start()`, `stop()`, or `cancel()` implementations would prevent `process_frame()` to continue and therefore `StartFrame`, `EndFrame`, or `CancelFrame` from being pushed downstream, causing the pipeline to not start or stop properly.

src/pipecat/services/ai_service.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,11 +148,11 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
148148
await super().process_frame(frame, direction)
149149

150150
if isinstance(frame, StartFrame):
151-
await self.start(frame)
152-
elif isinstance(frame, CancelFrame):
153-
await self.cancel(frame)
151+
await self._start(frame)
154152
elif isinstance(frame, EndFrame):
155-
await self.stop(frame)
153+
await self._stop(frame)
154+
elif isinstance(frame, CancelFrame):
155+
await self._cancel(frame)
156156

157157
async def process_generator(self, generator: AsyncGenerator[Frame | None, None]):
158158
"""Process frames from an async generator.
@@ -169,3 +169,21 @@ async def process_generator(self, generator: AsyncGenerator[Frame | None, None])
169169
await self.push_error_frame(f)
170170
else:
171171
await self.push_frame(f)
172+
173+
async def _start(self, frame: StartFrame):
174+
try:
175+
await self.start(frame)
176+
except Exception as e:
177+
logger.error(f"{self}: exception processing {frame}: {e}")
178+
179+
async def _stop(self, frame: EndFrame):
180+
try:
181+
await self.stop(frame)
182+
except Exception as e:
183+
logger.error(f"{self}: exception processing {frame}: {e}")
184+
185+
async def _cancel(self, frame: CancelFrame):
186+
try:
187+
await self.cancel(frame)
188+
except Exception as e:
189+
logger.error(f"{self}: exception processing {frame}: {e}")

0 commit comments

Comments
 (0)