Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

Commit 1805096

Browse files
authored
Merge pull request #227 from stacklok/add-notification-sse
feat: add alerts sse notification endpoint
2 parents 1f809f2 + f75bf61 commit 1805096

File tree

2 files changed

+23
-5
lines changed

2 files changed

+23
-5
lines changed

src/codegate/dashboard/dashboard.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
import asyncio
2-
from typing import List
2+
from typing import List, AsyncGenerator
33

44
import structlog
55
from fastapi import APIRouter
6+
from fastapi.responses import StreamingResponse
67

78
from codegate.dashboard.post_processing import (
89
parse_get_alert_conversation,
910
parse_messages_in_conversations,
1011
)
1112
from codegate.dashboard.request_models import AlertConversation, Conversation
12-
from codegate.db.connection import DbReader
13+
from codegate.db.connection import DbReader, alert_queue
1314

1415
logger = structlog.get_logger("codegate")
1516

@@ -34,3 +35,19 @@ def get_alerts() -> List[AlertConversation]:
3435
"""
3536
alerts_prompt_output = asyncio.run(db_reader.get_alerts_with_prompt_and_output())
3637
return asyncio.run(parse_get_alert_conversation(alerts_prompt_output))
38+
39+
40+
async def generate_sse_events() -> AsyncGenerator[str, None]:
41+
"""
42+
SSE generator from queue
43+
"""
44+
while True:
45+
message = await alert_queue.get()
46+
yield f"data: {message}\n\n"
47+
48+
@dashboard_router.get("/dashboard/alerts_notification")
49+
async def stream_sse():
50+
"""
51+
Send alerts event
52+
"""
53+
return StreamingResponse(generate_sse_events(), media_type="text/event-stream")

src/codegate/db/connection.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import uuid
66
from pathlib import Path
77
from typing import AsyncGenerator, AsyncIterator, List, Optional
8-
98
import structlog
109
from litellm import ChatCompletionRequest, ModelResponse
1110
from pydantic import BaseModel
@@ -20,7 +19,7 @@
2019
)
2120

2221
logger = structlog.get_logger("codegate")
23-
22+
alert_queue = asyncio.Queue()
2423

2524
class DbCodeGate:
2625

@@ -213,7 +212,9 @@ async def record_alerts(self, alerts: List[Alert]) -> None:
213212
async with asyncio.TaskGroup() as tg:
214213
for alert in alerts:
215214
try:
216-
tg.create_task(self._insert_pydantic_model(alert, sql))
215+
result = tg.create_task(self._insert_pydantic_model(alert, sql))
216+
if result and alert.trigger_category == "critical":
217+
await alert_queue.put(f"New alert detected: {alert.timestamp}")
217218
except Exception as e:
218219
logger.error(f"Failed to record alert: {alert}.", error=str(e))
219220
return None

0 commit comments

Comments
 (0)