From 97d3c6c1b4615474fd9511a8442cc0a33a361c4c Mon Sep 17 00:00:00 2001 From: Juan Carlos Date: Tue, 5 Nov 2024 17:13:39 -0300 Subject: [PATCH 1/2] feat: Check if token is a JWT --- postgrest/utils.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/postgrest/utils.py b/postgrest/utils.py index 31cda49..d2f3c48 100644 --- a/postgrest/utils.py +++ b/postgrest/utils.py @@ -1,11 +1,14 @@ from __future__ import annotations +import re from typing import Any, Type, TypeVar, cast, get_origin from urllib.parse import urlparse from httpx import AsyncClient # noqa: F401 from httpx import Client as BaseClient # noqa: F401 +BASE64URL_REGEX = r"^([a-z0-9_-]{4})*($|[a-z0-9_-]{3}$|[a-z0-9_-]{2}$)$" + class SyncClient(BaseClient): def aclose(self) -> None: @@ -40,3 +43,26 @@ def get_origin_and_cast(typ: type[type[_T]]) -> type[_T]: def is_http_url(url: str) -> bool: return urlparse(url).scheme in {"https", "http"} + + +def is_valid_jwt(value: str) -> bool: + """Checks if value looks like a JWT, does not do any extra parsing.""" + if not isinstance(value, str): + return False + + # Remove trailing whitespaces if any. + value = value.strip() + + # Remove "Bearer " prefix if any. + if value.startswith("Bearer "): + value = value[7:] + + # Valid JWT must have 2 dots (Header.Paylod.Signature) + if value.count(".") != 2: + return False + + for part in value.split("."): + if not re.search(BASE64URL_REGEX, part, re.IGNORECASE): + return False + + return True From 96c1bcf121a50dee3bf60f4f5ba488d4b7d70ebf Mon Sep 17 00:00:00 2001 From: Juan Carlos Date: Tue, 5 Nov 2024 17:13:49 -0300 Subject: [PATCH 2/2] feat: Check if token is a JWT --- postgrest/base_client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/postgrest/base_client.py b/postgrest/base_client.py index db6fffc..e2bf417 100644 --- a/postgrest/base_client.py +++ b/postgrest/base_client.py @@ -5,7 +5,7 @@ from httpx import BasicAuth, Timeout -from .utils import AsyncClient, SyncClient, is_http_url +from .utils import AsyncClient, SyncClient, is_http_url, is_valid_jwt class BasePostgrestClient(ABC): @@ -58,6 +58,8 @@ def auth( Bearer token is preferred if both ones are provided. """ if token: + if not is_valid_jwt(token): + ValueError("token must be a valid JWT authorization token") self.session.headers["Authorization"] = f"Bearer {token}" elif username: self.session.auth = BasicAuth(username, password)