Skip to content

Commit 621b177

Browse files
dheerajturagaAnkurdeewan
authored andcommitted
Introduce EdgeDBManager: Independent Provider Specific Database Schema Management (apache#61155)
* Add EdgeDBManager for provider-specific database migrations Implement EdgeDBManager to integrate Edge3 provider with Airflow's external database manager system, enabling independent schema version control for Edge3 tables separate from core Airflow migrations. This enables Edge3 provider to manage its database schema evolution independently from core Airflow, allowing for provider-specific version control and migration management. The infrastructure is ready for use once initial migration files are generated and the legacy _check_db_schema() approach in EdgeExecutor.start() is removed. * Add table creation support to EdgeDBManager via Alembic migration Wire up the full initdb/create_db_from_orm lifecycle in EdgeDBManager so table creation is handled through proper Alembic migrations instead of ad-hoc metadata.create_all() in EdgeExecutor.start(). - Create initial Alembic migration (0001_3_0_0) for edge_worker, edge_job, and edge_logs tables with if_not_exists=True - Replace _check_db_schema() + metadata.create_all() in EdgeExecutor.start() with EdgeDBManager.initdb() - Remove legacy _check_db_schema() (Airflow 2.x workaround) - Add edge3 support to check_revision_heads_map.py pre-commit script - Add check-revision-heads-map-edge3 pre-commit hook - Rename migrations README to README.md - Add tests for create_db_from_orm, initdb, and revision_heads_map * Add missing test file for edge3 migrations env module The test_project_structure check requires every provider source module to have a corresponding test file. Add test_env.py for the migrations env.py module with tests for version table name and metadata contents. * Skip DB lock acquisition when edge3 migrations are already current Add a fast-path check_migration() call before acquiring the global advisory lock in EdgeExecutor.start(). This avoids unnecessary lock contention when multiple API server instances start simultaneously and the database is already at the target migration state. * Add session-scoped fixture to create edge3 tables for tests Since edge3 tables are managed via separate _edge_metadata (removed from Base.metadata), the test framework's initdb no longer creates them. Add a session-scoped autouse fixture in conftest.py that creates the tables once for all edge3 tests. * Fix edge3 test failures - Guard _create_edge_tables conftest fixture against settings.engine being None when non-DB tests (CLI) run in the same session - Patch _check_valid_db_connection in test_list_edge_workers since the test env sql_alchemy_conn equals the default value * Jens suggestions
1 parent 2387269 commit 621b177

File tree

16 files changed

+878
-55
lines changed

16 files changed

+878
-55
lines changed

providers/edge3/.pre-commit-config.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,16 @@ repos:
4646
additional_dependencies: ['pnpm@10.25.0']
4747
pass_filenames: true
4848
require_serial: true
49+
- id: check-revision-heads-map-edge3
50+
name: Check that the REVISION_HEADS_MAP is up-to-date
51+
language: python
52+
entry: ../../scripts/ci/prek/check_revision_heads_map.py
53+
pass_filenames: false
54+
files: >
55+
(?x)
56+
^src/airflow/providers/edge3/migrations/versions/.*$|
57+
^src/airflow/providers/edge3/migrations/versions|
58+
^src/airflow/providers/edge3/models/db\.py$
4959
- id: compile-edge-assets
5060
name: Compile Edge provider assets
5161
language: node
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# A generic, single database configuration.
19+
20+
[alembic]
21+
# path to migration scripts
22+
# Use forward slashes (/) also on windows to provide an os agnostic path
23+
script_location = %(here)s/migrations
24+
25+
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
26+
# Uncomment the line below if you want the files to be prepended with date and time
27+
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
28+
# for all available tokens
29+
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
30+
31+
# sys.path path, will be prepended to sys.path if present.
32+
# defaults to the current working directory.
33+
prepend_sys_path = .
34+
35+
# timezone to use when rendering the date within the migration file
36+
# as well as the filename.
37+
# If specified, requires the python>=3.9 or backports.zoneinfo library.
38+
# Any required deps can installed by adding `alembic[tz]` to the pip requirements
39+
# string value is passed to ZoneInfo()
40+
# leave blank for localtime
41+
# timezone =
42+
43+
# max length of characters to apply to the "slug" field
44+
# truncate_slug_length = 40
45+
46+
# set to 'true' to run the environment during
47+
# the 'revision' command, regardless of autogenerate
48+
# revision_environment = false
49+
50+
# set to 'true' to allow .pyc and .pyo files without
51+
# a source .py file to be detected as revisions in the
52+
# versions/ directory
53+
# sourceless = false
54+
55+
# version location specification; This defaults
56+
# to alembic/versions. When using multiple version
57+
# directories, initial revisions must be specified with --version-path.
58+
# The path separator used here should be the separator specified by "version_path_separator" below.
59+
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
60+
61+
# version path separator; As mentioned above, this is the character used to split
62+
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
63+
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
64+
# Valid values for version_path_separator are:
65+
#
66+
# version_path_separator = :
67+
# version_path_separator = ;
68+
# version_path_separator = space
69+
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
70+
71+
# set to 'true' to search source files recursively
72+
# in each "version_locations" directory
73+
# new in Alembic version 1.10
74+
# recursive_version_locations = false
75+
76+
# the output encoding used when revision files
77+
# are written from script.py.mako
78+
# output_encoding = utf-8
79+
80+
sqlalchemy.url = scheme://localhost/airflow
81+
82+
83+
[post_write_hooks]
84+
# post_write_hooks defines scripts or Python functions that are run
85+
# on newly generated revision scripts. See the documentation for further
86+
# detail and examples
87+
88+
# format using "black" - use the console_scripts runner, against the "black" entrypoint
89+
# hooks = black
90+
# black.type = console_scripts
91+
# black.entrypoint = black
92+
# black.options = -l 79 REVISION_SCRIPT_FILENAME
93+
94+
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
95+
# hooks = ruff
96+
# ruff.type = exec
97+
# ruff.executable = %(here)s/.venv/bin/ruff
98+
# ruff.options = --fix REVISION_SCRIPT_FILENAME
99+
100+
# Logging configuration
101+
[loggers]
102+
keys = root,sqlalchemy,alembic
103+
104+
[handlers]
105+
keys = console
106+
107+
[formatters]
108+
keys = generic
109+
110+
[logger_root]
111+
level = WARN
112+
handlers = console
113+
qualname =
114+
115+
[logger_sqlalchemy]
116+
level = WARN
117+
handlers =
118+
qualname = sqlalchemy.engine
119+
120+
[logger_alembic]
121+
level = INFO
122+
handlers =
123+
qualname = alembic
124+
125+
[handler_console]
126+
class = StreamHandler
127+
args = (sys.stderr,)
128+
level = NOTSET
129+
formatter = generic
130+
131+
[formatter_generic]
132+
format = %(levelname)-5.5s [%(name)s] %(message)s
133+
datefmt = %H:%M:%S

providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Lines changed: 8 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,19 @@
1717

1818
from __future__ import annotations
1919

20-
import contextlib
2120
from collections.abc import Sequence
2221
from copy import deepcopy
2322
from datetime import datetime, timedelta
2423
from typing import TYPE_CHECKING, Any
2524

26-
from sqlalchemy import delete, inspect, select, text
27-
from sqlalchemy.exc import NoSuchTableError
28-
from sqlalchemy.orm import Session
25+
from sqlalchemy import delete, select
2926

3027
from airflow.configuration import conf
3128
from airflow.executors import workloads
3229
from airflow.executors.base_executor import BaseExecutor
3330
from airflow.models.taskinstance import TaskInstance
3431
from airflow.providers.common.compat.sdk import Stats, timezone
32+
from airflow.providers.edge3.models.db import EdgeDBManager
3533
from airflow.providers.edge3.models.edge_job import EdgeJobModel
3634
from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
3735
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, reset_metrics
@@ -40,7 +38,7 @@
4038
from airflow.utils.state import TaskInstanceState
4139

4240
if TYPE_CHECKING:
43-
from sqlalchemy.engine.base import Engine
41+
from sqlalchemy.orm import Session
4442

4543
from airflow.cli.cli_config import GroupCommand
4644
from airflow.models.taskinstancekey import TaskInstanceKey
@@ -61,56 +59,15 @@ def __init__(self, parallelism: int = PARALLELISM):
6159
super().__init__(parallelism=parallelism)
6260
self.last_reported_state: dict[TaskInstanceKey, TaskInstanceState] = {}
6361

64-
def _check_db_schema(self, engine: Engine) -> None:
65-
"""
66-
Check if already existing table matches the newest table schema.
67-
68-
workaround as Airflow 2.x had no support for provider DB migrations,
69-
then it is possible to use alembic also for provider distributions.
70-
71-
TODO(jscheffl): Change to alembic DB migrations in the future.
72-
"""
73-
inspector = inspect(engine)
74-
edge_job_columns = None
75-
edge_job_command_len = None
76-
with contextlib.suppress(NoSuchTableError):
77-
edge_job_schema = inspector.get_columns("edge_job")
78-
edge_job_columns = [column["name"] for column in edge_job_schema]
79-
for column in edge_job_schema:
80-
if column["name"] == "command":
81-
edge_job_command_len = column["type"].length # type: ignore[attr-defined]
82-
83-
# version 0.6.0rc1 added new column concurrency_slots
84-
if edge_job_columns and "concurrency_slots" not in edge_job_columns:
85-
EdgeJobModel.metadata.drop_all(engine, tables=[EdgeJobModel.__table__])
86-
87-
# version 1.1.0 the command column was changed to VARCHAR(2048)
88-
elif edge_job_command_len and edge_job_command_len != 2048:
89-
with Session(engine) as session:
90-
query = "ALTER TABLE edge_job ALTER COLUMN command TYPE VARCHAR(2048);"
91-
session.execute(text(query))
92-
session.commit()
93-
94-
edge_worker_columns = None
95-
with contextlib.suppress(NoSuchTableError):
96-
edge_worker_columns = [column["name"] for column in inspector.get_columns("edge_worker")]
97-
98-
# version 0.14.0pre0 added new column maintenance_comment
99-
if edge_worker_columns and "maintenance_comment" not in edge_worker_columns:
100-
with Session(engine) as session:
101-
query = "ALTER TABLE edge_worker ADD maintenance_comment VARCHAR(1024);"
102-
session.execute(text(query))
103-
session.commit()
104-
10562
@provide_session
10663
def start(self, session: Session = NEW_SESSION):
10764
"""If EdgeExecutor provider is loaded first time, ensure table exists."""
65+
edge_db_manager = EdgeDBManager(session)
66+
if edge_db_manager.check_migration():
67+
return
68+
10869
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
109-
engine = session.get_bind().engine
110-
self._check_db_schema(engine)
111-
EdgeJobModel.metadata.create_all(engine)
112-
EdgeLogsModel.metadata.create_all(engine)
113-
EdgeWorkerModel.metadata.create_all(engine)
70+
edge_db_manager.initdb()
11471

11572
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
11673
"""
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
Edge3 provider database migrations using Alembic.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.

0 commit comments

Comments
 (0)