AIP 67 - Multi-Team: Update Edge Executor to support multi team #61646
wjddn279 wants to merge 11 commits into apache:main from
Conversation
Not sure whether AI concluded this:
This is wrong: jobs are purged only on completion, not while they sit in the queue, so this should not be a problem. I doubt that different configurations per team are needed. I'd assume that if Edge is needed, workers are started per team, while the central tables are shared. Shall we mark the PR as draft until the questions are clear? Regarding the questions:
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py (outdated, resolved)
jscheffl left a comment
Blocking merge. Besides the changes I do not agree with, there is zero documentation being added on how to make a multi-team setup possible. Also, I assume the DB schema must be adjusted, depending on whether queues overlap.
dheerajturaga left a comment
Given that the Edge executor is unique in persisting all state to shared tables (edge_job, edge_worker), the isolation approach needs to be discussed and agreed upon with the AIP-67 authors and the Edge provider maintainers before writing the implementation, rather than left as open questions in the PR body. I'd suggest starting that conversation first, since a correct solution will involve modifying the Edge table schemas. What was done for Celery can't simply be applied as is.
I see my message wasn't conveyed as I intended.
This shouldn't happen; the AI mistranslated it. 🥲 Sorry for that. However, the cleanup process in a multi-instance edge_executor environment runs on all instances and checks all rows regardless of team. This can cause side effects, because it runs at unintended intervals compared to a single-instance setup. I thought it would be better if it ran only for the jobs managed by that particular instance. Therefore, we needed a minimum unit to distinguish teams in that table (edge_job), and the best option I saw without changing the table was to distinguish by queue. That's why I left a separate question asking whether that was actually feasible, and I tried implementing it in that direction for now. Of course, I'm open to changing the approach.
Thank you for the review. I examined several cases but couldn't determine the exact direction, so I implemented what I thought was the best approach; I'm of course willing to change it according to the decision. I should have checked first... I'll change the PR to draft.
I believe this is the best answer. Let's not abuse queue again and implement this right 😄
@o-nikolas @jscheffl @dheerajturaga To summarize your points: can I confirm that we've reached an agreement on adding a `team_name` column? If so, I'll update the worker startup to allow specifying a team_name to belong to, alongside the existing queue subscription. A worker will only execute a job if it belongs to the same team_name and the job is delivered through one of the specified queues.
Yes, and (I assume this holds for others as well) multi-team is an optional feature, so the CLI param and column are only optionally respected; they will probably be NULL for most setups, and that has to be respected as well. A good thing about the delayed discussion is that the DB manager for table migrations has meanwhile been merged in #61155, so if you need a column you can add the first migration there now.
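The NULL-respecting matching rule discussed above could look roughly like this. This is a minimal sketch under the assumptions in this thread; `job_matches_worker` and its parameters are illustrative names, not the provider's actual API:

```python
from __future__ import annotations

# Hypothetical sketch: when does a worker pick up a job under the optional
# multi-team feature? In single-team setups both team values are None, so
# the team comparison is a no-op and legacy behavior is preserved.

def job_matches_worker(
    job_team: str | None,
    job_queue: str,
    worker_team: str | None,
    worker_queues: list[str],
) -> bool:
    """A job is eligible only if its queue is subscribed AND its team matches."""
    if job_queue not in worker_queues:
        return False
    return job_team == worker_team  # None == None for non-multi-team setups

print(job_matches_worker(None, "default", None, ["default"]))      # True
print(job_matches_worker("team_a", "default", None, ["default"]))  # False
```

Keeping the team check as a plain equality (with NULL/None matching NULL/None) is what makes the column safe to ignore in existing deployments.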
Force-pushed from 1951cbf to 67c6926
@o-nikolas @jscheffl @dheerajturaga The task is complete and ready for review. The following work has been done:
Notes: when each worker starts, cases like
Also, it seems like there are no integration tests for this. Would it be okay if I write some when I have time? |
Integration tests for Edge are a long-standing item on my bucket list; I never had time to make them. I'd be very, very happy about a contribution, maybe in a separate PR. Otherwise, some back-compat tests are failing and static checks need fixing...
jscheffl left a comment
Some more comments. In general and structurally looking very good already!
Forgot to mention: there are zero docs. Can you add a description of multi team to the RST docs as well, especially highlighting the security restriction for the time being?
Force-pushed from d55c3f9 to e7400d6
Force-pushed from bf98bba to cdba6d9
Force-pushed from 18851cf to 6fcfab5
Force-pushed from 6fcfab5 to d4ba386
@dheerajturaga There are two things I'd like to confirm:
Regarding (2): 3.2.0 is the provider version the DB migration is targeted for. At the moment 3.1.0 is the available version. Tomorrow a new provider will be cut, and due to functional enhancements it will become 3.2.0. If your PR is merged before tomorrow it will get into 3.2.0; otherwise it will land two weeks later in 3.3.0. As some reviews are still open... I'm not sure this can be achieved. I'd rather propose re-targeting to the 3.3.0 version.
Force-pushed from d4ba386 to 9184670
no problem! changed! |
Force-pushed from 9184670 to f926598
Force-pushed from f926598 to 1233232
Looks good now. As we are just in the middle of releasing new providers, maybe hold on for 1-2 days so that no error needs fixing... @dheerajturaga WDYT, good enough to be merged now?
@wjddn279 @dheerajturaga — This PR has new commits since the last review requesting changes, and it looks like the author has followed up. Could you take another look when you have a chance to see if the review comments have been addressed? Thanks! |
Config isolation
No major issues here. Following the same pattern as other executors (LocalExecutor, CeleryExecutor), all direct reads from the global conf have been replaced with self.conf, which is a team-aware ExecutorConf instance created in the base executor. This ensures that each team's executor reads team-specific configuration values (e.g., heartbeat interval, purge intervals) without affecting other teams.
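The team-aware lookup pattern described above can be sketched as follows. This is a stand-in, not the real Airflow `ExecutorConf` class; the key layout and method signature are assumptions for illustration only:

```python
# Minimal sketch of a team-scoped config wrapper: a team-prefixed key wins
# over the global key, and a plain (no-team) instance behaves like the
# global conf. The tuple-keyed dict is a toy substitute for Airflow's
# real configuration backend.

class ExecutorConf:
    def __init__(self, global_conf: dict, team_name=None):
        self._conf = global_conf
        self._team = team_name

    def getint(self, section: str, key: str, fallback: int) -> int:
        # Team-specific override, e.g. section "team_a.edge", checked first.
        if self._team is not None:
            team_key = (f"{self._team}.{section}", key)
            if team_key in self._conf:
                return int(self._conf[team_key])
        return int(self._conf.get((section, key), fallback))

conf = {
    ("edge", "job_poll_interval"): 5,
    ("team_a.edge", "job_poll_interval"): 2,
}
print(ExecutorConf(conf, "team_a").getint("edge", "job_poll_interval", 10))  # 2
print(ExecutorConf(conf).getint("edge", "job_poll_interval", 10))            # 5
```

The point of routing every read through `self.conf` is exactly this fallback chain: team executors see their overrides, everyone else sees the global values unchanged.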
Multi-instance isolation
Determining the right approach for isolating resources between teams in EdgeExecutor was not straightforward. Looking at the CeleryExecutor as a reference, it achieves full team isolation by assigning each team a separate broker and separate Celery worker pool. Based on this, I concluded that edge workers should also be partitioned per team.
However, unlike CeleryExecutor which uses external brokers, EdgeExecutor manages all persistence through shared DB tables. This means team-level isolation needs to happen at the query level. Specifically, the maintenance operations (_purge_jobs, _update_orphaned_jobs, _check_worker_liveness) were previously operating on all rows in these tables indiscriminately. In a multi-team setup where each team may have different configuration values, this could lead to one team's executor incorrectly purging another team's jobs or marking another team's workers as dead.
To address this, I introduced _managed_queues -- a per-instance set that tracks which queues this executor is responsible for. It is initialized with the default_queue from the (possibly team-specific) config and grows as queue_workload() is called. All maintenance queries now filter by WHERE queue IN (_managed_queues), and worker liveness checks skip workers whose registered queues do not overlap with the executor's managed queues.
This approach assumes that each team uses a distinct set of queues and that different teams do not share the same queue names.
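The queue-scoped maintenance queries can be illustrated with a self-contained sketch. This is not the provider's actual code: the table layout, state value, and `managed_queues` handling are simplified assumptions, using an in-memory SQLite table in place of the shared Airflow metadata DB:

```python
import sqlite3

# Toy edge_job table: two completed jobs belonging to different teams,
# distinguished (per the assumption above) only by their queue names.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE edge_job (id INTEGER PRIMARY KEY, queue TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO edge_job (queue, state) VALUES (?, ?)",
    [("team_a_default", "success"), ("team_b_default", "success")],
)

# The set this executor instance is responsible for; in the PR it is seeded
# from default_queue and grows as queue_workload() is called.
managed_queues = {"team_a_default"}

# Purge completed jobs, but only on queues this instance manages,
# i.e. WHERE queue IN (managed_queues).
placeholders = ",".join("?" * len(managed_queues))
conn.execute(
    f"DELETE FROM edge_job WHERE state = 'success' AND queue IN ({placeholders})",
    tuple(managed_queues),
)
remaining = [row[0] for row in conn.execute("SELECT queue FROM edge_job ORDER BY id")]
print(remaining)  # ['team_b_default'] -- the other team's job is untouched
```

Without the `queue IN (...)` predicate, every instance's purge would sweep all teams' rows, which is precisely the cross-team side effect described above.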