Skip to content

Commit 21be84d

Browse files
committed
tech: fix list_tasks
1 parent acaa58a commit 21be84d

File tree

4 files changed

+92
-59
lines changed

4 files changed

+92
-59
lines changed

aigle/settings/base.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,12 @@
205205
CELERY_RESULT_SERIALIZER = "json"
206206
CELERY_TIMEZONE = "UTC"
207207

208+
# Task execution and monitoring settings
209+
CELERY_TASK_TRACK_STARTED = True # Track when task starts executing
210+
CELERY_TASK_SEND_SENT_EVENT = True # Send task-sent events
211+
CELERY_WORKER_SEND_TASK_EVENTS = True # Enable task events for monitoring
212+
CELERY_TASK_RESULT_EXPIRES = 3600 # Keep task results for 1 hour
213+
208214
# Optional: Task routing
209215
CELERY_ROUTES = {
210216
"myapp.tasks.run_management_command": {"queue": "management_commands"},

core/management/commands/test_cmd.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,18 @@ def handle(self, *args, **options):
4141
with connection.cursor() as cursor:
4242
test_table = "test_cmd"
4343
cursor.execute(
44-
f"CREATE TABLE IF NOT EXISTS public.{test_table} (id SERIAL PRIMARY KEY, created_at TIMESTAMP DEFAULT NOW(), args JSONB);"
44+
f"CREATE TABLE IF NOT EXISTS temp.{test_table} (id SERIAL PRIMARY KEY, created_at TIMESTAMP DEFAULT NOW(), args JSONB);"
4545
)
4646
cursor.execute(
47-
f"INSERT INTO public.{test_table} (data) VALUES ('{json.dumps(options)}');"
47+
f"INSERT INTO temp.{test_table} (args) VALUES ('{json.dumps({
48+
"test_str_required": test_str_required,
49+
"test_str_not_required": test_str_not_required,
50+
"test_bool_required": test_bool_required,
51+
"test_bool_not_required": test_bool_not_required,
52+
"test_int_required": test_int_required,
53+
"test_int_not_required": test_int_not_required,
54+
"test_array": test_array,
55+
})}');"
4856
)
4957

5058
waiting_sec = 120

core/serializers/run_command.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,5 @@ class TaskSerializer(serializers.Serializer):
3535
worker = serializers.CharField(read_only=True)
3636
status = serializers.CharField(read_only=True)
3737
eta = serializers.CharField(read_only=True, required=False, allow_null=True)
38+
time_start = serializers.CharField(read_only=True, required=False, allow_null=True)
39+
priority = serializers.IntegerField(read_only=True, required=False, allow_null=True)

core/utils/tasks.py

Lines changed: 74 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -71,60 +71,77 @@ def cancel_task(task_id: str) -> bool:
7171
@staticmethod
7272
def get_all_tasks() -> List[Dict[str, Any]]:
7373
inspect = current_app.control.inspect()
74-
75-
# Get active tasks
76-
active_tasks = inspect.active()
77-
active_list = []
78-
79-
if active_tasks:
80-
for worker, tasks in active_tasks.items():
81-
for task in tasks:
82-
active_list.append(
83-
{
84-
"task_id": task["id"],
85-
"name": task["name"],
86-
"args": task["args"],
87-
"kwargs": task["kwargs"],
88-
"worker": worker,
89-
"status": "RUNNING",
90-
}
91-
)
92-
93-
# Get scheduled tasks
94-
scheduled_tasks = inspect.scheduled()
95-
scheduled_list = []
96-
97-
if scheduled_tasks:
98-
for worker, tasks in scheduled_tasks.items():
99-
for task in tasks:
100-
scheduled_list.append(
101-
{
102-
"task_id": task["request"]["id"],
103-
"name": task["request"]["task"],
104-
"args": task["request"]["args"],
105-
"kwargs": task["request"]["kwargs"],
106-
"worker": worker,
107-
"status": "SCHEDULED",
108-
"eta": task["eta"],
109-
}
110-
)
111-
112-
# Get reserved tasks
113-
reserved_tasks = inspect.reserved()
114-
reserved_list = []
115-
116-
if reserved_tasks:
117-
for worker, tasks in reserved_tasks.items():
118-
for task in tasks:
119-
reserved_list.append(
120-
{
121-
"task_id": task["id"],
122-
"name": task["name"],
123-
"args": task["args"],
124-
"kwargs": task["kwargs"],
125-
"worker": worker,
126-
"status": "RESERVED",
127-
}
128-
)
129-
130-
return active_list + scheduled_list + reserved_list
74+
all_tasks = []
75+
76+
try:
77+
# Get active tasks
78+
active_tasks = inspect.active()
79+
if active_tasks:
80+
for worker, tasks in active_tasks.items():
81+
for task in tasks:
82+
# Active tasks have a different structure - they contain the actual task request
83+
all_tasks.append(
84+
{
85+
"task_id": task.get("id")
86+
or task.get("uuid", "unknown"),
87+
"name": task.get("name") or task.get("type", "unknown"),
88+
"args": task.get("args", []),
89+
"kwargs": task.get("kwargs", {}),
90+
"worker": worker,
91+
"status": "RUNNING",
92+
"eta": task.get("eta"),
93+
"time_start": task.get("time_start"),
94+
}
95+
)
96+
except Exception as e:
97+
# Log the error but continue with other task types
98+
print(f"Error getting active tasks: {e}")
99+
100+
try:
101+
# Get scheduled tasks
102+
scheduled_tasks = inspect.scheduled()
103+
if scheduled_tasks:
104+
for worker, tasks in scheduled_tasks.items():
105+
for task in tasks:
106+
# Scheduled tasks have task info nested in "request" key
107+
request_info = task.get("request", {})
108+
all_tasks.append(
109+
{
110+
"task_id": request_info.get("id")
111+
or request_info.get("uuid", "unknown"),
112+
"name": request_info.get("task")
113+
or request_info.get("name", "unknown"),
114+
"args": request_info.get("args", []),
115+
"kwargs": request_info.get("kwargs", {}),
116+
"worker": worker,
117+
"status": "SCHEDULED",
118+
"eta": task.get("eta"),
119+
"priority": task.get("priority"),
120+
}
121+
)
122+
except Exception as e:
123+
print(f"Error getting scheduled tasks: {e}")
124+
125+
try:
126+
# Get reserved tasks (tasks that are queued but not yet executing)
127+
reserved_tasks = inspect.reserved()
128+
if reserved_tasks:
129+
for worker, tasks in reserved_tasks.items():
130+
for task in tasks:
131+
# Reserved tasks have similar structure to active tasks
132+
all_tasks.append(
133+
{
134+
"task_id": task.get("id")
135+
or task.get("uuid", "unknown"),
136+
"name": task.get("name") or task.get("type", "unknown"),
137+
"args": task.get("args", []),
138+
"kwargs": task.get("kwargs", {}),
139+
"worker": worker,
140+
"status": "RESERVED",
141+
"eta": task.get("eta"),
142+
}
143+
)
144+
except Exception as e:
145+
print(f"Error getting reserved tasks: {e}")
146+
147+
return all_tasks

0 commit comments

Comments
 (0)