|
3 | 3 | import asyncio |
4 | 4 | import base64 |
5 | 5 | import inspect |
| 6 | +import threading |
6 | 7 | import warnings |
7 | 8 | from typing import TYPE_CHECKING, Any, Callable |
8 | 9 | from urllib.parse import parse_qs |
@@ -324,36 +325,65 @@ async def final_handler(app): |
324 | 325 | return await next_handler(self) |
325 | 326 |
|
326 | 327 | def _wrap_middleware_async(self, middleware: Callable, next_handler: Callable) -> Callable: |
327 | | - """Wrap a middleware to work in async context.""" |
| 328 | + """Wrap a middleware to work in async context. |
| 329 | +
|
| 330 | + For sync middlewares, we split execution into pre/post phases around the |
| 331 | + call to next(). The sync middleware runs its pre-processing (e.g. request |
| 332 | + validation), then we intercept the next() call, await the async handler, |
| 333 | + and resume the middleware with the real response so post-processing |
| 334 | + (e.g. response validation) sees the actual data. |
| 335 | + """ |
328 | 336 |
|
329 | 337 | async def wrapped(app): |
330 | | - # Create a next_middleware that the sync middleware can call |
331 | | - def sync_next(app): |
332 | | - # This will be called by sync middleware |
333 | | - # We need to run the async next_handler |
334 | | - loop = asyncio.get_event_loop() |
335 | | - if loop.is_running(): |
336 | | - # We're in an async context, create a task |
337 | | - future = asyncio.ensure_future(next_handler(app)) |
338 | | - # Store for later await |
339 | | - app.context["_async_next_result"] = future |
340 | | - return Response(status_code=200, body="") # Placeholder |
341 | | - else: # pragma: no cover |
342 | | - return loop.run_until_complete(next_handler(app)) |
343 | | - |
344 | | - # Check if middleware is async |
345 | 338 | if inspect.iscoroutinefunction(middleware): |
346 | | - result = await middleware(app, next_handler) |
347 | | - else: |
348 | | - # Sync middleware - need special handling |
349 | | - result = middleware(app, sync_next) |
| 339 | + return await middleware(app, next_handler) |
350 | 340 |
|
351 | | - # Check if we stored an async result |
352 | | - if "_async_next_result" in app.context: |
353 | | - future = app.context.pop("_async_next_result") |
354 | | - result = await future |
| 341 | + # We use an Event to coordinate: the sync middleware runs in a thread, |
| 342 | + # calls sync_next which signals us to resolve the async handler, |
| 343 | + # then waits for the real response. |
| 344 | + middleware_called_next = asyncio.Event() |
| 345 | + next_app_holder: list = [] |
| 346 | + real_response_holder: list = [] |
| 347 | + middleware_result_holder: list = [] |
| 348 | + middleware_error_holder: list = [] |
355 | 349 |
|
356 | | - return result |
| 350 | + def sync_next(app): |
| 351 | + next_app_holder.append(app) |
| 352 | + middleware_called_next.set() |
| 353 | + # Block this thread until the real response is available |
| 354 | + event = threading.Event() |
| 355 | + next_app_holder.append(event) |
| 356 | + event.wait() |
| 357 | + return real_response_holder[0] |
| 358 | + |
| 359 | + def run_middleware(): |
| 360 | + try: |
| 361 | + result = middleware(app, sync_next) |
| 362 | + middleware_result_holder.append(result) |
| 363 | + except Exception as e: |
| 364 | + middleware_error_holder.append(e) |
| 365 | + |
| 366 | + thread = threading.Thread(target=run_middleware, daemon=True) |
| 367 | + thread.start() |
| 368 | + |
| 369 | + # Wait for the middleware to call next() |
| 370 | + await middleware_called_next.wait() |
| 371 | + |
| 372 | + # Now resolve the async next_handler |
| 373 | + real_response = await next_handler(next_app_holder[0]) |
| 374 | + real_response_holder.append(real_response) |
| 375 | + |
| 376 | + # Signal the thread that the response is ready |
| 377 | + threading_event = next_app_holder[1] |
| 378 | + threading_event.set() |
| 379 | + |
| 380 | + # Wait for the middleware thread to finish |
| 381 | + thread.join() |
| 382 | + |
| 383 | + if middleware_error_holder: |
| 384 | + raise middleware_error_holder[0] |
| 385 | + |
| 386 | + return middleware_result_holder[0] |
357 | 387 |
|
358 | 388 | return wrapped |
359 | 389 |
|
|
0 commit comments