Skip to content
Merged
87 changes: 86 additions & 1 deletion astrbot/core/utils/pip_installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import importlib.util
import io
import logging
import ntpath
import os
import re
import shlex
Expand All @@ -30,6 +31,7 @@

_DISTLIB_FINDER_PATCH_ATTEMPTED = False
_SITE_PACKAGES_IMPORT_LOCK = threading.RLock()
_PIP_IN_PROCESS_ENV_LOCK = threading.RLock()
_PIP_FAILURE_PATTERNS = {
"error_prefix": re.compile(r"^\s*error:", re.IGNORECASE),
"user_requested": re.compile(r"\bthe user requested\b", re.IGNORECASE),
Expand Down Expand Up @@ -235,6 +237,87 @@ def _run_pip_main_streaming(pip_main, args: list[str]) -> tuple[int, list[str]]:
return result_code, stream.lines


def _run_pip_main_with_temporary_environ(
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
pip_main,
args: list[str],
env_updates: dict[str, str] | None = None,
) -> tuple[int, list[str]]:
# os.environ is process-wide; serialize temporary mutations around the
# in-process pip invocation, including reading the existing environment for
# packaged runtime build-path prepends, and keep the mutation window inside
# the worker.
with _PIP_IN_PROCESS_ENV_LOCK:
if env_updates is None:
env_updates = _build_packaged_windows_runtime_build_env()
with _temporary_environ(env_updates):
return _run_pip_main_streaming(pip_main, args)


def _normalize_windows_native_build_path(path: str) -> str:
normalized = path.replace("/", "\\")

for prefix in ("\\\\?\\UNC\\", "\\??\\UNC\\"):
if normalized.startswith(prefix):
return ntpath.normpath(f"\\\\{normalized[len(prefix) :]}")

for prefix in ("\\\\?\\", "\\??\\"):
if normalized.startswith(prefix):
normalized = normalized[len(prefix) :]
break

return ntpath.normpath(normalized)


def _prepend_windows_env_path(name: str, path: str) -> str:
existing = os.environ.get(name)
if existing:
return f"{path};{existing}"
return path


def _build_packaged_windows_runtime_build_env() -> dict[str, str]:
if sys.platform != "win32" or not is_packaged_desktop_runtime():
return {}

runtime_executable = _normalize_windows_native_build_path(sys.executable)
runtime_dir = ntpath.dirname(runtime_executable)
if not runtime_dir:
return {}

env_updates: dict[str, str] = {}
include_dir = _normalize_windows_native_build_path(
ntpath.join(runtime_dir, "include")
)
libs_dir = _normalize_windows_native_build_path(ntpath.join(runtime_dir, "libs"))

if os.path.isdir(include_dir):
env_updates["INCLUDE"] = _prepend_windows_env_path("INCLUDE", include_dir)
if os.path.isdir(libs_dir):
env_updates["LIB"] = _prepend_windows_env_path("LIB", libs_dir)

return env_updates


@contextlib.contextmanager
def _temporary_environ(updates: dict[str, str]):
if not updates:
yield
return

missing = object()
previous_values = {key: os.environ.get(key, missing) for key in updates}

try:
os.environ.update(updates)
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
Outdated
yield
finally:
for key, previous_value in previous_values.items():
if previous_value is missing:
os.environ.pop(key, None)
else:
os.environ[key] = previous_value


def _matches_pip_failure_pattern(line: str, *pattern_names: str) -> bool:
names = pattern_names or tuple(_PIP_FAILURE_PATTERNS)
return any(_PIP_FAILURE_PATTERNS[name].search(line) for name in names)
Expand Down Expand Up @@ -931,7 +1014,9 @@ async def _run_pip_in_process(self, args: list[str]) -> int:
original_handlers = list(logging.getLogger().handlers)
try:
result_code, output_lines = await asyncio.to_thread(
_run_pip_main_streaming, pip_main, args
_run_pip_main_with_temporary_environ,
pip_main,
args,
)
finally:
_cleanup_added_root_handlers(original_handlers)
Expand Down
Loading
Loading