Skip to content

Commit a46a8f8

Browse files
fix: add test for celery task registration on import
1 parent 15e0b74 commit a46a8f8

File tree

2 files changed

+49
-0
lines changed

2 files changed

+49
-0
lines changed

add_test.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
content = open('providers/celery/tests/unit/celery/executors/test_celery_executor.py', encoding='utf-8').read()
2+
new_test = '''
3+
4+
def test_celery_tasks_registered_on_import():
5+
"""
6+
Ensure execute_workload (and execute_command for Airflow 2.x) are registered
7+
with the Celery app when celery_executor is imported.
8+
9+
Regression test for https://github.com/apache/airflow/issues/63043
10+
Celery provider 3.17.0 exposed that celery_executor_utils was never imported
11+
at module level, so tasks were never registered at worker startup.
12+
"""
13+
from airflow.providers.celery.executors.celery_executor_utils import app
14+
15+
registered_tasks = list(app.tasks.keys())
16+
assert "execute_workload" in registered_tasks, (
17+
"execute_workload must be registered with the Celery app at import time. "
18+
"Workers need this to receive tasks without KeyError."
19+
)
20+
if not AIRFLOW_V_3_0_PLUS:
21+
assert "execute_command" in registered_tasks, (
22+
"execute_command must be registered for Airflow 2.x compatibility."
23+
)
24+
'''
25+
result = content + new_test
26+
open('providers/celery/tests/unit/celery/executors/test_celery_executor.py', 'w', encoding='utf-8', newline='\n').write(result)
27+
print('Done')

providers/celery/tests/unit/celery/executors/test_celery_executor.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,3 +687,25 @@ def test_task_routing_through_team_specific_app(self, mock_send_tasks, monkeypat
687687
# Critical: task belongs to team A's app, not module-level app
688688
assert task_from_call.app is team_a_executor.celery_app
689689
assert task_from_call.name == "execute_command"
690+
691+
692+
def test_celery_tasks_registered_on_import():
693+
"""
694+
Ensure execute_workload (and execute_command for Airflow 2.x) are registered
695+
with the Celery app when celery_executor is imported.
696+
697+
Regression test for https://github.com/apache/airflow/issues/63043
698+
Celery provider 3.17.0 exposed that celery_executor_utils was never imported
699+
at module level, so tasks were never registered at worker startup.
700+
"""
701+
from airflow.providers.celery.executors.celery_executor_utils import app
702+
703+
registered_tasks = list(app.tasks.keys())
704+
assert "execute_workload" in registered_tasks, (
705+
"execute_workload must be registered with the Celery app at import time. "
706+
"Workers need this to receive tasks without KeyError."
707+
)
708+
if not AIRFLOW_V_3_0_PLUS:
709+
assert "execute_command" in registered_tasks, (
710+
"execute_command must be registered for Airflow 2.x compatibility."
711+
)

0 commit comments

Comments
 (0)