Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ langfuse = "*"
pandasai = "*"
colorlog = "*"
psycopg2-binary = "*"
psycopg2 = "*"
celery = {extras = ["redis", "sqs"], version = "*"}
unstructured = {extras = ["all-docs"], version = "*"}
unstructured = "*"
llama-parse = "*"
llama-index = "*"
lxml = {extras = ["html_clean"], version = "*"}
Expand All @@ -71,6 +70,8 @@ google-api-python-client = "*"
google-auth-httplib2 = "*"
google-auth-oauthlib = "*"
msal = "*"
llama-index-postprocessor-cohere-rerank = "*"
megaparse = "*"

[dev-packages]
black = "*"
Expand Down
1,487 changes: 612 additions & 875 deletions Pipfile.lock

Large diffs are not rendered by default.

116 changes: 116 additions & 0 deletions backend/celery_worker.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
import asyncio
import os
from datetime import datetime
from io import BytesIO
from tempfile import NamedTemporaryFile
from typing import List
from uuid import UUID

from celery.schedules import crontab
from celery_config import celery
from fastapi import UploadFile
from logger import get_logger
from middlewares.auth.auth_bearer import AuthBearer
from models.files import File
from models.settings import get_supabase_client, get_supabase_db
from modules.assistant.dto.inputs import InputAssistant
from modules.assistant.ito.difference_assistant import DifferenceAssistant
from modules.assistant.ito.summary import SummaryAssistant
from modules.brain.integrations.Notion.Notion_connector import NotionConnector
from modules.brain.service.brain_service import BrainService
from modules.brain.service.brain_vector_service import BrainVectorService
from modules.notification.dto.inputs import NotificationUpdatableProperties
from modules.notification.entity.notification import NotificationsStatusEnum
from modules.notification.service.notification_service import NotificationService
from modules.onboarding.service.onboarding_service import OnboardingService
from modules.user.entity.user_identity import UserIdentity
from packages.files.crawl.crawler import CrawlWebsite, slugify
from packages.files.parsers.github import process_github
from packages.files.processors import filter_file
Expand Down Expand Up @@ -302,6 +310,110 @@ def check_if_is_premium_user():
return True


@celery.task(name="process_assistant_task")
def process_assistant_task(
input_in: str,
files_name: List[str],
current_user: dict,
notification_id=None,
) -> None:

logger.debug(f"Input: {input}")
logger.debug(type(input))
_input = InputAssistant.model_validate(input_in)
# _input = InputAssistant(**json.loads(input)) # type: ignore
# _input = InputAssistant(json.dumps(_input))
_current_user = UserIdentity(**current_user) # type: ignore
try:
files = []
supabase_client = get_supabase_client()

for file_name in files_name:
tmp_name = file_name.replace("/", "_")
base_file_name = os.path.basename(file_name)
_, file_extension = os.path.splitext(base_file_name)

with NamedTemporaryFile(suffix="_" + tmp_name, delete=False) as tmp_file:
res = supabase_client.storage.from_("quivr").download(file_name)
tmp_file.write(res)
tmp_file.flush()

file_instance = File(
file_name=base_file_name,
tmp_file_path=tmp_file.name,
bytes_content=res,
file_size=len(res),
file_extension=file_extension,
)
upload_file = UploadFile(
filename=file_instance.file_name,
size=file_instance.file_size,
file=BytesIO(file_instance.bytes_content),
headers='{"content-type": "application/pdf"}', # type : ignore
)
files.append(upload_file)

except Exception as e:
logger.exception(e)
if notification_id:
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.ERROR,
description=f"An error occurred while processing the file: {e}",
),
)
return
loop = asyncio.get_event_loop()

asyncio.set_event_loop(asyncio.new_event_loop())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this code ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get a file_descriptor problem if i don't reset the loop


if _input.name.lower() == "summary":
summary_assistant = SummaryAssistant(
input=_input, files=files, current_user=_current_user
)
try:
summary_assistant.check_input()
loop.run_until_complete(summary_assistant.process_assistant())
except ValueError as e:
logger.error(f"ValueError in SummaryAssistant: {e}")
if notification_id:
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.ERROR,
description=f"Error in summary processing: {e}",
),
)
elif _input.name.lower() == "difference":
difference_assistant = DifferenceAssistant(
input=_input, files=files, current_user=_current_user
)
try:
difference_assistant.check_input()
loop.run_until_complete(difference_assistant.process_assistant())
except ValueError as e:
logger.error(f"ValueError in DifferenceAssistant: {e}")
if notification_id:
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.ERROR,
description=f"Error in difference processing: {e}",
),
)
else:
logger.error("Invalid assistant name provided.")
if notification_id:
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.ERROR,
description="Invalid assistant name provided.",
),
)


celery.conf.beat_schedule = {
"remove_onboarding_more_than_x_days_task": {
"task": f"{__name__}.remove_onboarding_more_than_x_days_task",
Expand All @@ -319,4 +431,8 @@ def check_if_is_premium_user():
"task": "check_if_is_premium_user",
"schedule": crontab(minute="*/1", hour="*"),
},
"process_assistant": {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the assistant need to run every minute ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no

"task": "process_assistant_task",
"schedule": crontab(minute="*/1", hour="*"),
},
}
47 changes: 22 additions & 25 deletions backend/modules/assistant/controller/assistant_routes.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
from typing import List

from fastapi import APIRouter, Depends, HTTPException, UploadFile
from celery_worker import process_assistant_task
from fastapi import APIRouter, Depends, UploadFile
from logger import get_logger
from middlewares.auth import AuthBearer, get_current_user
from modules.assistant.dto.inputs import InputAssistant
from modules.assistant.dto.outputs import AssistantOutput
from modules.assistant.ito.difference import DifferenceAssistant
from modules.assistant.ito.summary import SummaryAssistant, summary_inputs
from modules.assistant.ito.difference_assistant import difference_inputs
from modules.assistant.ito.summary import summary_inputs
from modules.assistant.service.assistant import Assistant
from modules.notification.service.notification_service import NotificationService
from modules.upload.service.upload_file import upload_file_storage
from modules.user.entity.user_identity import UserIdentity

assistant_router = APIRouter()
logger = get_logger(__name__)

assistant_service = Assistant()
notification_service = NotificationService()


@assistant_router.get(
Expand All @@ -27,10 +31,10 @@ async def list_assistants(
"""

summary = summary_inputs()
# difference = difference_inputs()
difference = difference_inputs()
# crawler = crawler_inputs()
# audio_transcript = audio_transcript_inputs()
return [summary]
return [summary, difference]


@assistant_router.post(
Expand All @@ -40,25 +44,18 @@ async def list_assistants(
)
async def process_assistant(
input: InputAssistant,
files: List[UploadFile] = None,
files: List[UploadFile] = None, # type: ignore
current_user: UserIdentity = Depends(get_current_user),
):
if input.name.lower() == "summary":
summary_assistant = SummaryAssistant(
input=input, files=files, current_user=current_user
)
try:
summary_assistant.check_input()
return await summary_assistant.process_assistant()
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
elif input.name.lower() == "difference":
difference_assistant = DifferenceAssistant(
input=input, files=files, current_user=current_user
)
try:
difference_assistant.check_input()
return await difference_assistant.process_assistant()
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {"message": "Assistant not found"}
files_names = []
for file in files:
file_content = await file.read()
upload_file_storage(file_content, str(file.filename), upsert="true")
files_names.append(file.filename)

process_assistant_task.delay(
input_in=input.model_dump_json(),
files_name=files_names,
current_user=current_user.model_dump(),
)
return {"message": "Assistant is working in the back"}
3 changes: 2 additions & 1 deletion backend/modules/assistant/dto/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,5 @@ class InputAssistant(BaseModel):
@model_validator(mode="before")
@classmethod
def to_py_dict(cls, data):
return json.loads(data)
if isinstance(data, str):
return json.loads(data)
Loading