Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

Commit 2cbe6a0

Browse files
aponcedeleonchyrobla
authored andcommitted
Merge pull request #197 from stacklok/setup-alerts
Record alerts as part of the pipeline
2 parents 0375d7d + 446c226 commit 2cbe6a0

File tree

22 files changed

+379
-763
lines changed

22 files changed

+379
-763
lines changed

scripts/import_packages.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import asyncio
2+
from datetime import date
23
import json
4+
import os
35

46
import weaviate
57
from weaviate.classes.config import DataType, Property
@@ -14,7 +16,8 @@ class PackageImporter:
1416
def __init__(self):
1517
self.client = weaviate.WeaviateClient(
1618
embedded_options=EmbeddedOptions(
17-
persistence_data_path="./weaviate_data", grpc_port=50052
19+
persistence_data_path="./weaviate_data", grpc_port=50052,
20+
additional_env_vars={"ENABLE_MODULES": "backup-filesystem", "BACKUP_FILESYSTEM_PATH": os.getenv("BACKUP_FILESYSTEM_PATH", "/tmp")}
1821
)
1922
)
2023
self.json_files = [
@@ -86,6 +89,9 @@ async def run_import(self):
8689
self.setup_schema()
8790
await self.add_data()
8891

92+
#  take a backup of the data
93+
await self.client.backup.create(backup_id="backup-"+date.today().strftime("%Y-%m-%d"), backend="filesystem", wait_for_completion=True)
94+
8995

9096
if __name__ == "__main__":
9197
importer = PackageImporter()

sql/queries/queries.sql

Lines changed: 12 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,99 +1,24 @@
1-
-- name: CreatePrompt :one
2-
INSERT INTO prompts (
3-
id,
4-
timestamp,
5-
provider,
6-
request,
7-
type
8-
) VALUES (?, ?, ?, ?, ?) RETURNING *;
9-
10-
-- name: GetPrompt :one
11-
SELECT * FROM prompts WHERE id = ?;
12-
13-
-- name: ListPrompts :many
14-
SELECT * FROM prompts
15-
ORDER BY timestamp DESC
16-
LIMIT ? OFFSET ?;
17-
18-
-- name: CreateOutput :one
19-
INSERT INTO outputs (
20-
id,
21-
prompt_id,
22-
timestamp,
23-
output
24-
) VALUES (?, ?, ?, ?) RETURNING *;
25-
26-
-- name: GetOutput :one
27-
SELECT * FROM outputs WHERE id = ?;
28-
29-
-- name: GetOutputsByPromptId :many
30-
SELECT * FROM outputs
31-
WHERE prompt_id = ?
32-
ORDER BY timestamp DESC;
33-
34-
-- name: CreateAlert :one
35-
INSERT INTO alerts (
36-
id,
37-
prompt_id,
38-
output_id,
39-
code_snippet,
40-
trigger_string,
41-
trigger_type,
42-
trigger_category,
43-
timestamp
44-
) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING *;
45-
46-
-- name: GetAlert :one
47-
SELECT * FROM alerts WHERE id = ?;
48-
49-
-- name: ListAlertsByPrompt :many
50-
SELECT * FROM alerts
51-
WHERE prompt_id = ?
52-
ORDER BY timestamp DESC;
53-
54-
-- name: GetSettings :one
55-
SELECT * FROM settings ORDER BY id LIMIT 1;
56-
57-
-- name: UpsertSettings :one
58-
INSERT INTO settings (
59-
id,
60-
ip,
61-
port,
62-
llm_model,
63-
system_prompt,
64-
other_settings
65-
) VALUES (?, ?, ?, ?, ?, ?)
66-
ON CONFLICT(id) DO UPDATE SET
67-
ip = excluded.ip,
68-
port = excluded.port,
69-
llm_model = excluded.llm_model,
70-
system_prompt = excluded.system_prompt,
71-
other_settings = excluded.other_settings
72-
RETURNING *;
73-
74-
-- name: GetPromptWithOutputsAndAlerts :many
1+
-- name: GetPromptWithOutputs :many
752
SELECT
763
p.*,
774
o.id as output_id,
785
o.output,
79-
a.id as alert_id,
80-
a.code_snippet,
81-
a.trigger_string,
82-
a.trigger_type,
83-
a.trigger_category
6+
o.timestamp as output_timestamp
847
FROM prompts p
858
LEFT JOIN outputs o ON p.id = o.prompt_id
86-
LEFT JOIN alerts a ON p.id = a.prompt_id
87-
WHERE p.id = ?
88-
ORDER BY o.timestamp DESC, a.timestamp DESC;
89-
9+
ORDER BY o.timestamp DESC;
9010

91-
-- name: GetPromptWithOutputs :many
11+
-- name: GetAlertsWithPromptAndOutput :many
9212
SELECT
93-
p.*,
13+
a.*,
14+
p.timestamp as prompt_timestamp,
15+
p.provider,
16+
p.request,
17+
p.type,
9418
o.id as output_id,
9519
o.output,
9620
o.timestamp as output_timestamp
97-
FROM prompts p
21+
FROM alerts a
22+
LEFT JOIN prompts p ON p.id = a.prompt_id
9823
LEFT JOIN outputs o ON p.id = o.prompt_id
99-
ORDER BY o.timestamp DESC;
24+
ORDER BY a.timestamp DESC;

sql/schema/schema.sql

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@ CREATE TABLE outputs (
2222
CREATE TABLE alerts (
2323
id TEXT PRIMARY KEY, -- UUID stored as TEXT
2424
prompt_id TEXT NOT NULL,
25-
output_id TEXT NOT NULL,
26-
code_snippet TEXT NOT NULL, -- VARCHAR(255)
27-
trigger_string TEXT NOT NULL, -- VARCHAR(255)
25+
code_snippet TEXT, -- We check in code that not both code_snippet and trigger_string are NULL
26+
trigger_string TEXT, -- VARCHAR(255)
2827
trigger_type TEXT NOT NULL, -- VARCHAR(50)
2928
trigger_category TEXT,
3029
timestamp DATETIME NOT NULL,
31-
FOREIGN KEY (prompt_id) REFERENCES prompts(id),
32-
FOREIGN KEY (output_id) REFERENCES outputs(id)
30+
FOREIGN KEY (prompt_id) REFERENCES prompts(id)
3331
);
3432

3533
-- Settings table
@@ -45,7 +43,6 @@ CREATE TABLE settings (
4543
-- Create indexes for foreign keys and frequently queried columns
4644
CREATE INDEX idx_outputs_prompt_id ON outputs(prompt_id);
4745
CREATE INDEX idx_alerts_prompt_id ON alerts(prompt_id);
48-
CREATE INDEX idx_alerts_output_id ON alerts(output_id);
4946
CREATE INDEX idx_prompts_timestamp ON prompts(timestamp);
5047
CREATE INDEX idx_outputs_timestamp ON outputs(timestamp);
5148
CREATE INDEX idx_alerts_timestamp ON alerts(timestamp);

src/codegate/dashboard/dashboard.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@
44
import structlog
55
from fastapi import APIRouter
66

7-
from codegate.dashboard.post_processing import match_conversations, parse_get_prompt_with_output
8-
from codegate.dashboard.request_models import Conversation
7+
from codegate.dashboard.post_processing import (
8+
match_conversations,
9+
parse_get_alert_conversation,
10+
parse_get_prompt_with_output,
11+
)
12+
from codegate.dashboard.request_models import AlertConversation, Conversation
913
from codegate.db.connection import DbReader
1014

1115
logger = structlog.get_logger("codegate")
@@ -28,3 +32,18 @@ async def get_messages() -> List[Conversation]:
2832

2933
conversations = await match_conversations(partial_conversations)
3034
return conversations
35+
36+
37+
@dashboard_router.get("/dashboard/alerts")
38+
async def get_alerts() -> List[AlertConversation]:
39+
"""
40+
Get all the messages from the database and return them as a list of conversations.
41+
"""
42+
alerts_prompt_output = await db_reader.get_alerts_with_prompt_and_output()
43+
44+
# Parse the prompts and outputs in parallel
45+
async with asyncio.TaskGroup() as tg:
46+
tasks = [tg.create_task(parse_get_alert_conversation(row)) for row in alerts_prompt_output]
47+
alert_conversations = [task.result() for task in tasks if task.result() is not None]
48+
49+
return alert_conversations

src/codegate/dashboard/post_processing.py

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import asyncio
22
import json
3-
from typing import List, Optional, Tuple
3+
from typing import List, Optional, Tuple, Union
44

55
import structlog
66

77
from codegate.dashboard.request_models import (
8+
AlertConversation,
89
ChatMessage,
910
Conversation,
1011
PartialConversation,
1112
QuestionAnswer,
1213
)
13-
from codegate.db.queries import GetPromptWithOutputsRow
14+
from codegate.db.queries import GetAlertsWithPromptAndOutputRow, GetPromptWithOutputsRow
1415

1516
logger = structlog.get_logger("codegate")
1617

@@ -40,7 +41,7 @@ async def parse_request(request_str: str) -> Optional[str]:
4041
try:
4142
request = json.loads(request_str)
4243
except Exception as e:
43-
logger.exception(f"Error parsing request: {e}")
44+
logger.warning(f"Error parsing request: {request_str}. {e}")
4445
return None
4546

4647
messages = []
@@ -82,7 +83,7 @@ async def parse_output(output_str: str) -> Tuple[Optional[str], Optional[str]]:
8283
try:
8384
output = json.loads(output_str)
8485
except Exception as e:
85-
logger.exception(f"Error parsing request: {e}")
86+
logger.warning(f"Error parsing output: {output_str}. {e}")
8687
return None, None
8788

8889
output_message = ""
@@ -107,9 +108,9 @@ async def parse_output(output_str: str) -> Tuple[Optional[str], Optional[str]]:
107108
return output_message, chat_id
108109

109110

110-
async def parse_get_prompt_with_output(
111-
row: GetPromptWithOutputsRow,
112-
) -> Optional[PartialConversation]:
111+
async def _get_question_answer(
112+
row: Union[GetPromptWithOutputsRow, GetAlertsWithPromptAndOutputRow]
113+
) -> Tuple[Optional[QuestionAnswer], Optional[str]]:
113114
"""
114115
Parse a row from the get_prompt_with_outputs query and return a PartialConversation
115116
@@ -124,7 +125,7 @@ async def parse_get_prompt_with_output(
124125

125126
# If we couldn't parse the request or output, return None
126127
if not request_msg_str or not output_msg_str or not chat_id:
127-
return None
128+
return None, None
128129

129130
request_message = ChatMessage(
130131
message=request_msg_str,
@@ -136,10 +137,20 @@ async def parse_get_prompt_with_output(
136137
timestamp=row.output_timestamp,
137138
message_id=row.output_id,
138139
)
139-
question_answer = QuestionAnswer(
140-
question=request_message,
141-
answer=output_message,
142-
)
140+
return QuestionAnswer(question=request_message, answer=output_message), chat_id
141+
142+
143+
async def parse_get_prompt_with_output(
144+
row: GetPromptWithOutputsRow,
145+
) -> Optional[PartialConversation]:
146+
"""
147+
Parse a row from the get_prompt_with_outputs query and return a PartialConversation
148+
149+
The row contains the raw request and output strings from the pipeline.
150+
"""
151+
question_answer, chat_id = await _get_question_answer(row)
152+
if not question_answer or not chat_id:
153+
return None
143154
return PartialConversation(
144155
question_answer=question_answer,
145156
provider=row.provider,
@@ -187,3 +198,34 @@ async def match_conversations(
187198
)
188199

189200
return conversations
201+
202+
203+
async def parse_get_alert_conversation(
204+
row: GetAlertsWithPromptAndOutputRow,
205+
) -> Optional[AlertConversation]:
206+
"""
207+
Parse a row from the get_alerts_with_prompt_and_output query and return a Conversation
208+
209+
The row contains the raw request and output strings from the pipeline.
210+
"""
211+
question_answer, chat_id = await _get_question_answer(row)
212+
if not question_answer or not chat_id:
213+
return None
214+
215+
conversation = Conversation(
216+
question_answers=[question_answer],
217+
provider=row.provider,
218+
type=row.type,
219+
chat_id=chat_id or "chat-id-not-found",
220+
conversation_timestamp=row.timestamp,
221+
)
222+
code_snippet = json.loads(row.code_snippet) if row.code_snippet else None
223+
return AlertConversation(
224+
conversation=conversation,
225+
alert_id=row.id,
226+
code_snippet=code_snippet,
227+
trigger_string=row.trigger_string,
228+
trigger_type=row.trigger_type,
229+
trigger_category=row.trigger_category,
230+
timestamp=row.timestamp,
231+
)

src/codegate/dashboard/request_models.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
from pydantic import BaseModel
55

6+
from codegate.pipeline.base import CodeSnippet
7+
68

79
class ChatMessage(BaseModel):
810
"""
@@ -45,3 +47,17 @@ class Conversation(BaseModel):
4547
type: str
4648
chat_id: str
4749
conversation_timestamp: datetime.datetime
50+
51+
52+
class AlertConversation(BaseModel):
53+
"""
54+
Represents an alert with it's respective conversation.
55+
"""
56+
57+
conversation: Conversation
58+
alert_id: str
59+
code_snippet: Optional[CodeSnippet]
60+
trigger_string: Optional[str]
61+
trigger_type: str
62+
trigger_category: Optional[str]
63+
timestamp: datetime.datetime

0 commit comments

Comments
 (0)