Skip to content

Commit ac6f73a

Browse files
kimyoungi99vincbeck
authored andcommitted
Fix race condition in auth manager initialization (#62431)
FAB FastAPI routes call get_application_builder() on every request, which creates a new Flask app and invokes init_app(). Concurrent calls race on the singleton auth_manager's appbuilder and security_manager, causing KeyError: 'AUTH_USER_REGISTRATION' and AttributeError. Add _init_app_lock around the critical section in init_app() that mutates the singleton auth_manager state and registers views, so concurrent get_application_builder() calls are serialized.
1 parent 430a3f2 commit ac6f73a

File tree

3 files changed

+67
-19
lines changed

3 files changed

+67
-19
lines changed

airflow-core/src/airflow/api_fastapi/app.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from __future__ import annotations
1818

1919
import logging
20+
import threading
2021
from contextlib import AsyncExitStack, asynccontextmanager
2122
from typing import TYPE_CHECKING, cast
2223
from urllib.parse import urlsplit
@@ -64,8 +65,10 @@ def get_cookie_path() -> str:
6465

6566
log = logging.getLogger(__name__)
6667

67-
app: FastAPI | None = None
68-
auth_manager: BaseAuthManager | None = None
68+
69+
class _AuthManagerState:
70+
instance: BaseAuthManager | None = None
71+
_lock = threading.Lock()
6972

7073

7174
@asynccontextmanager
@@ -149,11 +152,14 @@ def get_auth_manager_cls() -> type[BaseAuthManager]:
149152

150153

151154
def create_auth_manager() -> BaseAuthManager:
152-
"""Create the auth manager."""
153-
global auth_manager
155+
"""Create the auth manager, cached as a thread-safe singleton."""
154156
auth_manager_cls = get_auth_manager_cls()
155-
auth_manager = auth_manager_cls()
156-
return auth_manager
157+
if _AuthManagerState.instance is not None and isinstance(_AuthManagerState.instance, auth_manager_cls):
158+
return _AuthManagerState.instance
159+
with _AuthManagerState._lock:
160+
if _AuthManagerState.instance is None or not isinstance(_AuthManagerState.instance, auth_manager_cls):
161+
_AuthManagerState.instance = auth_manager_cls()
162+
return _AuthManagerState.instance
157163

158164

159165
def init_auth_manager(app: FastAPI | None = None) -> BaseAuthManager:

airflow-core/tests/unit/api_fastapi/test_app.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
import threading
1920
from unittest import mock
2021

2122
import pytest
@@ -140,3 +141,37 @@ def test_nested_subpath(self):
140141
"""When base_url contains a nested subpath, get_cookie_path() should return it."""
141142
with mock.patch.object(app_module, "API_ROOT_PATH", "/org/team-a/airflow/"):
142143
assert app_module.get_cookie_path() == "/org/team-a/airflow/"
144+
145+
146+
def test_create_auth_manager_thread_safety():
147+
"""Concurrent calls to create_auth_manager must return the same singleton instance."""
148+
call_count = 0
149+
singleton = None
150+
151+
class FakeAuthManager:
152+
def __init__(self):
153+
nonlocal call_count, singleton
154+
call_count += 1
155+
singleton = self
156+
157+
app_module.purge_cached_app()
158+
159+
results = []
160+
barrier = threading.Barrier(10)
161+
162+
def call_create_auth_manager():
163+
barrier.wait()
164+
results.append(app_module.create_auth_manager())
165+
166+
with mock.patch.object(app_module, "get_auth_manager_cls", return_value=FakeAuthManager):
167+
threads = [threading.Thread(target=call_create_auth_manager) for _ in range(10)]
168+
for t in threads:
169+
t.start()
170+
for t in threads:
171+
t.join()
172+
173+
assert len(results) == 10
174+
assert all(r is singleton for r in results)
175+
assert call_count == 1
176+
177+
app_module.purge_cached_app()

providers/fab/src/airflow/providers/fab/www/extensions/init_appbuilder.py

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from __future__ import annotations
2020

2121
import logging
22+
import threading
2223
from functools import reduce
2324
from typing import TYPE_CHECKING
2425

@@ -57,6 +58,8 @@
5758
# This module contains code imported from FlaskAppbuilder, so lets use _its_ logger name
5859
log = logging.getLogger("flask_appbuilder.base")
5960

61+
_init_app_lock = threading.Lock()
62+
6063

6164
def dynamic_class_import(class_path):
6265
"""
@@ -197,19 +200,23 @@ def init_app(self, app, session):
197200
self._addon_managers = app.config["ADDON_MANAGERS"]
198201
self.session = session
199202
auth_manager = create_auth_manager()
200-
auth_manager.appbuilder = self
201-
if hasattr(auth_manager, "init_flask_resources"):
202-
auth_manager.init_flask_resources()
203-
if hasattr(auth_manager, "security_manager"):
204-
self.sm = auth_manager.security_manager
205-
else:
206-
self.sm = AirflowSecurityManagerV2(self)
207-
self.bm = BabelManager(self)
208-
self._add_global_static()
209-
self._add_global_filters()
210-
app.before_request(self.sm.before_request)
211-
self._add_admin_views()
212-
self._add_addon_views()
203+
with _init_app_lock:
204+
auth_manager.appbuilder = self
205+
# Invalidate cached security_manager so it binds to the current Flask app.
206+
if "security_manager" in auth_manager.__dict__:
207+
del auth_manager.__dict__["security_manager"]
208+
if hasattr(auth_manager, "init_flask_resources"):
209+
auth_manager.init_flask_resources()
210+
if hasattr(auth_manager, "security_manager"):
211+
self.sm = auth_manager.security_manager
212+
else:
213+
self.sm = AirflowSecurityManagerV2(self)
214+
self.bm = BabelManager(self)
215+
self._add_global_static()
216+
self._add_global_filters()
217+
app.before_request(self.sm.before_request)
218+
self._add_admin_views()
219+
self._add_addon_views()
213220
self._init_extension(app)
214221
self._swap_url_filter()
215222

0 commit comments

Comments
 (0)