diff --git a/google/auth/jwt.py b/google/auth/jwt.py index 1ebd565d4..5333b77da 100644 --- a/google/auth/jwt.py +++ b/google/auth/jwt.py @@ -4,7 +4,7 @@ # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -12,43 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""JSON Web Tokens +from __future__ import annotations -Provides support for creating (encoding) and verifying (decoding) JWTs, -especially JWTs generated and consumed by Google infrastructure. - -See `rfc7519`_ for more details on JWTs. - -To encode a JWT use :func:`encode`:: - - from google.auth import crypt - from google.auth import jwt - - signer = crypt.Signer(private_key) - payload = {'some': 'payload'} - encoded = jwt.encode(signer, payload) - -To decode a JWT and verify claims use :func:`decode`:: - - claims = jwt.decode(encoded, certs=public_certs) - -You can also skip verification:: - - claims = jwt.decode(encoded, verify=False) - -.. _rfc7519: https://tools.ietf.org/html/rfc7519 - -""" +from typing import Any, Mapping, Optional, Union, Tuple, cast, List, Type try: - from collections.abc import Mapping -# Python 2.7 compatibility -except ImportError: # pragma: NO COVER - from collections import Mapping # type: ignore + from collections.abc import Mapping as CollectionsMapping +except ImportError: + from typing import Mapping as CollectionsMapping + import copy import datetime import json -import urllib +import urllib.parse import cachetools @@ -63,35 +39,29 @@ except ImportError: # pragma: NO COVER es256 = None # type: ignore -_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds -_DEFAULT_MAX_CACHE_SIZE = 10 -_ALGORITHM_TO_VERIFIER_CLASS = {"RS256": crypt.RSAVerifier} -_CRYPTOGRAPHY_BASED_ALGORITHMS = frozenset(["ES256"]) +_DEFAULT_TOKEN_LIFETIME_SECS: int = 3600 +_DEFAULT_MAX_CACHE_SIZE: int = 10 -if es256 is not None: # pragma: NO COVER - _ALGORITHM_TO_VERIFIER_CLASS["ES256"] = es256.ES256Verifier # type: ignore +_ALGORITHM_TO_VERIFIER_CLASS: dict[str, Type[crypt.Verifier]] = {"RS256": crypt.RSAVerifier} +_CRYPTOGRAPHY_BASED_ALGORITHMS: frozenset[str] = frozenset(["ES256"]) +if es256 is not None: # pragma: NO COVER + _ALGORITHM_TO_VERIFIER_CLASS["ES256"] = es256.ES256Verifier -def encode(signer, payload, header=None, key_id=None): - """Make a signed JWT. - - Args: - signer (google.auth.crypt.Signer): The signer used to sign the JWT. - payload (Mapping[str, str]): The JWT payload. - header (Mapping[str, str]): Additional JWT header payload. - key_id (str): The key id to add to the JWT header. If the - signer has a key id it will be used as the default. If this is - specified it will override the signer's key id. - Returns: - bytes: The encoded JWT. - """ +def encode( + signer: crypt.Signer, + payload: Mapping[str, Any], + header: Optional[Mapping[str, Any]] = None, + key_id: Optional[str] = None, +) -> bytes: if header is None: header = {} if key_id is None: key_id = signer.key_id + header = dict(header) header.update({"typ": "JWT"}) if "alg" not in header: @@ -104,427 +74,206 @@ def encode(signer, payload, header=None, key_id=None): header["kid"] = key_id segments = [ - _helpers.unpadded_urlsafe_b64encode(json.dumps(header).encode("utf-8")), - _helpers.unpadded_urlsafe_b64encode(json.dumps(payload).encode("utf-8")), + _helpers.unpadded_urlsafe_b64encode(json.dumps(header).encode("utf-8")), # type: ignore + _helpers.unpadded_urlsafe_b64encode(json.dumps(payload).encode("utf-8")), # type: ignore ] signing_input = b".".join(segments) - signature = signer.sign(signing_input) - segments.append(_helpers.unpadded_urlsafe_b64encode(signature)) + signature = signer.sign(signing_input) # type: ignore + segments.append(_helpers.unpadded_urlsafe_b64encode(signature)) # type: ignore return b".".join(segments) -def _decode_jwt_segment(encoded_section): - """Decodes a single JWT segment.""" - section_bytes = _helpers.padded_urlsafe_b64decode(encoded_section) +def _decode_jwt_segment(encoded_section: bytes) -> Any: + section_bytes = _helpers.padded_urlsafe_b64decode(encoded_section) # type: ignore try: return json.loads(section_bytes.decode("utf-8")) except ValueError as caught_exc: - new_exc = exceptions.MalformedError( - "Can't parse segment: {0}".format(section_bytes) - ) - raise new_exc from caught_exc - - -def _unverified_decode(token): - """Decodes a token and does no verification. + decoded_segment: str = encoded_section.decode("utf-8", errors="replace") + msg: str = "Can't parse segment: {}".format(decoded_segment) + raise exceptions.MalformedError(msg) from caught_exc # type: ignore - Args: - token (Union[str, bytes]): The encoded JWT. - Returns: - Tuple[Mapping, Mapping, str, str]: header, payload, signed_section, and - signature. - - Raises: - google.auth.exceptions.MalformedError: if there are an incorrect amount of segments in the token or segments of the wrong type. - """ - token = _helpers.to_bytes(token) +def _unverified_decode( + token: Union[str, bytes] +) -> Tuple[Mapping[str, Any], Mapping[str, Any], bytes, bytes]: + token = _helpers.to_bytes(token) # type: ignore + assert isinstance(token, bytes) if token.count(b".") != 2: - raise exceptions.MalformedError( - "Wrong number of segments in token: {0}".format(token) - ) + decoded_token = token.decode("utf-8", errors="replace") + msg = "Wrong number of segments in token: {}".format(decoded_token) + raise exceptions.MalformedError(msg) # type: ignore + encoded_header, encoded_payload, signature = token.split(b".") signed_section = encoded_header + b"." + encoded_payload - signature = _helpers.padded_urlsafe_b64decode(signature) + signature = _helpers.padded_urlsafe_b64decode(signature) # type: ignore - # Parse segments header = _decode_jwt_segment(encoded_header) payload = _decode_jwt_segment(encoded_payload) - if not isinstance(header, Mapping): - raise exceptions.MalformedError( - "Header segment should be a JSON object: {0}".format(encoded_header) - ) - - if not isinstance(payload, Mapping): - raise exceptions.MalformedError( - "Payload segment should be a JSON object: {0}".format(encoded_payload) - ) - - return header, payload, signed_section, signature - + if not isinstance(header, CollectionsMapping): + raise exceptions.MalformedError("Header segment should be a JSON object.") # type: ignore -def decode_header(token): - """Return the decoded header of a token. + if not isinstance(payload, CollectionsMapping): + raise exceptions.MalformedError("Payload segment should be a JSON object.") # type: ignore - No verification is done. This is useful to extract the key id from - the header in order to acquire the appropriate certificate to verify - the token. + return cast(Mapping[str, Any], header), cast(Mapping[str, Any], payload), signed_section, signature - Args: - token (Union[str, bytes]): the encoded JWT. - Returns: - Mapping: The decoded JWT header. - """ +def decode_header(token: Union[str, bytes]) -> Mapping[str, Any]: header, _, _, _ = _unverified_decode(token) return header -def _verify_iat_and_exp(payload, clock_skew_in_seconds=0): - """Verifies the ``iat`` (Issued At) and ``exp`` (Expires) claims in a token - payload. +def _verify_iat_and_exp(payload: Mapping[str, Any], clock_skew_in_seconds: int = 0) -> None: + now = _helpers.datetime_to_secs(_helpers.utcnow()) # type: ignore - Args: - payload (Mapping[str, str]): The JWT payload. - clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` - validation. - - Raises: - google.auth.exceptions.InvalidValue: if value validation failed. - google.auth.exceptions.MalformedError: if schema validation failed. - """ - now = _helpers.datetime_to_secs(_helpers.utcnow()) - - # Make sure the iat and exp claims are present. for key in ("iat", "exp"): if key not in payload: - raise exceptions.MalformedError( - "Token does not contain required claim {}".format(key) + raise exceptions.MalformedError( # type: ignore + f"Token does not contain required claim {key}" ) - # Make sure the token wasn't issued in the future. iat = payload["iat"] - # Err on the side of accepting a token that is slightly early to account - # for clock skew. earliest = iat - clock_skew_in_seconds if now < earliest: - raise exceptions.InvalidValue( - "Token used too early, {} < {}. Check that your computer's clock is set correctly.".format( - now, iat - ) + raise exceptions.InvalidValue( # type: ignore + f"Token used too early, {now} < {iat}. Check that your clock is correct." ) - # Make sure the token wasn't issued in the past. exp = payload["exp"] - # Err on the side of accepting a token that is slightly out of date - # to account for clow skew. latest = exp + clock_skew_in_seconds if latest < now: - raise exceptions.InvalidValue("Token expired, {} < {}".format(latest, now)) - - -def decode(token, certs=None, verify=True, audience=None, clock_skew_in_seconds=0): - """Decode and verify a JWT. - - Args: - token (str): The encoded JWT. - certs (Union[str, bytes, Mapping[str, Union[str, bytes]]]): The - certificate used to validate the JWT signature. If bytes or string, - it must the the public key certificate in PEM format. If a mapping, - it must be a mapping of key IDs to public key certificates in PEM - format. The mapping must contain the same key ID that's specified - in the token's header. - verify (bool): Whether to perform signature and claim validation. - Verification is done by default. - audience (str or list): The audience claim, 'aud', that this JWT should - contain. Or a list of audience claims. If None then the JWT's 'aud' - parameter is not verified. - clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` - validation. - - Returns: - Mapping[str, str]: The deserialized JSON payload in the JWT. - - Raises: - google.auth.exceptions.InvalidValue: if value validation failed. - google.auth.exceptions.MalformedError: if schema validation failed. - """ + raise exceptions.InvalidValue(f"Token expired, {latest} < {now}") # type: ignore + + +def decode( + token: Union[str, bytes], + certs: Union[str, bytes, Mapping[str, Union[str, bytes]], None] = None, + verify: bool = True, + audience: Union[str, List[str], None] = None, + clock_skew_in_seconds: int = 0, +) -> Mapping[str, Any]: header, payload, signed_section, signature = _unverified_decode(token) if not verify: return payload - # Pluck the key id and algorithm from the header and make sure we have - # a verifier that can support it. key_alg = header.get("alg") key_id = header.get("kid") try: - verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg] + verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg] # type: ignore except KeyError as exc: - if key_alg in _CRYPTOGRAPHY_BASED_ALGORITHMS: - raise exceptions.InvalidValue( - "The key algorithm {} requires the cryptography package to be installed.".format( - key_alg - ) - ) from exc - else: - raise exceptions.InvalidValue( - "Unsupported signature algorithm {}".format(key_alg) - ) from exc - # If certs is specified as a dictionary of key IDs to certificates, then - # use the certificate identified by the key ID in the token header. - if isinstance(certs, Mapping): + raise exceptions.InvalidValue(f"Unsupported algorithm: {key_alg}") from exc # type: ignore + + if isinstance(certs, CollectionsMapping): if key_id: if key_id not in certs: - raise exceptions.MalformedError( - "Certificate for key id {} not found.".format(key_id) - ) + raise exceptions.MalformedError(f"Certificate for key id {key_id} not found.") # type: ignore certs_to_check = [certs[key_id]] - # If there's no key id in the header, check against all of the certs. else: - certs_to_check = certs.values() + certs_to_check = list(certs.values()) else: - certs_to_check = certs + certs_to_check = certs # type: ignore - # Verify that the signature matches the message. - if not crypt.verify_signature( - signed_section, signature, certs_to_check, verifier_cls - ): - raise exceptions.MalformedError("Could not verify token signature.") + if not crypt.verify_signature(signed_section, signature, certs_to_check, verifier_cls): # type: ignore + raise exceptions.MalformedError("Could not verify token signature.") # type: ignore - # Verify the issued at and created times in the payload. _verify_iat_and_exp(payload, clock_skew_in_seconds) - # Check audience. if audience is not None: claim_audience = payload.get("aud") if isinstance(audience, str): audience = [audience] if claim_audience not in audience: - raise exceptions.InvalidValue( - "Token has wrong audience {}, expected one of {}".format( - claim_audience, audience - ) + raise exceptions.InvalidValue( # type: ignore + f"Token has wrong audience {claim_audience}, expected one of {audience}" ) return payload class Credentials( - google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject + google.auth.credentials.Signing, + google.auth.credentials.CredentialsWithQuotaProject, ): - """Credentials that use a JWT as the bearer token. - - These credentials require an "audience" claim. This claim identifies the - intended recipient of the bearer token. - - The constructor arguments determine the claims for the JWT that is - sent with requests. Usually, you'll construct these credentials with - one of the helper constructors as shown in the next section. - - To create JWT credentials using a Google service account private key - JSON file:: - - audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher' - credentials = jwt.Credentials.from_service_account_file( - 'service-account.json', - audience=audience) - - If you already have the service account file loaded and parsed:: - - service_account_info = json.load(open('service_account.json')) - credentials = jwt.Credentials.from_service_account_info( - service_account_info, - audience=audience) - - Both helper methods pass on arguments to the constructor, so you can - specify the JWT claims:: - - credentials = jwt.Credentials.from_service_account_file( - 'service-account.json', - audience=audience, - additional_claims={'meta': 'data'}) - - You can also construct the credentials directly if you have a - :class:`~google.auth.crypt.Signer` instance:: - - credentials = jwt.Credentials( - signer, - issuer='your-issuer', - subject='your-subject', - audience=audience) - - The claims are considered immutable. If you want to modify the claims, - you can easily create another instance using :meth:`with_claims`:: - - new_audience = ( - 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber') - new_credentials = credentials.with_claims(audience=new_audience) - """ - def __init__( self, - signer, - issuer, - subject, - audience, - additional_claims=None, - token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, - quota_project_id=None, - ): - """ - Args: - signer (google.auth.crypt.Signer): The signer used to sign JWTs. - issuer (str): The `iss` claim. - subject (str): The `sub` claim. - audience (str): the `aud` claim. The intended audience for the - credentials. - additional_claims (Mapping[str, str]): Any additional claims for - the JWT payload. - token_lifetime (int): The amount of time in seconds for - which the token is valid. Defaults to 1 hour. - quota_project_id (Optional[str]): The project ID used for quota - and billing. - """ - super(Credentials, self).__init__() + signer: crypt.Signer, + issuer: str, + subject: str, + audience: str, + additional_claims: Optional[Mapping[str, str]] = None, + token_lifetime: int = _DEFAULT_TOKEN_LIFETIME_SECS, + quota_project_id: Optional[str] = None, + ) -> None: + super().__init__() # type: ignore self._signer = signer self._issuer = issuer self._subject = subject self._audience = audience self._token_lifetime = token_lifetime self._quota_project_id = quota_project_id - - if additional_claims is None: - additional_claims = {} - - self._additional_claims = additional_claims + self._additional_claims = dict(additional_claims) if additional_claims else {} @classmethod - def _from_signer_and_info(cls, signer, info, **kwargs): - """Creates a Credentials instance from a signer and service account - info. - - Args: - signer (google.auth.crypt.Signer): The signer used to sign JWTs. - info (Mapping[str, str]): The service account info. - kwargs: Additional arguments to pass to the constructor. - - Returns: - google.auth.jwt.Credentials: The constructed credentials. - - Raises: - google.auth.exceptions.MalformedError: If the info is not in the expected format. - """ + def _from_signer_and_info( + cls, + signer: crypt.Signer, + info: Mapping[str, str], + **kwargs: Any, + ) -> Credentials: kwargs.setdefault("subject", info["client_email"]) kwargs.setdefault("issuer", info["client_email"]) return cls(signer, **kwargs) @classmethod - def from_service_account_info(cls, info, **kwargs): - """Creates an Credentials instance from a dictionary. - - Args: - info (Mapping[str, str]): The service account info in Google - format. - kwargs: Additional arguments to pass to the constructor. - - Returns: - google.auth.jwt.Credentials: The constructed credentials. - - Raises: - google.auth.exceptions.MalformedError: If the info is not in the expected format. - """ - signer = _service_account_info.from_dict(info, require=["client_email"]) + def from_service_account_info( + cls, info: Mapping[str, str], **kwargs: Any + ) -> Credentials: + signer = _service_account_info.from_dict(info, require=["client_email"]) # type: ignore return cls._from_signer_and_info(signer, info, **kwargs) @classmethod - def from_service_account_file(cls, filename, **kwargs): - """Creates a Credentials instance from a service account .json file - in Google format. - - Args: - filename (str): The path to the service account .json file. - kwargs: Additional arguments to pass to the constructor. - - Returns: - google.auth.jwt.Credentials: The constructed credentials. - """ - info, signer = _service_account_info.from_filename( + def from_service_account_file( + cls, filename: str, **kwargs: Any + ) -> Credentials: + info, signer = _service_account_info.from_filename( # type: ignore filename, require=["client_email"] ) return cls._from_signer_and_info(signer, info, **kwargs) @classmethod - def from_signing_credentials(cls, credentials, audience, **kwargs): - """Creates a new :class:`google.auth.jwt.Credentials` instance from an - existing :class:`google.auth.credentials.Signing` instance. - - The new instance will use the same signer as the existing instance and - will use the existing instance's signer email as the issuer and - subject by default. - - Example:: - - svc_creds = service_account.Credentials.from_service_account_file( - 'service_account.json') - audience = ( - 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher') - jwt_creds = jwt.Credentials.from_signing_credentials( - svc_creds, audience=audience) - - Args: - credentials (google.auth.credentials.Signing): The credentials to - use to construct the new credentials. - audience (str): the `aud` claim. The intended audience for the - credentials. - kwargs: Additional arguments to pass to the constructor. - - Returns: - google.auth.jwt.Credentials: A new Credentials instance. - """ + def from_signing_credentials( + cls, credentials: google.auth.credentials.Signing, audience: str, **kwargs: Any + ) -> Credentials: kwargs.setdefault("issuer", credentials.signer_email) kwargs.setdefault("subject", credentials.signer_email) return cls(credentials.signer, audience=audience, **kwargs) def with_claims( - self, issuer=None, subject=None, audience=None, additional_claims=None - ): - """Returns a copy of these credentials with modified claims. - - Args: - issuer (str): The `iss` claim. If unspecified the current issuer - claim will be used. - subject (str): The `sub` claim. If unspecified the current subject - claim will be used. - audience (str): the `aud` claim. If unspecified the current - audience claim will be used. - additional_claims (Mapping[str, str]): Any additional claims for - the JWT payload. This will be merged with the current - additional claims. - - Returns: - google.auth.jwt.Credentials: A new credentials instance. - """ + self, + issuer: Optional[str] = None, + subject: Optional[str] = None, + additional_claims: Optional[Mapping[str, str]] = None, + ) -> Credentials: new_additional_claims = copy.deepcopy(self._additional_claims) new_additional_claims.update(additional_claims or {}) return self.__class__( self._signer, - issuer=issuer if issuer is not None else self._issuer, - subject=subject if subject is not None else self._subject, - audience=audience if audience is not None else self._audience, + issuer=issuer or self._issuer, + subject=subject or self._subject, + audience=self._audience, additional_claims=new_additional_claims, quota_project_id=self._quota_project_id, ) - @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject) - def with_quota_project(self, quota_project_id): + def with_quota_project(self, quota_project_id: str) -> Credentials: return self.__class__( self._signer, issuer=self._issuer, @@ -534,345 +283,37 @@ def with_quota_project(self, quota_project_id): quota_project_id=quota_project_id, ) - def _make_jwt(self): - """Make a signed JWT. - - Returns: - Tuple[bytes, datetime]: The encoded JWT and the expiration. - """ - now = _helpers.utcnow() - lifetime = datetime.timedelta(seconds=self._token_lifetime) - expiry = now + lifetime + def _make_jwt(self) -> Tuple[bytes, datetime.datetime]: + now = _helpers.utcnow() # type: ignore + expiry = now + datetime.timedelta(seconds=self._token_lifetime) - payload = { + payload: dict[str, Any] = { "iss": self._issuer, "sub": self._subject, - "iat": _helpers.datetime_to_secs(now), - "exp": _helpers.datetime_to_secs(expiry), + "iat": _helpers.datetime_to_secs(now), # type: ignore + "exp": _helpers.datetime_to_secs(expiry), # type: ignore } if self._audience: payload["aud"] = self._audience payload.update(self._additional_claims) - jwt = encode(self._signer, payload) - return jwt, expiry - def refresh(self, request): - """Refreshes the access token. - - Args: - request (Any): Unused. - """ - # pylint: disable=unused-argument - # (pylint doesn't correctly recognize overridden methods.) + def refresh(self, request: Any) -> None: self.token, self.expiry = self._make_jwt() - @_helpers.copy_docstring(google.auth.credentials.Signing) - def sign_bytes(self, message): - return self._signer.sign(message) + def sign_bytes(self, message: bytes) -> bytes: + return self._signer.sign(message) # type: ignore - @property # type: ignore - @_helpers.copy_docstring(google.auth.credentials.Signing) - def signer_email(self): + @property + def signer_email(self) -> str: return self._issuer - @property # type: ignore - @_helpers.copy_docstring(google.auth.credentials.Signing) - def signer(self): + @property + def signer(self) -> crypt.Signer: return self._signer - @property # type: ignore - def additional_claims(self): - """ Additional claims the JWT object was created with.""" - return self._additional_claims - - -class OnDemandCredentials( - google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject -): - """On-demand JWT credentials. - - Like :class:`Credentials`, this class uses a JWT as the bearer token for - authentication. However, this class does not require the audience at - construction time. Instead, it will generate a new token on-demand for - each request using the request URI as the audience. It caches tokens - so that multiple requests to the same URI do not incur the overhead - of generating a new token every time. - - This behavior is especially useful for `gRPC`_ clients. A gRPC service may - have multiple audience and gRPC clients may not know all of the audiences - required for accessing a particular service. With these credentials, - no knowledge of the audiences is required ahead of time. - - .. _grpc: http://www.grpc.io/ - """ - - def __init__( - self, - signer, - issuer, - subject, - additional_claims=None, - token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, - max_cache_size=_DEFAULT_MAX_CACHE_SIZE, - quota_project_id=None, - ): - """ - Args: - signer (google.auth.crypt.Signer): The signer used to sign JWTs. - issuer (str): The `iss` claim. - subject (str): The `sub` claim. - additional_claims (Mapping[str, str]): Any additional claims for - the JWT payload. - token_lifetime (int): The amount of time in seconds for - which the token is valid. Defaults to 1 hour. - max_cache_size (int): The maximum number of JWT tokens to keep in - cache. Tokens are cached using :class:`cachetools.LRUCache`. - quota_project_id (Optional[str]): The project ID used for quota - and billing. - - """ - super(OnDemandCredentials, self).__init__() - self._signer = signer - self._issuer = issuer - self._subject = subject - self._token_lifetime = token_lifetime - self._quota_project_id = quota_project_id - - if additional_claims is None: - additional_claims = {} - - self._additional_claims = additional_claims - self._cache = cachetools.LRUCache(maxsize=max_cache_size) - - @classmethod - def _from_signer_and_info(cls, signer, info, **kwargs): - """Creates an OnDemandCredentials instance from a signer and service - account info. - - Args: - signer (google.auth.crypt.Signer): The signer used to sign JWTs. - info (Mapping[str, str]): The service account info. - kwargs: Additional arguments to pass to the constructor. - - Returns: - google.auth.jwt.OnDemandCredentials: The constructed credentials. - - Raises: - google.auth.exceptions.MalformedError: If the info is not in the expected format. - """ - kwargs.setdefault("subject", info["client_email"]) - kwargs.setdefault("issuer", info["client_email"]) - return cls(signer, **kwargs) - - @classmethod - def from_service_account_info(cls, info, **kwargs): - """Creates an OnDemandCredentials instance from a dictionary. - - Args: - info (Mapping[str, str]): The service account info in Google - format. - kwargs: Additional arguments to pass to the constructor. - - Returns: - google.auth.jwt.OnDemandCredentials: The constructed credentials. - - Raises: - google.auth.exceptions.MalformedError: If the info is not in the expected format. - """ - signer = _service_account_info.from_dict(info, require=["client_email"]) - return cls._from_signer_and_info(signer, info, **kwargs) - - @classmethod - def from_service_account_file(cls, filename, **kwargs): - """Creates an OnDemandCredentials instance from a service account .json - file in Google format. - - Args: - filename (str): The path to the service account .json file. - kwargs: Additional arguments to pass to the constructor. - - Returns: - google.auth.jwt.OnDemandCredentials: The constructed credentials. - """ - info, signer = _service_account_info.from_filename( - filename, require=["client_email"] - ) - return cls._from_signer_and_info(signer, info, **kwargs) - - @classmethod - def from_signing_credentials(cls, credentials, **kwargs): - """Creates a new :class:`google.auth.jwt.OnDemandCredentials` instance - from an existing :class:`google.auth.credentials.Signing` instance. - - The new instance will use the same signer as the existing instance and - will use the existing instance's signer email as the issuer and - subject by default. - - Example:: - - svc_creds = service_account.Credentials.from_service_account_file( - 'service_account.json') - jwt_creds = jwt.OnDemandCredentials.from_signing_credentials( - svc_creds) - - Args: - credentials (google.auth.credentials.Signing): The credentials to - use to construct the new credentials. - kwargs: Additional arguments to pass to the constructor. - - Returns: - google.auth.jwt.Credentials: A new Credentials instance. - """ - kwargs.setdefault("issuer", credentials.signer_email) - kwargs.setdefault("subject", credentials.signer_email) - return cls(credentials.signer, **kwargs) - - def with_claims(self, issuer=None, subject=None, additional_claims=None): - """Returns a copy of these credentials with modified claims. - - Args: - issuer (str): The `iss` claim. If unspecified the current issuer - claim will be used. - subject (str): The `sub` claim. If unspecified the current subject - claim will be used. - additional_claims (Mapping[str, str]): Any additional claims for - the JWT payload. This will be merged with the current - additional claims. - - Returns: - google.auth.jwt.OnDemandCredentials: A new credentials instance. - """ - new_additional_claims = copy.deepcopy(self._additional_claims) - new_additional_claims.update(additional_claims or {}) - - return self.__class__( - self._signer, - issuer=issuer if issuer is not None else self._issuer, - subject=subject if subject is not None else self._subject, - additional_claims=new_additional_claims, - max_cache_size=self._cache.maxsize, - quota_project_id=self._quota_project_id, - ) - - @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject) - def with_quota_project(self, quota_project_id): - - return self.__class__( - self._signer, - issuer=self._issuer, - subject=self._subject, - additional_claims=self._additional_claims, - max_cache_size=self._cache.maxsize, - quota_project_id=quota_project_id, - ) - @property - def valid(self): - """Checks the validity of the credentials. - - These credentials are always valid because it generates tokens on - demand. - """ - return True - - def _make_jwt_for_audience(self, audience): - """Make a new JWT for the given audience. - - Args: - audience (str): The intended audience. - - Returns: - Tuple[bytes, datetime]: The encoded JWT and the expiration. - """ - now = _helpers.utcnow() - lifetime = datetime.timedelta(seconds=self._token_lifetime) - expiry = now + lifetime - - payload = { - "iss": self._issuer, - "sub": self._subject, - "iat": _helpers.datetime_to_secs(now), - "exp": _helpers.datetime_to_secs(expiry), - "aud": audience, - } - - payload.update(self._additional_claims) - - jwt = encode(self._signer, payload) - - return jwt, expiry - - def _get_jwt_for_audience(self, audience): - """Get a JWT For a given audience. - - If there is already an existing, non-expired token in the cache for - the audience, that token is used. Otherwise, a new token will be - created. - - Args: - audience (str): The intended audience. - - Returns: - bytes: The encoded JWT. - """ - token, expiry = self._cache.get(audience, (None, None)) - - if token is None or expiry < _helpers.utcnow(): - token, expiry = self._make_jwt_for_audience(audience) - self._cache[audience] = token, expiry - - return token - - def refresh(self, request): - """Raises an exception, these credentials can not be directly - refreshed. - - Args: - request (Any): Unused. - - Raises: - google.auth.RefreshError - """ - # pylint: disable=unused-argument - # (pylint doesn't correctly recognize overridden methods.) - raise exceptions.RefreshError( - "OnDemandCredentials can not be directly refreshed." - ) - - def before_request(self, request, method, url, headers): - """Performs credential-specific before request logic. - - Args: - request (Any): Unused. JWT credentials do not need to make an - HTTP request to refresh. - method (str): The request's HTTP method. - url (str): The request's URI. This is used as the audience claim - when generating the JWT. - headers (Mapping): The request's headers. - """ - # pylint: disable=unused-argument - # (pylint doesn't correctly recognize overridden methods.) - parts = urllib.parse.urlsplit(url) - # Strip query string and fragment - audience = urllib.parse.urlunsplit( - (parts.scheme, parts.netloc, parts.path, "", "") - ) - token = self._get_jwt_for_audience(audience) - self.apply(headers, token=token) - - @_helpers.copy_docstring(google.auth.credentials.Signing) - def sign_bytes(self, message): - return self._signer.sign(message) - - @property # type: ignore - @_helpers.copy_docstring(google.auth.credentials.Signing) - def signer_email(self): - return self._issuer - - @property # type: ignore - @_helpers.copy_docstring(google.auth.credentials.Signing) - def signer(self): - return self._signer + def additional_claims(self) -> Mapping[str, str]: + return self._additional_claims