Skip to content

Commit 2ddd83e

Browse files
committed
tech: add async runner celery
1 parent 0ffa0f4 commit 2ddd83e

File tree

16 files changed

+292
-7
lines changed

16 files changed

+292
-7
lines changed

.github/workflows/deploy-to-scaleway.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,16 @@ jobs:
3838
source .env || exit 1
3939
set +a # Disable auto-export of variables
4040
41+
xargs -a Aptfile sudo apt install -y || exit 1
42+
43+
sudo systemctl start redis-server || exit 1
44+
sudo systemctl enable redis-server || exit 1
45+
4146
source venv/bin/activate || exit 1
4247
python -m pip install -r requirements.txt || exit 1
4348
python manage.py migrate || exit 1
49+
50+
python -m celery -A aigle worker --loglevel=info || exit 1
4451
sudo systemctl restart gunicorn_aigle || exit 1
4552
4653
echo "Deployment completed successfully!"

.vscode/settings.json

Lines changed: 0 additions & 7 deletions
This file was deleted.

Aptfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
gdal-bin
22
libgdal-dev
33
python3-gdal
4+
redis-server

aigle/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .celery import app as celery_app
2+
3+
__all__ = ("celery_app",)

aigle/celery.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import os
2+
from celery import Celery
3+
4+
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "aigle.settings")
5+
6+
app = Celery("aigle")
7+
app.config_from_object("django.conf:settings", namespace="CELERY")
8+
app.autodiscover_tasks()

aigle/settings.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import logging_loki # noqa: F401
2121
from core.utils.logs import scaleway_logger # noqa: F401
2222

23+
from celery import Celery # noqa: F401
24+
2325
DEPLOYMENT_DATETIME = datetime.now()
2426

2527
# Build paths inside the project like this: BASE_DIR / 'subdir'.
@@ -339,3 +341,17 @@
339341
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
340342

341343
MEDIA_ROOT = os.path.join(BASE_DIR, "media")
344+
345+
346+
# Celery Configuration
347+
CELERY_BROKER_URL = "redis://localhost:6379/0"
348+
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
349+
CELERY_ACCEPT_CONTENT = ["json"]
350+
CELERY_TASK_SERIALIZER = "json"
351+
CELERY_RESULT_SERIALIZER = "json"
352+
CELERY_TIMEZONE = "UTC"
353+
354+
# Optional: Task routing
355+
CELERY_ROUTES = {
356+
"myapp.tasks.run_management_command": {"queue": "management_commands"},
357+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# management/commands/run_async.py
2+
3+
from typing import Any, List, Optional
4+
from django.core.management.base import BaseCommand, CommandParser
5+
6+
from core.utils.tasks import AsyncCommandService
7+
8+
9+
class Command(BaseCommand):
10+
help = "Run Django management commands asynchronously"
11+
12+
def add_arguments(self, parser: CommandParser) -> None:
13+
parser.add_argument("command", type=str, help="Command to run")
14+
parser.add_argument("--args", nargs="*", default=[], help="Command arguments")
15+
parser.add_argument("--wait", action="store_true", help="Wait for completion")
16+
17+
def handle(self, *args: Any, **options: Any) -> None:
18+
command_name: str = options["command"]
19+
command_args: List[str] = options.get("args", [])
20+
wait: bool = options.get("wait", False)
21+
22+
task_id: str = AsyncCommandService.run_command_async(
23+
command_name, *command_args
24+
)
25+
self.stdout.write(f"Task started with ID: {task_id}")
26+
27+
if wait:
28+
self.stdout.write("Waiting for completion...")
29+
result: Optional[Any] = AsyncCommandService.get_task_result(task_id)
30+
while result is None:
31+
import time
32+
33+
time.sleep(1)
34+
result = AsyncCommandService.get_task_result(task_id)
35+
36+
self.stdout.write("Task completed:")
37+
self.stdout.write(str(result))

core/serializers/run_command.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from rest_framework import serializers
2+
3+
4+
class RunCommandSerializer(serializers.Serializer):
5+
command = serializers.CharField(max_length=100, required=True)
6+
args = serializers.ListField(
7+
child=serializers.CharField(), required=False, default=list
8+
)
9+
kwargs = serializers.DictField(required=False, default=dict)
10+
11+
def validate_command(self, value: str) -> str:
12+
if not value.strip():
13+
raise serializers.ValidationError("Command name cannot be empty")
14+
return value.strip()
15+
16+
17+
class TaskStatusSerializer(serializers.Serializer):
18+
task_id = serializers.CharField(read_only=True)
19+
status = serializers.CharField(read_only=True)
20+
result = serializers.JSONField(read_only=True, allow_null=True)
21+
traceback = serializers.CharField(read_only=True, allow_null=True)
22+
23+
24+
class CancelTaskSerializer(serializers.Serializer):
25+
task_id = serializers.CharField(required=True)
26+
27+
def validate_task_id(self, value: str) -> str:
28+
if not value.strip():
29+
raise serializers.ValidationError("Task ID cannot be empty")
30+
return value.strip()

core/urls.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from core.views.object_type import ObjectTypeViewSet
1212
from core.views.object_type_category import ObjectTypeCategoryViewSet
1313
from core.views.parcel import ParcelViewSet
14+
from core.views.run_command import CommandAsyncViewSet
1415
from core.views.statistics.validation_status_evolution import (
1516
StatisticsValidationStatusEvolutionView,
1617
)
@@ -62,6 +63,7 @@
6263
"detection-object", DetectionObjectViewSet, basename="DetectionObjectViewSet"
6364
)
6465
router.register("detection-data", DetectionDataViewSet, basename="DetectionDataViewSet")
66+
router.register("run-command", CommandAsyncViewSet, basename="CommandAsyncViewSet")
6567

6668
urlpatterns = router.urls
6769

core/utils/tasks.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
from io import StringIO
2+
from celery import shared_task
3+
from celery.result import AsyncResult
4+
from django.core.management import call_command
5+
from django.core.management.base import CommandError
6+
from typing import Dict, Any, Optional, Union
7+
8+
9+
@shared_task(bind=True)
10+
def run_management_command(
11+
self, command_name: str, *args: Any, **kwargs: Any
12+
) -> Dict[str, Union[str, Any]]:
13+
try:
14+
output = StringIO()
15+
call_command(command_name, *args, stdout=output, stderr=output, **kwargs)
16+
return {
17+
"status": "success",
18+
"output": output.getvalue(),
19+
"task_id": self.request.id,
20+
}
21+
except CommandError as e:
22+
return {"status": "error", "error": str(e), "task_id": self.request.id}
23+
except Exception as e:
24+
self.retry(countdown=60, max_retries=3)
25+
return {
26+
"status": "error",
27+
"error": f"Unexpected error: {str(e)}",
28+
"task_id": self.request.id,
29+
}
30+
31+
32+
@shared_task
33+
def run_custom_command(command_name: str, **options: Any) -> str:
34+
output = StringIO()
35+
try:
36+
call_command(command_name, stdout=output, stderr=output, **options)
37+
return output.getvalue()
38+
except Exception as e:
39+
return f"Error: {str(e)}"
40+
41+
42+
class AsyncCommandService:
43+
@staticmethod
44+
def run_command_async(command_name: str, *args: Any, **kwargs: Any) -> str:
45+
task = run_management_command.delay(command_name, *args, **kwargs)
46+
return task.id
47+
48+
@staticmethod
49+
def get_task_status(task_id: str) -> Dict[str, Union[str, Any, None]]:
50+
result = AsyncResult(task_id)
51+
return {
52+
"task_id": task_id,
53+
"status": result.status,
54+
"result": result.result if result.ready() else None,
55+
"traceback": result.traceback if result.failed() else None,
56+
}
57+
58+
@staticmethod
59+
def get_task_result(task_id: str) -> Optional[Any]:
60+
result = AsyncResult(task_id)
61+
if result.ready():
62+
return result.result
63+
return None
64+
65+
@staticmethod
66+
def cancel_task(task_id: str) -> bool:
67+
result = AsyncResult(task_id)
68+
result.revoke(terminate=True)
69+
return True

0 commit comments

Comments
 (0)