Commit 782fa8d

Add post (#9)
1 parent 62f2569 commit 782fa8d

2 files changed: +81 −0 lines changed

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
---
title: "FastAPI and Celery: Prevent This Common Mistake That Crashed Our Service"
description: Are you using Celery tasks in FastAPI? Learn how it can crash your service and how to fix it.
author: hemantapkh
date: 2025-05-11 18:35:00 +0000
categories: [SRE]
tags: [python, fastapi, docsumo]
pin: false
math: false
mermaid: false
image:
  path: https://assets.hemantapkh.com/blog/celery-blocking-fastapi-eventloop/thumbnail.webp
---

Recently, our FastAPI service crashed unexpectedly. After some investigation, we discovered that the root cause was our Redis service being down. But Redis wasn’t even a core component of the application, so why did our entire service crash just because Redis was unavailable? The answer lies in how we were using Celery tasks.

FastAPI is an asynchronous framework, while Celery is inherently synchronous. Yes, synchronous, you read that right! Although Celery can run tasks asynchronously, its communication with the broker (like Redis or RabbitMQ) is synchronous. This can become a disaster when the broker is unavailable.

## What Happened in Our Case?

In our application, a few of the endpoints relied on Celery to dispatch background tasks. Unfortunately, these endpoints were called frequently, and when Redis (our Celery broker) went down, every request to them caused Celery to hang while attempting to reconnect. Since FastAPI runs on a single event loop, this blocking behavior stalled the entire application for ~20 seconds per request. The result? Our readiness probes started failing. As the issue propagated across all pods in our Kubernetes cluster, the entire service was marked as unhealthy and taken offline.
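
To make the failure mode concrete, here is a minimal sketch of the kind of endpoint that bit us (the route, task name, and arguments are made up for illustration). Nothing here looks blocking at first glance, yet the `.delay()` call talks to the broker synchronously:

```python
from celery import Celery
from fastapi import FastAPI

app = FastAPI()
celery_app = Celery("tasks", broker="redis://localhost:6379/0")


@celery_app.task(name="app.tasks.process_document")  # hypothetical task name
def process_document(doc_id: str) -> None:
    pass


@app.post("/documents/{doc_id}/process")
async def process(doc_id: str):
    # .delay() publishes the task to the broker synchronously. If Redis is
    # down, this call blocks the event loop while Celery retries the
    # connection, stalling every other request served by this process.
    result = process_document.delay(doc_id)
    return {"task_id": result.id}
```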
## How to Fix This Issue

To prevent Celery from blocking FastAPI’s event loop, you can submit Celery tasks from a separate thread. Below, I’ve created a patched version of Celery with custom async methods (`apply_asyncx` and `delayx`) that run the task submission in a separate thread. The `asyncio.to_thread` function lets us run synchronous functions in a separate thread while awaiting their results asynchronously in the main event loop.
```python
import asyncio

from celery import Celery


def create_celery_app(broker_url: str, backend_url: str | None = None) -> Celery:
    celery_app = Celery("tasks", broker=broker_url, backend=backend_url or broker_url)

    class PatchedTask(celery_app.Task):  # type: ignore[name-defined]
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)

        async def apply_asyncx(self, args=None, kwargs=None, **options):
            # Submit the task from a worker thread so the synchronous broker
            # call cannot block the event loop.
            result = await asyncio.to_thread(
                super().apply_async, args, kwargs, **options
            )
            return result

        async def delayx(self, *args, **kwargs):
            result = await self.apply_asyncx(args, kwargs)
            return result

    celery_app.Task = PatchedTask  # type: ignore[name-defined]

    return celery_app
```

> I avoided using Starlette’s default thread pool (via `run_in_threadpool`) as it’s limited to 40 threads by default and shared with other FastAPI tasks. Exhausting this pool blocks the app. Instead, `asyncio.to_thread` uses a separate thread pool, avoiding this bottleneck. For greater control over threading, you can also create a custom thread pool with `concurrent.futures.ThreadPoolExecutor` instead of relying on the default executor used by `asyncio.to_thread`, as sketched below.
{: .prompt-warning }
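
A minimal sketch of that dedicated-executor variant (the pool name, size, and helper function are assumptions for illustration, not part of our code) could look like this:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Dedicated pool used only for Celery dispatches; size it for your traffic.
celery_dispatch_pool = ThreadPoolExecutor(
    max_workers=10, thread_name_prefix="celery-dispatch"
)


async def dispatch_in_pool(task, *args, **kwargs):
    # Run task.apply_async in the dedicated pool instead of the event
    # loop's default executor.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        celery_dispatch_pool,
        lambda: task.apply_async(args=args, kwargs=kwargs),
    )
```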
With this implementation, we can now use the `apply_asyncx` and `delayx` methods instead of Celery's default `apply_async` and `delay`. These new methods ensure that task dispatching happens in a separate thread, safeguarding FastAPI's event loop from being blocked during broker outages.
```python
from typing import Tuple

from fastapi import FastAPI

# create_celery_app is the patched factory defined in the previous snippet.
app = FastAPI()
celery_tasks = create_celery_app("redis://localhost:6379/0")


@celery_tasks.task(name="app.tasks.requests_post")  # type: ignore[empty-body]
def my_task(*args, **kwargs) -> Tuple[str, bool]:
    pass


@app.post("/trigger-task")
async def trigger_task():
    # delayx submits the task from a separate thread, so a broker outage
    # cannot stall the event loop.
    result = await my_task.delayx("arg1", "arg2")
    return {"message": "Task triggered", "task_id": str(result.id)}
```
## Conclusion
Now, if Redis goes down, only the endpoints depending on Redis are affected, while others continue to function normally. While we’re committed to Celery due to existing infrastructure and dependencies, if you’re starting fresh or planning new implementations, consider exploring other frameworks like [Arq](https://github.com/python-arq/arq) and [TaskIQ](https://github.com/taskiq-python/taskiq).
> For more FastAPI tips, I highly recommend [FastAPI Tips](https://github.com/Kludex/fastapi-tips), a helpful resource maintained by one of the FastAPI maintainers.
{: .prompt-tip }

run.sh

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
bundle exec jekyll s -l
