Skip to content

Commit 28e0e4a

Browse files
kalluripradeepdominikhei
authored andcommitted
fix: ensure Celery tasks are registered at worker startup (main) (apache#63110)
* fix: ensure Celery tasks are registered at worker startup Fixes: apache#63043 * fix: add test for celery task registration on import * remove temporary fix scripts * chore: add TODO comment for execute_command removal when min Airflow >= 3 * fix: correct indentation of TODO comment in test
1 parent a57fba4 commit 28e0e4a

File tree

2 files changed

+26
-0
lines changed

2 files changed

+26
-0
lines changed

providers/celery/src/airflow/providers/celery/executors/celery_executor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939

4040
from airflow.exceptions import AirflowProviderDeprecationWarning
4141
from airflow.executors.base_executor import BaseExecutor
42+
from airflow.providers.celery.executors import (
43+
celery_executor_utils as _celery_executor_utils, # noqa: F401 # Needed to register Celery tasks at worker startup, see #63043
44+
)
4245
from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
4346
from airflow.providers.common.compat.sdk import AirflowTaskTimeout, Stats
4447
from airflow.utils.state import TaskInstanceState

providers/celery/tests/unit/celery/executors/test_celery_executor.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,3 +687,26 @@ def test_task_routing_through_team_specific_app(self, mock_send_tasks, monkeypat
687687
# Critical: task belongs to team A's app, not module-level app
688688
assert task_from_call.app is team_a_executor.celery_app
689689
assert task_from_call.name == "execute_command"
690+
691+
692+
def test_celery_tasks_registered_on_import():
693+
"""
694+
Ensure execute_workload (and execute_command for Airflow 2.x) are registered
695+
with the Celery app when celery_executor is imported.
696+
697+
Regression test for https://github.com/apache/airflow/issues/63043
698+
Celery provider 3.17.0 exposed that celery_executor_utils was never imported
699+
at module level, so tasks were never registered at worker startup.
700+
"""
701+
from airflow.providers.celery.executors.celery_executor_utils import app
702+
703+
registered_tasks = list(app.tasks.keys())
704+
assert "execute_workload" in registered_tasks, (
705+
"execute_workload must be registered with the Celery app at import time. "
706+
"Workers need this to receive tasks without KeyError."
707+
)
708+
# TODO: remove this block when min supported Airflow version is >= 3.0
709+
if not AIRFLOW_V_3_0_PLUS:
710+
assert "execute_command" in registered_tasks, (
711+
"execute_command must be registered for Airflow 2.x compatibility."
712+
)

0 commit comments

Comments
 (0)