
Commit b77a50d

Merge pull request #3529 from lukepayyapilli/fix/llm-timeout-without-retry
feat: handle exceptions for BaseOpenAILLMService
2 parents 7456a0a + 433c1b9 commit b77a50d

3 files changed: 168 additions & 1 deletion


changelog/3529.fixed.md (new file)
Lines changed: 1 addition & 0 deletions

```diff
@@ -0,0 +1 @@
+- Fixed OpenAI LLM services to emit `ErrorFrame` on completion timeout, enabling proper error handling and LLMSwitcher failover.
```
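The changelog entry summarizes the behavior change: completion timeouts now surface as `ErrorFrame`s instead of being swallowed after the event callback. As a hedged sketch of what this enables, a custom processor can watch for the frame; `FailoverObserver` is a hypothetical name, and since pipecat routes `ErrorFrame`s upstream, such an observer would sit before the LLM service in the pipeline:

```python
# Hedged sketch: observing the ErrorFrame that this fix now emits.
# FailoverObserver is a hypothetical name; ErrorFrame, Frame, FrameProcessor,
# and FrameDirection are existing pipecat types.
from pipecat.frames.frames import ErrorFrame, Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class FailoverObserver(FrameProcessor):
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, ErrorFrame):
            # The LLM timeout is now visible as a frame rather than a silent
            # stall; an LLMSwitcher can key its failover off the same signal.
            print(f"LLM error observed: {frame.error}")
        await self.push_frame(frame, direction)
```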

src/pipecat/services/openai/base_llm.py
Lines changed: 4 additions & 1 deletion

```diff
@@ -492,8 +492,11 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             await self.push_frame(LLMFullResponseStartFrame())
             await self.start_processing_metrics()
             await self._process_context(context)
-        except httpx.TimeoutException:
+        except httpx.TimeoutException as e:
             await self._call_event_handler("on_completion_timeout")
+            await self.push_error(error_msg="LLM completion timeout", exception=e)
+        except Exception as e:
+            await self.push_error(error_msg=f"Error during completion: {e}", exception=e)
         finally:
             await self.stop_processing_metrics()
             await self.push_frame(LLMFullResponseEndFrame())
```
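Nothing changes for existing event subscribers: a timeout still fires `on_completion_timeout` before the new `push_error` call. A minimal sketch of hooking that event, assuming pipecat's usual `event_handler` decorator registration (constructor configuration elided):

```python
# Minimal sketch, assuming pipecat's standard event-handler registration.
# The "on_completion_timeout" event name is taken from the diff above.
from pipecat.services.openai.llm import OpenAILLMService

llm = OpenAILLMService(model="gpt-4")  # api_key and other config elided


@llm.event_handler("on_completion_timeout")
async def on_completion_timeout(service):
    # Runs before push_error is awaited in the except block above.
    print(f"{service} timed out waiting for a completion")
```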

tests/test_openai_llm_timeout.py (new file)
Lines changed: 163 additions & 0 deletions
```python
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Unit tests for OpenAI LLM error handling."""

from unittest.mock import AsyncMock, patch

import httpx
import pytest

from pipecat.frames.frames import (
    LLMContextFrame,
    LLMFullResponseEndFrame,
    LLMFullResponseStartFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.openai.llm import OpenAILLMService


@pytest.mark.asyncio
async def test_openai_llm_emits_error_frame_on_timeout():
    """Test that OpenAI LLM service emits ErrorFrame when a timeout occurs.

    This enables LLMSwitcher to trigger failover to backup LLMs when the
    primary LLM times out.
    """
    with patch.object(OpenAILLMService, "create_client"):
        service = OpenAILLMService(model="gpt-4")
        service._client = AsyncMock()

        # Track pushed frames and errors
        pushed_frames = []
        pushed_errors = []
        timeout_handler_called = False

        original_push_frame = service.push_frame

        async def mock_push_frame(frame, direction=FrameDirection.DOWNSTREAM):
            pushed_frames.append(frame)
            await original_push_frame(frame, direction)

        async def mock_push_error(error_msg, exception=None):
            pushed_errors.append({"error_msg": error_msg, "exception": exception})

        async def mock_timeout_handler(event_name):
            nonlocal timeout_handler_called
            if event_name == "on_completion_timeout":
                timeout_handler_called = True

        service.push_frame = mock_push_frame
        service.push_error = mock_push_error
        service._call_event_handler = AsyncMock(side_effect=mock_timeout_handler)

        # Mock _process_context to raise TimeoutException
        service._process_context = AsyncMock(
            side_effect=httpx.TimeoutException("Connection timed out")
        )

        # Mock metrics methods
        service.start_processing_metrics = AsyncMock()
        service.stop_processing_metrics = AsyncMock()
        service.start_ttfb_metrics = AsyncMock()

        # Create a context frame to process
        context = LLMContext(
            messages=[{"role": "user", "content": "Hello"}],
        )
        frame = LLMContextFrame(context=context)

        # Process the frame
        await service.process_frame(frame, FrameDirection.DOWNSTREAM)

        # Verify timeout handler was called
        service._call_event_handler.assert_called_once_with("on_completion_timeout")
        assert timeout_handler_called

        # Verify push_error was called with correct message
        assert len(pushed_errors) == 1
        assert pushed_errors[0]["error_msg"] == "LLM completion timeout"
        assert isinstance(pushed_errors[0]["exception"], httpx.TimeoutException)

        # Verify LLMFullResponseStartFrame and LLMFullResponseEndFrame were pushed
        frame_types = [type(f) for f in pushed_frames]
        assert LLMFullResponseStartFrame in frame_types
        assert LLMFullResponseEndFrame in frame_types


@pytest.mark.asyncio
async def test_openai_llm_timeout_still_pushes_end_frame():
    """Test that LLMFullResponseEndFrame is pushed even when timeout occurs.

    The finally block should ensure proper cleanup regardless of timeout.
    """
    with patch.object(OpenAILLMService, "create_client"):
        service = OpenAILLMService(model="gpt-4")
        service._client = AsyncMock()

        pushed_frames = []

        async def mock_push_frame(frame, direction=FrameDirection.DOWNSTREAM):
            pushed_frames.append(frame)

        service.push_frame = mock_push_frame
        service.push_error = AsyncMock()
        service._call_event_handler = AsyncMock()
        service._process_context = AsyncMock(side_effect=httpx.TimeoutException("Timeout"))
        service.start_processing_metrics = AsyncMock()
        service.stop_processing_metrics = AsyncMock()

        context = LLMContext(
            messages=[{"role": "user", "content": "Hello"}],
        )
        frame = LLMContextFrame(context=context)

        await service.process_frame(frame, FrameDirection.DOWNSTREAM)

        # Verify both start and end frames are pushed
        frame_types = [type(f) for f in pushed_frames]
        assert LLMFullResponseStartFrame in frame_types
        assert LLMFullResponseEndFrame in frame_types

        # Verify metrics were stopped
        service.stop_processing_metrics.assert_called_once()


@pytest.mark.asyncio
async def test_openai_llm_emits_error_frame_on_exception():
    """Test that OpenAI LLM service emits ErrorFrame when a general exception occurs.

    This enables proper error handling for API errors, rate limits, and other failures.
    """
    with patch.object(OpenAILLMService, "create_client"):
        service = OpenAILLMService(model="gpt-4")
        service._client = AsyncMock()

        pushed_errors = []

        async def mock_push_error(error_msg, exception=None):
            pushed_errors.append({"error_msg": error_msg, "exception": exception})

        service.push_frame = AsyncMock()
        service.push_error = mock_push_error
        service._call_event_handler = AsyncMock()
        service._process_context = AsyncMock(side_effect=RuntimeError("API Error"))
        service.start_processing_metrics = AsyncMock()
        service.stop_processing_metrics = AsyncMock()

        context = LLMContext(
            messages=[{"role": "user", "content": "Hello"}],
        )
        frame = LLMContextFrame(context=context)

        await service.process_frame(frame, FrameDirection.DOWNSTREAM)

        # Verify push_error was called with correct message
        assert len(pushed_errors) == 1
        assert "Error during completion" in pushed_errors[0]["error_msg"]
        assert "API Error" in pushed_errors[0]["error_msg"]
        assert isinstance(pushed_errors[0]["exception"], RuntimeError)
```
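Because `_process_context`, the client, and the metrics hooks are all mocked, the suite makes no network calls; running it with something like `pytest tests/test_openai_llm_timeout.py -v` should exercise all three cases.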
