diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0b997282163..7a37d18c51d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -47,7 +47,7 @@ repos: additional_dependencies: [ 'keyring==23.0.1', 'nox==2021.6.12', - 'pytest==7.1.1', + 'pytest', 'types-docutils==0.18.3', 'types-setuptools==57.4.14', 'types-freezegun==1.1.9', diff --git a/docs/html/topics/authentication.md b/docs/html/topics/authentication.md index f5b553160df..966ac3e7a0d 100644 --- a/docs/html/topics/authentication.md +++ b/docs/html/topics/authentication.md @@ -66,25 +66,120 @@ man pages][netrc-docs]. ## Keyring Support pip supports loading credentials stored in your keyring using the -{pypi}`keyring` library. +{pypi}`keyring` library, which can be enabled py passing `--keyring-provider` +with a value of `auto`, `disabled`, `import`, or `subprocess`. The default +value `auto` respects `--no-input` and not query keyring at all if the option +is used; otherwise it tries the `import`, `subprocess`, and `disabled` +providers (in this order) and uses the first one that works. -pip will first try to use `keyring` in the same environment as itself and -fallback to using any `keyring` installation which is available on `PATH`. +### Configuring pip's keyring usage -Therefore, either of the following setups will work: +Since the keyring configuration is likely system-wide, a more common way to +configure its usage would be to use a configuration instead: + +```{seealso} +{doc}`./configuration` describes how pip configuration works. +``` ```bash -$ pip install keyring # install keyring from PyPI into same environment as pip -$ echo "your-password" | keyring set pypi.company.com your-username -$ pip install your-package --index-url https://pypi.company.com/ +$ pip config set --global global.keyring-provider subprocess + +# A different user on the same system which has PYTHONPATH configured and and +# wanting to use keyring installed that way could then run +$ pip config set --user global.keyring-provider import + +# For a specific virtual environment you might want to use disable it again +# because you will only be using PyPI and the private repo (and mirror) +# requires 2FA with a keycard and a pincode +$ pip config set --site global.index https://pypi.org/simple +$ pip config set --site global.keyring-provider disabled + +# configuring it via environment variable is also possible +$ export PIP_KEYRING_PROVIDER=disabled ``` -or +### Using keyring's Python module + +Setting `keyring-provider` to `import` makes pip communicate with `keyring` via +its Python interface. ```bash -$ pipx install keyring # install keyring from PyPI into standalone environment +# install keyring from PyPI +$ pip install keyring --index-url https://pypi.org/simple $ echo "your-password" | keyring set pypi.company.com your-username -$ pip install your-package --index-url https://pypi.company.com/ +$ pip install your-package --keyring-provider import --index-url https://pypi.company.com/ +``` + +### Using keyring as a command line application + +Setting `keyring-provider` to `subprocess` makes pip look for and use the +`keyring` command found on `PATH`. + +For this use case, a username *must* be included in the URL, since it is +required by `keyring`'s command line interface. See the example below or the +basic HTTP authentication section at the top of this page. + +```bash +# Install keyring from PyPI using pipx, which we assume is installed properly +# you can also create a venv somewhere and add it to the PATH yourself instead +$ pipx install keyring --index-url https://pypi.org/simple + +# For Azure DevOps, also install its keyring backend. +$ pipx inject keyring artifacts-keyring --index-url https://pypi.org/simple + +# For Google Artifact Registry, also install and initialize its keyring backend. +$ pipx inject keyring keyrings.google-artifactregistry-auth --index-url https://pypi.org/simple +$ gcloud auth login + +# Note that a username is required in the index URL. +$ pip install your-package --keyring-provider subprocess --index-url https://username@pypi.example.com/ +``` + +### Here be dragons + +The `auto` provider is conservative and does not query keyring at all when +`--no-input` is used because the keyring might require user interaction such as +prompting the user on the console. Third party tools frequently call Pip for +you and do indeed pass `--no-input` as they are well-behaved and don't have +much information to work with. (Keyring does have an api to request a backend +that does not require user input.) You have more information about your system, +however! + +You can force keyring usage by requesting a keyring provider other than `auto` +(or `disabled`). Leaving `import` and `subprocess`. You do this by passing +`--keyring-provider import` or one of the following methods: + +```bash +# via config file, possibly with --user, --global or --site +$ pip config set global.keyring-provider subprocess +# or via environment variable +$ export PIP_KEYRING_PROVIDER=import +``` + +```{warning} +Be careful when doing this since it could cause tools such as pipx and Pipenv +to appear to hang. They show their own progress indicator while hiding output +from the subprocess in which they run Pip. You won't know whether the keyring +backend is waiting the user input or not in such situations. +``` + +pip is conservative and does not query keyring at all when `--no-input` is used +because the keyring might require user interaction such as prompting the user +on the console. You can force keyring usage by passing `--force-keyring` or one +of the following: + +```bash +# possibly with --user, --global or --site +$ pip config set global.force-keyring true +# or +$ export PIP_FORCE_KEYRING=1 +``` + +```{warning} +Be careful when doing this since it could cause tools such as pipx and Pipenv +to appear to hang. They show their own progress indicator while hiding output +from the subprocess in which they run Pip. You won't know whether the keyring +backend is waiting the user input or not in such situations. ``` Note that `keyring` (the Python package) needs to be installed separately from diff --git a/news/8719.feature.rst b/news/8719.feature.rst new file mode 100644 index 00000000000..3f3caf2db89 --- /dev/null +++ b/news/8719.feature.rst @@ -0,0 +1 @@ +Add ``--keyring-provider`` flag. See the Authentication page in the documentation for more info. diff --git a/src/pip/_internal/cli/cmdoptions.py b/src/pip/_internal/cli/cmdoptions.py index 1f804097e86..98730bee1f6 100644 --- a/src/pip/_internal/cli/cmdoptions.py +++ b/src/pip/_internal/cli/cmdoptions.py @@ -252,6 +252,19 @@ class PipOption(Option): help="Disable prompting for input.", ) +keyring_provider: Callable[..., Option] = partial( + Option, + "--keyring-provider", + dest="keyring_provider", + choices=["auto", "disabled", "import", "subprocess"], + default="disabled", + help=( + "Enable the credential lookup via the keyring library if user input is allowed." + " Specify which mechanism to use [disabled, import, subprocess]." + " (default: disabled)" + ), +) + proxy: Callable[..., Option] = partial( Option, "--proxy", @@ -1027,6 +1040,7 @@ def check_list_path_option(options: Values) -> None: quiet, log, no_input, + keyring_provider, proxy, retries, timeout, diff --git a/src/pip/_internal/cli/req_command.py b/src/pip/_internal/cli/req_command.py index 1044809f040..048b9c99e41 100644 --- a/src/pip/_internal/cli/req_command.py +++ b/src/pip/_internal/cli/req_command.py @@ -151,6 +151,7 @@ def _build_session( # Determine if we can prompt the user for authentication or not session.auth.prompting = not options.no_input + session.auth.keyring_provider = options.keyring_provider return session diff --git a/src/pip/_internal/network/auth.py b/src/pip/_internal/network/auth.py index ac8cbf23bab..c0efa765c85 100644 --- a/src/pip/_internal/network/auth.py +++ b/src/pip/_internal/network/auth.py @@ -3,12 +3,17 @@ Contains interface (MultiDomainBasicAuth) and associated glue code for providing credentials in the context of network requests. """ - +import logging import os import shutil import subprocess +import sysconfig +import typing import urllib.parse from abc import ABC, abstractmethod +from functools import lru_cache +from os.path import commonprefix +from pathlib import Path from typing import Any, Dict, List, NamedTuple, Optional, Tuple from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth @@ -131,7 +136,7 @@ def _get_password(self, service_name: str, username: str) -> Optional[str]: res = subprocess.run( cmd, stdin=subprocess.DEVNULL, - capture_output=True, + stdout=subprocess.PIPE, env=env, ) if res.returncode: @@ -142,66 +147,89 @@ def _set_password(self, service_name: str, username: str, password: str) -> None """Mirror the implementation of keyring.set_password using cli""" if self.keyring is None: return None - - cmd = [self.keyring, "set", service_name, username] - input_ = (password + os.linesep).encode("utf-8") env = os.environ.copy() env["PYTHONIOENCODING"] = "utf-8" - res = subprocess.run(cmd, input=input_, env=env) - res.check_returncode() + subprocess.run( + [self.keyring, "set", service_name, username], + input=f"{password}{os.linesep}".encode("utf-8"), + env=env, + check=True, + ) return None -def get_keyring_provider() -> KeyRingBaseProvider: +@lru_cache(maxsize=None) +def get_keyring_provider(provider: str) -> KeyRingBaseProvider: + logger.verbose("Keyring provider requested: %s", provider) + # keyring has previously failed and been disabled - if not KEYRING_DISABLED: - # Default to trying to use Python provider + if KEYRING_DISABLED: + provider = "disabled" + if provider in ["import", "auto"]: try: - return KeyRingPythonProvider() + impl = KeyRingPythonProvider() + logger.verbose("Keyring provider set: import") + return impl except ImportError: pass except Exception as exc: # In the event of an unexpected exception # we should warn the user - logger.warning( - "Installed copy of keyring fails with exception %s, " - "trying to find a keyring executable as a fallback", - str(exc), - ) - - # Fallback to Cli Provider if `keyring` isn't installed + msg = "Installed copy of keyring fails with exception %s" + if provider == "auto": + msg = msg + ", trying to find a keyring executable as a fallback" + logger.warning(msg, exc, exc_info=logger.isEnabledFor(logging.DEBUG)) + if provider in ["subprocess", "auto"]: cli = shutil.which("keyring") + if cli and cli.startswith(sysconfig.get_path("scripts")): + # all code within this function is stolen from shutil.which implementation + @typing.no_type_check + def PATH_as_shutil_which_determines_it() -> str: + path = os.environ.get("PATH", None) + if path is None: + try: + path = os.confstr("CS_PATH") + except (AttributeError, ValueError): + # os.confstr() or CS_PATH is not available + path = os.defpath + # bpo-35755: Don't use os.defpath if the PATH environment variable is + # set to an empty string + + return path + + scripts = Path(sysconfig.get_path("scripts")) + + paths = [] + for path in PATH_as_shutil_which_determines_it().split(os.pathsep): + p = Path(path) + try: + if not p.samefile(scripts): + paths.append(path) + except FileNotFoundError: + pass + + path = os.pathsep.join(paths) + + cli = shutil.which("keyring", path=path) + if cli: + logger.verbose("Keyring provider set: subprocess with executable %s", cli) return KeyRingCliProvider(cli) + logger.verbose("Keyring provider set: disabled") return KeyRingNullProvider() -def get_keyring_auth(url: Optional[str], username: Optional[str]) -> Optional[AuthInfo]: - """Return the tuple auth for a given url from keyring.""" - # Do nothing if no url was provided - if not url: - return None - - keyring = get_keyring_provider() - try: - return keyring.get_auth_info(url, username) - except Exception as exc: - logger.warning( - "Keyring is skipped due to an exception: %s", - str(exc), - ) - global KEYRING_DISABLED - KEYRING_DISABLED = True - return None - - class MultiDomainBasicAuth(AuthBase): def __init__( - self, prompting: bool = True, index_urls: Optional[List[str]] = None + self, + prompting: bool = True, + index_urls: Optional[List[str]] = None, + keyring_provider: str = "auto", ) -> None: self.prompting = prompting self.index_urls = index_urls + self.keyring_provider = keyring_provider # type: ignore[assignment] self.passwords: Dict[str, AuthInfo] = {} # When the user is prompted to enter credentials and keyring is # available, we will offer to save them. If the user accepts, @@ -210,6 +238,47 @@ def __init__( # ``save_credentials`` to save these. self._credentials_to_save: Optional[Credentials] = None + @property + def keyring_provider(self) -> KeyRingBaseProvider: + return get_keyring_provider(self._keyring_provider) + + @keyring_provider.setter + def keyring_provider(self, provider: str) -> None: + # The free function get_keyring_provider has been decorated with + # functools.cache. If an exception occurs in get_keyring_auth that + # cache will be cleared and keyring disabled, take that into account + # if you want to remove this indirection. + self._keyring_provider = provider + + @property + def use_keyring(self) -> bool: + # We won't use keyring when --no-input is passed unless + # a specific provider is requested because it might require + # user interaction + return self.prompting or self._keyring_provider not in ["auto", "disabled"] + + def _get_keyring_auth( + self, + url: Optional[str], + username: Optional[str], + ) -> Optional[AuthInfo]: + """Return the tuple auth for a given url from keyring.""" + # Do nothing if no url was provided + if not url: + return None + + try: + return self.keyring_provider.get_auth_info(url, username) + except Exception as exc: + logger.warning( + "Keyring is skipped due to an exception: %s", + str(exc), + ) + global KEYRING_DISABLED + KEYRING_DISABLED = True + get_keyring_provider.cache_clear() + return None + def _get_index_url(self, url: str) -> Optional[str]: """Return the original index URL matching the requested URL. @@ -226,15 +295,42 @@ def _get_index_url(self, url: str) -> Optional[str]: if not url or not self.index_urls: return None - for u in self.index_urls: - prefix = remove_auth_from_url(u).rstrip("/") + "/" - if url.startswith(prefix): - return u - return None + url = remove_auth_from_url(url).rstrip("/") + "/" + parsed_url = urllib.parse.urlsplit(url) + + candidates = [] + + for index in self.index_urls: + index = index.rstrip("/") + "/" + parsed_index = urllib.parse.urlsplit(remove_auth_from_url(index)) + if parsed_url == parsed_index: + return index + + if parsed_url.netloc != parsed_index.netloc: + continue + + candidate = urllib.parse.urlsplit(index) + candidates.append(candidate) + + if not candidates: + return None + + candidates.sort( + reverse=True, + key=lambda candidate: commonprefix( + [ + parsed_url.path, + candidate.path, + ] + ).rfind("/"), + ) + + return urllib.parse.urlunsplit(candidates[0]) def _get_new_credentials( self, original_url: str, + *, allow_netrc: bool = True, allow_keyring: bool = False, ) -> AuthInfo: @@ -278,8 +374,8 @@ def _get_new_credentials( # The index url is more specific than the netloc, so try it first # fmt: off kr_auth = ( - get_keyring_auth(index_url, username) or - get_keyring_auth(netloc, username) + self._get_keyring_auth(index_url, username) or + self._get_keyring_auth(netloc, username) ) # fmt: on if kr_auth: @@ -356,18 +452,23 @@ def __call__(self, req: Request) -> Request: def _prompt_for_password( self, netloc: str ) -> Tuple[Optional[str], Optional[str], bool]: - username = ask_input(f"User for {netloc}: ") + username = ask_input(f"User for {netloc}: ") if self.prompting else None if not username: return None, None, False - auth = get_keyring_auth(netloc, username) - if auth and auth[0] is not None and auth[1] is not None: - return auth[0], auth[1], False + if self.use_keyring: + auth = self._get_keyring_auth(netloc, username) + if auth and auth[0] is not None and auth[1] is not None: + return auth[0], auth[1], False password = ask_password("Password: ") return username, password, True # Factored out to allow for easy patching in tests def _should_save_password_to_keyring(self) -> bool: - if not get_keyring_provider().has_keyring: + if ( + not self.prompting + or not self.use_keyring + or not self.keyring_provider.has_keyring + ): return False return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y" @@ -377,19 +478,22 @@ def handle_401(self, resp: Response, **kwargs: Any) -> Response: if resp.status_code != 401: return resp + username, password = None, None + + # Query the keyring for credentials: + if self.use_keyring: + username, password = self._get_new_credentials( + resp.url, + allow_netrc=False, + allow_keyring=True, + ) + # We are not able to prompt the user so simply return the response - if not self.prompting: + if not self.prompting and not username and not password: return resp parsed = urllib.parse.urlparse(resp.url) - # Query the keyring for credentials: - username, password = self._get_new_credentials( - resp.url, - allow_netrc=False, - allow_keyring=True, - ) - # Prompt the user for a new username and password save = False if not username and not password: @@ -439,14 +543,17 @@ def warn_on_401(self, resp: Response, **kwargs: Any) -> None: def save_credentials(self, resp: Response, **kwargs: Any) -> None: """Response callback to save credentials on success.""" - keyring = get_keyring_provider() - assert keyring.has_keyring, "should never reach here without keyring" + assert ( + self.keyring_provider.has_keyring + ), "should never reach here without keyring" creds = self._credentials_to_save self._credentials_to_save = None if creds and resp.status_code < 400: try: logger.info("Saving credentials to keyring") - keyring.save_auth_info(creds.url, creds.username, creds.password) + self.keyring_provider.save_auth_info( + creds.url, creds.username, creds.password + ) except Exception: logger.exception("Failed to save credentials") diff --git a/tests/conftest.py b/tests/conftest.py index 46975b29beb..106e7321456 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,6 +10,7 @@ from pathlib import Path from typing import ( TYPE_CHECKING, + AnyStr, Callable, Dict, Iterable, @@ -507,7 +508,10 @@ def with_wheel(virtualenv: VirtualEnvironment, wheel_install: Path) -> None: class ScriptFactory(Protocol): def __call__( - self, tmpdir: Path, virtualenv: Optional[VirtualEnvironment] = None + self, + tmpdir: Path, + virtualenv: Optional[VirtualEnvironment] = None, + environ: Optional[Dict[AnyStr, AnyStr]] = None, ) -> PipTestEnvironment: ... @@ -521,7 +525,11 @@ def script_factory( def factory( tmpdir: Path, virtualenv: Optional[VirtualEnvironment] = None, + environ: Optional[Dict[AnyStr, AnyStr]] = None, ) -> PipTestEnvironment: + kwargs = {} + if environ: + kwargs["environ"] = environ if virtualenv is None: virtualenv = virtualenv_factory(tmpdir.joinpath("venv")) return PipTestEnvironment( @@ -541,6 +549,7 @@ def factory( pip_expect_warning=deprecated_python, # Tell the Test Environment if we want to run pip via a zipapp zipapp=zipapp, + **kwargs, ) return factory diff --git a/tests/functional/test_install_config.py b/tests/functional/test_install_config.py index 99e59b97b18..43a9f8bd305 100644 --- a/tests/functional/test_install_config.py +++ b/tests/functional/test_install_config.py @@ -3,10 +3,12 @@ import sys import tempfile import textwrap +from pathlib import Path +from typing import Callable, List import pytest -from tests.conftest import CertFactory, MockServer +from tests.conftest import CertFactory, MockServer, ScriptFactory from tests.lib import PipTestEnvironment, TestData from tests.lib.server import ( authorization_response, @@ -361,20 +363,96 @@ def test_do_not_prompt_for_authentication( assert "ERROR: HTTP error 401" in result.stderr +@pytest.fixture(params=(True, False), ids=("interactive", "noninteractive")) +def interactive(request: pytest.FixtureRequest) -> bool: + return request.param + + +@pytest.fixture(params=(True, False), ids=("auth_needed", "auth_not_needed")) +def auth_needed(request: pytest.FixtureRequest) -> bool: + return request.param + + +@pytest.fixture(params=("disabled", "import", "subprocess", "auto")) +def keyring_provider(request: pytest.FixtureRequest) -> str: + return request.param + + +@pytest.fixture(params=("disabled", "import", "subprocess")) +def keyring_provider_implementation(request: pytest.FixtureRequest) -> str: + return request.param + + +@pytest.fixture() +def flags( + request: pytest.FixtureRequest, + interactive: bool, + auth_needed: bool, + keyring_provider: str, + keyring_provider_implementation: str, +) -> List[str]: + if ( + keyring_provider != "auto" + and keyring_provider_implementation != keyring_provider + ): + pytest.skip() + + flags = ["--keyring-provider", keyring_provider] + if not interactive: + flags.append("--no-input") + if auth_needed: + if keyring_provider_implementation == "disabled" or ( + not interactive and keyring_provider == "auto" + ): + request.applymarker(pytest.mark.xfail()) + return flags + + @pytest.mark.skipif( sys.platform == "linux" and sys.version_info < (3, 8), reason="Custom SSL certification not running well in CI", ) -@pytest.mark.parametrize("auth_needed", (True, False)) def test_prompt_for_keyring_if_needed( - script: PipTestEnvironment, data: TestData, cert_factory: CertFactory, auth_needed: bool, + flags: List[str], + keyring_provider: str, + keyring_provider_implementation: str, + tmpdir: Path, + script_factory: ScriptFactory, + virtualenv_factory: Callable[[Path], VirtualEnvironment], ) -> None: - """Test behaviour while installing from a index url + """Test behaviour while installing from an index url requiring authentication and keyring is possible. """ + environ = os.environ.copy() + workspace = tmpdir.joinpath("workspace") + + if keyring_provider_implementation == "subprocess": + keyring_virtualenv = virtualenv_factory(workspace.joinpath("keyring")) + keyring_script = script_factory( + workspace.joinpath("keyring"), keyring_virtualenv + ) + keyring_script.pip( + "install", + "keyring", + ) + + environ["PATH"] = str(keyring_script.bin_path) + os.pathsep + environ["PATH"] + + virtualenv = virtualenv_factory(workspace.joinpath("venv")) + script = script_factory(workspace.joinpath("venv"), virtualenv, environ=environ) + + if keyring_provider != "auto" or keyring_provider_implementation != "subprocess": + script.pip( + "install", + "keyring", + ) + + if keyring_provider_implementation != "subprocess": + keyring_script = script + cert_path = cert_factory() ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) ctx.load_cert_chain(cert_path, cert_path) @@ -394,22 +472,40 @@ def test_prompt_for_keyring_if_needed( response(data.packages / "simple-3.0.tar.gz"), ] - url = f"https://{server.host}:{server.port}/simple" + url = f"https://USERNAME@{server.host}:{server.port}/simple" keyring_content = textwrap.dedent( """\ import os import sys - from collections import namedtuple + import keyring + from keyring.backend import KeyringBackend + from keyring.credentials import SimpleCredential + + class TestBackend(KeyringBackend): + priority = 1 + + def get_credential(self, url, username): + sys.stderr.write("get_credential was called" + os.linesep) + return SimpleCredential(username="USERNAME", password="PASSWORD") - Cred = namedtuple("Cred", ["username", "password"]) + def get_password(self, url, username): + sys.stderr.write("get_password was called" + os.linesep) + return "PASSWORD" - def get_credential(url, username): - sys.stderr.write("get_credential was called" + os.linesep) - return Cred("USERNAME", "PASSWORD") + def set_password(self, url, username): + pass """ ) - keyring_path = script.site_packages_path / "keyring.py" + keyring_path = keyring_script.site_packages_path / "keyring_test.py" + keyring_path.write_text(keyring_content) + + keyring_content = ( + "import keyring_test;" + " import keyring;" + " keyring.set_keyring(keyring_test.TestBackend())" + os.linesep + ) + keyring_path = keyring_path.with_suffix(".pth") keyring_path.write_text(keyring_content) with server_running(server): @@ -421,10 +517,16 @@ def get_credential(url, username): cert_path, "--client-cert", cert_path, + *flags, "simple", ) + function_name = ( + "get_credential" + if keyring_provider_implementation == "import" + else "get_password" + ) if auth_needed: - assert "get_credential was called" in result.stderr + assert function_name + " was called" in result.stderr else: - assert "get_credential was called" not in result.stderr + assert function_name + " was called" not in result.stderr diff --git a/tests/unit/test_network_auth.py b/tests/unit/test_network_auth.py index 5e9e325a15f..5dde6da57c5 100644 --- a/tests/unit/test_network_auth.py +++ b/tests/unit/test_network_auth.py @@ -1,5 +1,6 @@ import functools import os +import subprocess import sys from typing import Any, Dict, Iterable, List, Optional, Tuple @@ -15,6 +16,7 @@ def reset_keyring() -> Iterable[None]: yield None # Reset the state of the module between tests pip._internal.network.auth.KEYRING_DISABLED = False + pip._internal.network.auth.get_keyring_provider.cache_clear() @pytest.mark.parametrize( @@ -100,7 +102,12 @@ def test_get_credentials_uses_cached_credentials_only_username() -> None: def test_get_index_url_credentials() -> None: - auth = MultiDomainBasicAuth(index_urls=["http://foo:bar@example.com/path"]) + auth = MultiDomainBasicAuth( + index_urls=[ + "http://example.com/", + "http://foo:bar@example.com/path", + ] + ) get = functools.partial( auth._get_new_credentials, allow_netrc=False, allow_keyring=False ) @@ -110,6 +117,45 @@ def test_get_index_url_credentials() -> None: assert get("http://example.com/path3/path2") == (None, None) +def test_prioritize_longest_path_prefix_match_organization() -> None: + auth = MultiDomainBasicAuth( + index_urls=[ + "http://foo:bar@example.com/org-name-alpha/repo-alias/simple", + "http://bar:foo@example.com/org-name-beta/repo-alias/simple", + ] + ) + get = functools.partial( + auth._get_new_credentials, allow_netrc=False, allow_keyring=False + ) + + # Inspired by Azure DevOps URL structure, GitLab should look similar + assert get("http://example.com/org-name-alpha/repo-guid/dowbload/") == ( + "foo", + "bar", + ) + assert get("http://example.com/org-name-beta/repo-guid/dowbload/") == ("bar", "foo") + + +def test_prioritize_longest_path_prefix_match_project() -> None: + auth = MultiDomainBasicAuth( + index_urls=[ + "http://foo:bar@example.com/org-alpha/project-name-alpha/repo-alias/simple", + "http://bar:foo@example.com/org-alpha/project-name-beta/repo-alias/simple", + ] + ) + get = functools.partial( + auth._get_new_credentials, allow_netrc=False, allow_keyring=False + ) + + # Inspired by Azure DevOps URL structure, GitLab should look similar + assert get( + "http://example.com/org-alpha/project-name-alpha/repo-guid/dowbload/" + ) == ("foo", "bar") + assert get( + "http://example.com/org-alpha/project-name-beta/repo-guid/dowbload/" + ) == ("bar", "foo") + + class KeyringModuleV1: """Represents the supported API of keyring before get_credential was added. @@ -121,7 +167,7 @@ def __init__(self) -> None: def get_password(self, system: str, username: str) -> Optional[str]: if system == "example.com" and username: return username + "!netloc" - if system == "http://example.com/path2" and username: + if system == "http://example.com/path2/" and username: return username + "!url" return None @@ -134,8 +180,8 @@ def set_password(self, system: str, username: str, password: str) -> None: ( ("http://example.com/path1", (None, None)), # path1 URLs will be resolved by netloc - ("http://user@example.com/path1", ("user", "user!netloc")), - ("http://user2@example.com/path1", ("user2", "user2!netloc")), + ("http://user@example.com/path3", ("user", "user!netloc")), + ("http://user2@example.com/path3", ("user2", "user2!netloc")), # path2 URLs will be resolved by index URL ("http://example.com/path2/path3", (None, None)), ("http://foo@example.com/path2/path3", ("foo", "foo!url")), @@ -148,7 +194,10 @@ def test_keyring_get_password( ) -> None: keyring = KeyringModuleV1() monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc] - auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"]) + auth = MultiDomainBasicAuth( + index_urls=["http://example.com/path2", "http://example.com/path3"], + keyring_provider="import", + ) actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) assert actual == expect @@ -157,7 +206,7 @@ def test_keyring_get_password( def test_keyring_get_password_after_prompt(monkeypatch: pytest.MonkeyPatch) -> None: keyring = KeyringModuleV1() monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc] - auth = MultiDomainBasicAuth() + auth = MultiDomainBasicAuth(keyring_provider="import") def ask_input(prompt: str) -> str: assert prompt == "User for example.com: " @@ -173,7 +222,7 @@ def test_keyring_get_password_after_prompt_when_none( ) -> None: keyring = KeyringModuleV1() monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc] - auth = MultiDomainBasicAuth() + auth = MultiDomainBasicAuth(keyring_provider="import") def ask_input(prompt: str) -> str: assert prompt == "User for unknown.com: " @@ -194,7 +243,10 @@ def test_keyring_get_password_username_in_index( ) -> None: keyring = KeyringModuleV1() monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc] - auth = MultiDomainBasicAuth(index_urls=["http://user@example.com/path2"]) + auth = MultiDomainBasicAuth( + index_urls=["http://user@example.com/path2", "http://example.com/path4"], + keyring_provider="import", + ) get = functools.partial( auth._get_new_credentials, allow_netrc=False, allow_keyring=True ) @@ -227,7 +279,7 @@ def test_keyring_set_password( ) -> None: keyring = KeyringModuleV1() monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc] - auth = MultiDomainBasicAuth(prompting=True) + auth = MultiDomainBasicAuth(prompting=True, keyring_provider="import") monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None)) monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds) if creds[2]: @@ -284,7 +336,7 @@ def get_password(self, system: str, username: str) -> None: assert False, "get_password should not ever be called" def get_credential(self, system: str, username: str) -> Optional[Credential]: - if system == "http://example.com/path2": + if system == "http://example.com/path2/": return self.Credential("username", "url") if system == "example.com": return self.Credential("username", "netloc") @@ -303,7 +355,10 @@ def test_keyring_get_credential( monkeypatch: pytest.MonkeyPatch, url: str, expect: str ) -> None: monkeypatch.setitem(sys.modules, "keyring", KeyringModuleV2()) # type: ignore[misc] - auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"]) + auth = MultiDomainBasicAuth( + index_urls=["http://example.com/path1", "http://example.com/path2"], + keyring_provider="import", + ) assert ( auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) == expect @@ -325,7 +380,9 @@ def test_broken_keyring_disables_keyring(monkeypatch: pytest.MonkeyPatch) -> Non keyring_broken = KeyringModuleBroken() monkeypatch.setitem(sys.modules, "keyring", keyring_broken) # type: ignore[misc] - auth = MultiDomainBasicAuth(index_urls=["http://example.com/"]) + auth = MultiDomainBasicAuth( + index_urls=["http://example.com/"], keyring_provider="import" + ) assert keyring_broken._call_count == 0 for i in range(5): @@ -347,13 +404,15 @@ def __call__( *, env: Dict[str, str], stdin: Optional[Any] = None, - capture_output: Optional[bool] = None, + stdout: Optional[Any] = None, input: Optional[bytes] = None, + check: Optional[bool] = None ) -> Any: if cmd[1] == "get": assert stdin == -3 # subprocess.DEVNULL - assert capture_output is True + assert stdout == subprocess.PIPE assert env["PYTHONIOENCODING"] == "utf-8" + assert check is None password = self.get_password(*cmd[2:]) if password is None: @@ -361,13 +420,15 @@ def __call__( self.returncode = 1 else: # Passwords are returned encoded with a newline appended + self.returncode = 0 self.stdout = (password + os.linesep).encode("utf-8") if cmd[1] == "set": assert stdin is None - assert capture_output is None + assert stdout is None assert env["PYTHONIOENCODING"] == "utf-8" assert input is not None + assert check # Input from stdin is encoded self.set_password(cmd[2], cmd[3], input.decode("utf-8").strip(os.linesep)) @@ -384,8 +445,8 @@ def check_returncode(self) -> None: ( ("http://example.com/path1", (None, None)), # path1 URLs will be resolved by netloc - ("http://user@example.com/path1", ("user", "user!netloc")), - ("http://user2@example.com/path1", ("user2", "user2!netloc")), + ("http://user@example.com/path3", ("user", "user!netloc")), + ("http://user2@example.com/path3", ("user2", "user2!netloc")), # path2 URLs will be resolved by index URL ("http://example.com/path2/path3", (None, None)), ("http://foo@example.com/path2/path3", ("foo", "foo!url")), @@ -400,7 +461,10 @@ def test_keyring_cli_get_password( monkeypatch.setattr( pip._internal.network.auth.subprocess, "run", KeyringSubprocessResult() ) - auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"]) + auth = MultiDomainBasicAuth( + index_urls=["http://example.com/path2", "http://example.com/path3"], + keyring_provider="subprocess", + ) actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) assert actual == expect @@ -431,7 +495,7 @@ def test_keyring_cli_set_password( monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring") keyring = KeyringSubprocessResult() monkeypatch.setattr(pip._internal.network.auth.subprocess, "run", keyring) - auth = MultiDomainBasicAuth(prompting=True) + auth = MultiDomainBasicAuth(prompting=True, keyring_provider="subprocess") monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None)) monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds) if creds[2]: