Fix race condition in GitRepository.pull_code() with file-based locking#21388
Fix race condition in GitRepository.pull_code() with file-based locking#21388devin-ai-integration[bot] wants to merge 8 commits intomainfrom
GitRepository.pull_code() with file-based locking#21388Conversation
Add file-based locking around GitRepository.pull_code() to prevent race conditions when multiple concurrent flow runs use the same git repository. Uses asyncio.Lock for in-process async coordination between concurrent tasks and FileLock for cross-process coordination. The lock file is created adjacent to the destination directory (e.g., dest.lock). Closes #11187 Co-authored-by: alex.s <alex.s@prefect.io> Co-Authored-By: alex.s <ajstreed1@gmail.com>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
filelock is a transitive dependency not available in the prefect-client package. Fall back to asyncio.Lock only when filelock is not installed. Co-Authored-By: alex.s <ajstreed1@gmail.com>
- Create prefect/locking/_filelock.py with cross-platform file lock using
OS-level locking (fcntl.flock on Unix, msvcrt.locking on Windows)
- Use async-aware aacquire() method that polls with asyncio.sleep() to
avoid blocking the event loop during cross-process lock contention
- Fix lock path derivation: use parent/(name + '.lock') instead of
with_suffix('.lock') which incorrectly replaces existing suffixes
- Remove filelock transitive dependency usage entirely
- Update tests to work with new internal FileLock
Co-authored-by: Alexander Streed <desertaxle@users.noreply.github.com>
Co-Authored-By: alex.s <ajstreed1@gmail.com>
- Wrap acquire/aacquire polling loops in try/except BaseException to close the fd on CancelledError or any other unexpected exception - Wrap _unlock_fd in release() with try/finally to ensure os.close() runs even if unlock raises Co-authored-by: Alexander Streed <desertaxle@users.noreply.github.com> Co-Authored-By: alex.s <ajstreed1@gmail.com>
Co-Authored-By: alex.s <ajstreed1@gmail.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 13e3555258
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
- Handle ImportError for fcntl/msvcrt so _filelock.py loads on any OS - FileLock.acquire/aacquire silently no-op when locking is unavailable - pull_code() catches lock acquisition failures at runtime and falls back to asyncio.Lock only, logging a debug message Co-authored-by: Alexander Streed <desertaxle@users.noreply.github.com> Co-Authored-By: alex.s <ajstreed1@gmail.com>
Rewrites _filelock.py to use Path.touch(exist_ok=False) / Path.unlink() instead of fcntl/msvcrt, following the same pattern as FileSystemLockManager. No OS-specific imports needed — works on any platform that supports basic filesystem operations. Co-authored-by: Alexander Streed <desertaxle@users.noreply.github.com> Co-Authored-By: alex.s <ajstreed1@gmail.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5a86567757
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Write the owning process's PID to the lock file on acquisition. When a lock file already exists, read the PID and check if the process is still alive via os.kill(pid, 0). If the process is dead, remove the stale lock and retry immediately — no need to wait for timeout. Co-authored-by: Alexander Streed <desertaxle@users.noreply.github.com> Co-Authored-By: alex.s <ajstreed1@gmail.com>
There was a problem hiding this comment.
Can you add tests for this module?
Fixes a race condition in
GitRepository.pull_code()where multiple concurrent flow runs sharing the same clone destination directory can race on thegit_dir.exists()check-then-act logic, causingFileNotFoundErroror corrupt clones. Theshutil.rmtree()calls on failure paths can also delete a directory out from under a concurrent run.Closes #11187
Changes
Adds dual-layer locking around the entire
pull_code()method:asyncio.Lock(per destination path) — coordinates concurrent async tasks within the same process without blocking the event loopFileLock(prefect/locking/_filelock.py) — coordinates across separate processes via a.lockfile adjacent to the destination directory (e.g.,repo.lock)The internal
FileLockuses lock file existence to indicate the lock is held (os.open(O_CREAT | O_EXCL)to acquire,Path.unlink()to release), following the same pattern asFileSystemLockManager. No OS-specific locking primitives are required — it works on any platform with basic filesystem operations.Stale lock recovery: The owning process's PID is written to the lock file on acquisition. When contention is detected (
FileExistsError), the lock file's PID is read and checked viaos.kill(pid, 0). If the owning process is dead, the stale lock is removed and acquisition retries immediately — no need to wait for the 300s timeout.The existing
pull_codebody is extracted into_pull_code_locked()with no logic changes.Key review items
os.kill(pid, 0)cross-platform behavior: Used for PID liveness checks. Verify this works correctly on Windows (should useOpenProcessunder the hood in CPython).os.open(O_CREAT | O_EXCL): Atomic on POSIX. Confirm this holds on Windows and network filesystems relevant to users._pull_code_locksmodule-level dict (storage.py): Maps destination paths toasyncio.Lockinstances. Entries are never evicted — this is fine for typical usage (small number of distinct repos), but worth confirming that assumption.aacquire()uses polling (asyncio.sleep(0.1)between lock attempts). This avoids blocking the event loop during cross-process contention but is a busy-wait pattern.acquire()andaacquire()— they differ only intime.sleepvsasyncio.sleep. Kept separate for clarity rather than introducing a callback abstraction.Checklist
<link to issue>"mint.json.Link to Devin session: https://app.devin.ai/sessions/1e3a05b708534a449af4acc4a2d76cc1
Requested by: @desertaxle