Skip to content

Commit ef71923

Browse files
feat: Add EventSourceResponse with FastAPI-style SSE support (#203)
- EventSourceResponse with auto SSE framing, compression skip, keep-alive pings - ServerSentEvent struct for full control over event/id/retry/comment fields - response_class parameter on all HTTP method decorators - Rust: zero-copy chunks via PyBackedBytes, PyBackedStr for media_type - Rust: always skip compression for text/event-stream streams - Rust: keepalive ping stream wrapper using tokio::time::timeout - Dedicated SSE documentation page fix: Resolve task affinity bug in async streaming (fixes opentelemetry/anyio errors) Rewrote async stream forwarding to schedule the Python generator iteration on the same asyncio event loop task that created the generator context. The previous approach used tokio::spawn + join_all which drove the generator from a different task, breaking anyio cancel scopes and opentelemetry context propagation (ContextVar tokens created in one task couldn't be detached in another).
1 parent 36b93c3 commit ef71923

File tree

17 files changed

+1178
-376
lines changed

17 files changed

+1178
-376
lines changed

docs/src/ref/responses.md

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,97 @@ BOLT_ALLOWED_FILE_PATHS = [
173173
]
174174
```
175175

176+
## EventSourceResponse
177+
178+
Server-Sent Events response with automatic SSE framing, compression skipping, and keep-alive pings. See the [SSE topic guide](../topics/sse.md) for full documentation.
179+
180+
### Implicit pattern
181+
182+
```python
183+
from collections.abc import AsyncIterable
184+
from django_bolt.responses import EventSourceResponse
185+
186+
@api.get("/events", response_class=EventSourceResponse)
187+
async def events() -> AsyncIterable[dict]:
188+
for i in range(10):
189+
yield {"count": i}
190+
await asyncio.sleep(1)
191+
```
192+
193+
### Explicit pattern
194+
195+
```python
196+
from django_bolt.responses import EventSourceResponse
197+
198+
@api.get("/events")
199+
async def events():
200+
async def generate():
201+
yield {"count": i}
202+
return EventSourceResponse(generate())
203+
```
204+
205+
### Parameters
206+
207+
| Parameter | Type | Default | Description |
208+
|-----------|------|---------|-------------|
209+
| `content` | generator | required | Generator instance |
210+
| `status_code` | `int` | `200` | HTTP status code |
211+
| `headers` | `dict` | `None` | Additional response headers |
212+
| `ping_interval` | `float \| None` | `15.0` | Keep-alive ping interval in seconds (`None` to disable) |
213+
214+
### Automatic behavior
215+
216+
- **SSE framing**: Yielded objects are JSON-serialized and wrapped in `data: ...\n\n`
217+
- **Compression**: Automatically skipped (no `@no_compress` needed)
218+
- **Headers**: `Cache-Control`, `X-Accel-Buffering`, `Content-Type` set automatically
219+
- **Keep-alive**: `: ping` comments sent when idle to prevent proxy timeouts
220+
221+
## ServerSentEvent
222+
223+
Represents a single SSE event with full control over all fields.
224+
225+
```python
226+
from django_bolt.responses import ServerSentEvent
227+
228+
yield ServerSentEvent(data={"count": 1}, event="update", id="1")
229+
yield ServerSentEvent(raw_data="plain text", comment="keepalive")
230+
yield ServerSentEvent(retry=5000)
231+
```
232+
233+
### Fields
234+
235+
| Field | Type | Default | Description |
236+
|-------|------|---------|-------------|
237+
| `data` | `Any` | `None` | JSON-serialized event payload |
238+
| `raw_data` | `str \| None` | `None` | Raw string payload (mutually exclusive with `data`) |
239+
| `event` | `str \| None` | `None` | Event type name |
240+
| `id` | `str \| None` | `None` | Event ID (no null characters) |
241+
| `retry` | `int \| None` | `None` | Reconnection time in ms (non-negative) |
242+
| `comment` | `str \| None` | `None` | Comment line (`: ` prefix) |
243+
244+
## format_sse_event
245+
246+
Build SSE wire-format bytes from pre-serialized data.
247+
248+
```python
249+
from django_bolt.responses import format_sse_event
250+
251+
# Simple data event
252+
frame = format_sse_event(data_str='{"count": 1}')
253+
# b'data: {"count": 1}\n\n'
254+
255+
# Full event
256+
frame = format_sse_event(data_str="payload", event="update", id="42", retry=5000)
257+
# b'event: update\ndata: payload\nid: 42\nretry: 5000\n\n'
258+
259+
# Pre-encoded bytes (avoids decode/encode round-trip)
260+
frame = format_sse_event(data_bytes=b'{"count": 1}')
261+
# b'data: {"count": 1}\n\n'
262+
```
263+
176264
## StreamingResponse
177265

178-
Streaming response for generators.
266+
General-purpose streaming response for generators.
179267

180268
```python
181269
from django_bolt import StreamingResponse
@@ -193,9 +281,9 @@ return StreamingResponse(generate(), media_type="text/plain")
193281
async def async_generate():
194282
for i in range(100):
195283
await asyncio.sleep(0.1)
196-
yield f"data: {i}\n\n"
284+
yield f"chunk {i}\n"
197285

198-
return StreamingResponse(async_generate(), media_type="text/event-stream")
286+
return StreamingResponse(async_generate(), media_type="text/plain")
199287
```
200288

201289
### Parameters
@@ -212,7 +300,7 @@ return StreamingResponse(async_generate(), media_type="text/event-stream")
212300
| Type | Use case |
213301
|------|----------|
214302
| `text/plain` | Plain text streaming |
215-
| `text/event-stream` | Server-Sent Events (SSE) |
303+
| `text/event-stream` | Server-Sent Events (prefer `EventSourceResponse`) |
216304
| `application/octet-stream` | Binary data |
217305
| `application/json` | JSON streaming (NDJSON) |
218306

docs/src/topics/middleware.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ async def stream():
130130
return StreamingResponse(generate())
131131
```
132132

133+
!!! note
134+
`EventSourceResponse` and all SSE streams (`text/event-stream`) skip compression
135+
automatically. You only need `@no_compress` for non-SSE streaming.
136+
133137
### Compression settings
134138

135139
Configure in `settings.py`:

docs/src/topics/responses.md

Lines changed: 8 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -250,149 +250,17 @@ async def async_stream():
250250

251251
### Server-Sent Events (SSE)
252252

253-
Create SSE endpoints for real-time updates. SSE is a standard for pushing events from server to browser over HTTP.
254-
255-
#### Basic SSE
253+
For SSE endpoints, see the dedicated [Server-Sent Events guide](sse.md). Django-Bolt provides `EventSourceResponse` with automatic SSE framing, compression skipping, and keep-alive pings:
256254

257255
```python
258-
import asyncio
259-
260-
@api.get("/events")
261-
async def events():
262-
async def generate():
263-
for i in range(10):
264-
yield f"data: message-{i}\n\n"
265-
await asyncio.sleep(1)
266-
267-
return StreamingResponse(generate(), media_type="text/event-stream")
268-
```
269-
270-
#### SSE format
271-
272-
Each event is terminated by a double newline (`\n\n`). Fields:
273-
274-
| Field | Description |
275-
|-------|-------------|
276-
| `data:` | Event data (required) |
277-
| `event:` | Event type (optional, default: "message") |
278-
| `id:` | Event ID for reconnection (optional) |
279-
| `retry:` | Reconnection time in ms (optional) |
280-
281-
#### Full SSE event format
282-
283-
```python
284-
@api.get("/sse-events")
285-
async def sse_events():
286-
async def generate():
287-
for i in range(5):
288-
# Full SSE event with all fields
289-
yield f"event: update\nid: {i}\ndata: {{\"count\": {i}}}\n\n"
290-
await asyncio.sleep(0.5)
291-
292-
return StreamingResponse(generate(), media_type="text/event-stream")
293-
```
294-
295-
Client receives:
296-
```
297-
event: update
298-
id: 0
299-
data: {"count": 0}
300-
301-
event: update
302-
id: 1
303-
data: {"count": 1}
304-
```
305-
306-
#### Sync generators for SSE
307-
308-
You can use sync generators for CPU-bound operations:
309-
310-
```python
311-
import time
312-
313-
@api.get("/sync-sse")
314-
async def sync_sse():
315-
def generate():
316-
for i in range(5):
317-
yield f"data: sync-message-{i}\n\n"
318-
time.sleep(0.1)
319-
320-
return StreamingResponse(generate(), media_type="text/event-stream")
321-
```
322-
323-
#### Mixed data types
324-
325-
Generators can yield both strings and bytes:
256+
from collections.abc import AsyncIterable
257+
from django_bolt.responses import EventSourceResponse
326258

327-
```python
328-
@api.get("/mixed-sse")
329-
async def mixed_sse():
330-
async def generate():
331-
yield "data: string message\n\n"
332-
yield b"data: bytes message\n\n" # Also works
333-
334-
return StreamingResponse(generate(), media_type="text/event-stream")
335-
```
336-
337-
#### SSE with cleanup
338-
339-
Use try/finally for resource cleanup when clients disconnect:
340-
341-
```python
342-
@api.get("/sse-with-cleanup")
343-
async def sse_with_cleanup():
344-
async def generate():
345-
try:
346-
yield "data: START\n\n"
347-
for i in range(100):
348-
yield f"data: chunk-{i}\n\n"
349-
await asyncio.sleep(0.1)
350-
yield "data: END\n\n"
351-
finally:
352-
# This runs when client disconnects
353-
print("Client disconnected, cleaning up")
354-
355-
return StreamingResponse(generate(), media_type="text/event-stream")
356-
```
357-
358-
#### SSE headers
359-
360-
SSE endpoints should include these headers for proper behavior:
361-
362-
```python
363-
@api.get("/sse")
364-
async def sse():
365-
async def generate():
366-
for i in range(10):
367-
yield f"data: {i}\n\n"
368-
await asyncio.sleep(1)
369-
370-
return StreamingResponse(
371-
generate(),
372-
media_type="text/event-stream",
373-
headers={
374-
"Cache-Control": "no-cache",
375-
"X-Accel-Buffering": "no", # Disable nginx buffering
376-
}
377-
)
378-
```
379-
380-
### Disabling compression for streams
381-
382-
Streaming responses should not be compressed. Use `@no_compress`:
383-
384-
```python
385-
from django_bolt.middleware import no_compress
386-
387-
@api.get("/sse")
388-
@no_compress
389-
async def sse():
390-
async def generate():
391-
for i in range(10):
392-
yield f"data: {i}\n\n"
393-
await asyncio.sleep(1)
394-
395-
return StreamingResponse(generate(), media_type="text/event-stream")
259+
@api.get("/events", response_class=EventSourceResponse)
260+
async def events() -> AsyncIterable[dict]:
261+
for i in range(10):
262+
yield {"count": i}
263+
await asyncio.sleep(1)
396264
```
397265

398266
## Response with custom headers

0 commit comments

Comments
 (0)