1919logger = get_logger (__name__ )
2020
2121
22+ class JobCancelledException (Exception ):
23+ """Exception raised when a job is explicitly cancelled (not due to client timeout)"""
24+
25+ def __init__ (self , job_id : str , message : str = None ):
26+ self .job_id = job_id
27+ super ().__init__ (message or f"Job { job_id } was explicitly cancelled" )
28+
29+
2230async def add_keepalive_to_stream (
2331 stream_generator : AsyncIterator [str | bytes ],
2432 keepalive_interval : float = 30.0 ,
@@ -134,8 +142,8 @@ async def cancellation_aware_stream_wrapper(
134142 # Send cancellation event to client
135143 cancellation_event = {"message_type" : "stop_reason" , "stop_reason" : "cancelled" }
136144 yield f"data: { json .dumps (cancellation_event )} \n \n "
137- # Raise CancelledError to interrupt the stream
138- raise asyncio . CancelledError ( f"Job { job_id } was cancelled" )
145+ # Raise custom exception for explicit job cancellation
146+ raise JobCancelledException ( job_id , f"Job { job_id } was cancelled" )
139147 except Exception as e :
140148 # Log warning but don't fail the stream if cancellation check fails
141149 logger .warning (f"Failed to check job cancellation for job { job_id } : { e } " )
@@ -144,9 +152,13 @@ async def cancellation_aware_stream_wrapper(
144152
145153 yield chunk
146154
155+ except JobCancelledException :
156+ # Re-raise JobCancelledException to distinguish from client timeout
157+ logger .info (f"Stream for job { job_id } was explicitly cancelled and cleaned up" )
158+ raise
147159 except asyncio .CancelledError :
148- # Re-raise CancelledError to ensure proper cleanup
149- logger .info (f"Stream for job { job_id } was cancelled and cleaned up" )
160+ # Re-raise CancelledError (likely client timeout) to ensure proper cleanup
161+ logger .info (f"Stream for job { job_id } was cancelled (likely client timeout) and cleaned up" )
150162 raise
151163 except Exception as e :
152164 logger .error (f"Error in cancellation-aware stream wrapper for job { job_id } : { e } " )
@@ -215,12 +227,12 @@ async def stream_response(self, send: Send) -> None:
215227 }
216228 )
217229
218- # This should be handled properly upstream?
219- except asyncio . CancelledError as exc :
220- logger .warning ( "Stream was cancelled by client or job cancellation " )
221- # Handle cancellation gracefully
230+ # Handle explicit job cancellations (should not throw error)
231+ except JobCancelledException as exc :
232+ logger .info ( f "Stream was explicitly cancelled for job { exc . job_id } " )
233+ # Handle explicit cancellation gracefully without error
222234 more_body = False
223- cancellation_resp = {"error" : { " message" : "Stream cancelled" } }
235+ cancellation_resp = {"message" : "Job was cancelled" }
224236 cancellation_event = f"event: cancelled\n data: { json .dumps (cancellation_resp )} \n \n " .encode (self .charset )
225237 if not self .response_started :
226238 await send (
@@ -238,6 +250,31 @@ async def stream_response(self, send: Send) -> None:
238250 "more_body" : more_body ,
239251 }
240252 )
253+ return
254+
255+ # Handle client timeouts (should throw error to inform user)
256+ except asyncio .CancelledError as exc :
257+ logger .warning ("Stream was cancelled due to client timeout or unexpected disconnection" )
258+ # Handle unexpected cancellation with error
259+ more_body = False
260+ error_resp = {"error" : {"message" : "Request was unexpectedly cancelled (likely due to client timeout or disconnection)" }}
261+ error_event = f"event: error\n data: { json .dumps (error_resp )} \n \n " .encode (self .charset )
262+ if not self .response_started :
263+ await send (
264+ {
265+ "type" : "http.response.start" ,
266+ "status" : 408 , # Request Timeout
267+ "headers" : self .raw_headers ,
268+ }
269+ )
270+ raise
271+ await send (
272+ {
273+ "type" : "http.response.body" ,
274+ "body" : error_event ,
275+ "more_body" : more_body ,
276+ }
277+ )
241278 capture_sentry_exception (exc )
242279 return
243280
0 commit comments