From 42d15a19aec1f8e5a247bfe1a6298232d491f87e Mon Sep 17 00:00:00 2001 From: Keegan Caruso Date: Tue, 30 Sep 2025 17:24:10 -0700 Subject: [PATCH 1/2] Python adapter --- .gitignore | 4 + .../appsettings.json | 44 +-- .../MicrosoftIdentityWebSidecarClient.py | 361 ++++++++++++++++++ tests/DevApps/SidecarAdapter/python/README.md | 40 ++ .../SidecarAdapter/python/get_token.py | 38 ++ tests/DevApps/SidecarAdapter/python/main.py | 343 +++++++++++++++++ 6 files changed, 804 insertions(+), 26 deletions(-) create mode 100644 tests/DevApps/SidecarAdapter/python/MicrosoftIdentityWebSidecarClient.py create mode 100644 tests/DevApps/SidecarAdapter/python/README.md create mode 100644 tests/DevApps/SidecarAdapter/python/get_token.py create mode 100644 tests/DevApps/SidecarAdapter/python/main.py diff --git a/.gitignore b/.gitignore index f4eef6e87..904b8061a 100644 --- a/.gitignore +++ b/.gitignore @@ -362,3 +362,7 @@ objd/ # Generated data protection keys key*.xml + +# bin files + +*.bin diff --git a/src/Microsoft.Identity.Web.Sidecar/appsettings.json b/src/Microsoft.Identity.Web.Sidecar/appsettings.json index a59d70ceb..3ceacc805 100644 --- a/src/Microsoft.Identity.Web.Sidecar/appsettings.json +++ b/src/Microsoft.Identity.Web.Sidecar/appsettings.json @@ -6,38 +6,30 @@ before the project can be successfully executed. For more info see https://aka.ms/dotnet-template-ms-identity-platform */ "AzureAd": { - "Instance": "https://login.microsoftonline.com/", // https://login.microsoftonline.com/ - "TenantId": "f645ad92-e38d-4d1a-b510-d1b09a74a8ca", // f645ad92-e38d-4d1a-b510-d1b09a74a8ca - "ClientId": "556d438d-2f4b-4add-9713-ede4e5f5d7da", //"556d438d-2f4b-4add-9713-ede4e5f5d7da" + "Instance": "https://login.microsoftonline.com/", + "TenantId": "", // f645ad92-e38d-4d1a-b510-d1b09a74a8ca + "ClientId": "", //"556d438d-2f4b-4add-9713-ede4e5f5d7da" - // "Scopes": "access_as_user" // access_as_user + // "Scopes": "" // access_as_user - "ClientCredentials": [ - { - "SourceType": "StoreWithDistinguishedName", - "CertificateStorePath": "LocalMachine/My", - "CertificateDistinguishedName": "CN=LabAuth.MSIDLab.com" - } - ], + //"ClientCredentials": [ + // { + // "SourceType": "StoreWithDistinguishedName", + // "CertificateStorePath": "LocalMachine/My", + // "CertificateDistinguishedName": "CN=LabAuth.MSIDLab.com" + // } + //], "AllowWebApiToBeAuthorizedByACL": true }, - "DownstreamApis": { - "me": { - "BaseUrl": "https://graph.microsoft.com/v1.0/", - "RelativePath": "me", - "Scopes": [ "User.Read" ] - }, - "me-no-relative-path": { - "BaseUrl": "https://graph.microsoft.com/v1.0/", - "Scopes": [ "User.Read" ] - }, - "me-no-scp": { - "BaseUrl": "https://graph.microsoft.com/v1.0/", - "RelativePath": "me" - } - }, + //"DownstreamApis": { + // "me": { + // "BaseUrl": "https://graph.microsoft.com/v1.0/", + // "RelativePath": "me", + // "Scopes": [ "User.Read" ] + // } + //}, "Logging": { "LogLevel": { diff --git a/tests/DevApps/SidecarAdapter/python/MicrosoftIdentityWebSidecarClient.py b/tests/DevApps/SidecarAdapter/python/MicrosoftIdentityWebSidecarClient.py new file mode 100644 index 000000000..9a6acd188 --- /dev/null +++ b/tests/DevApps/SidecarAdapter/python/MicrosoftIdentityWebSidecarClient.py @@ -0,0 +1,361 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Dict, Iterable, Mapping, MutableMapping, Optional, Sequence +from urllib.parse import urljoin + +import requests + + +JsonDict = Dict[str, Any] + + +@dataclass(frozen=True) +class ProblemDetails: + """Represents the RFC 7807 problem details payload returned by the sidecar.""" + + type: Optional[str] + title: Optional[str] + status: Optional[int] + detail: Optional[str] + instance: Optional[str] + + @staticmethod + def from_dict(data: Mapping[str, Any]) -> "ProblemDetails": + return ProblemDetails( + type=data.get("type"), + title=data.get("title"), + status=data.get("status"), + detail=data.get("detail"), + instance=data.get("instance"), + ) + + +@dataclass(frozen=True) +class AuthorizationHeaderResult: + authorization_header: str + + @staticmethod + def from_dict(data: Mapping[str, Any]) -> "AuthorizationHeaderResult": + return AuthorizationHeaderResult(authorization_header=data["authorizationHeader"]) + + +@dataclass(frozen=True) +class DownstreamApiResult: + status_code: int + headers: Mapping[str, Any] + content: Any + + @staticmethod + def from_dict(data: Mapping[str, Any]) -> "DownstreamApiResult": + return DownstreamApiResult( + status_code=data["statusCode"], + headers=data.get("headers", {}), + content=data.get("content"), + ) + + +@dataclass(frozen=True) +class ValidateAuthorizationHeaderResult: + protocol: str + token: str + claims: Mapping[str, Any] + + @staticmethod + def from_dict(data: Mapping[str, Any]) -> "ValidateAuthorizationHeaderResult": + return ValidateAuthorizationHeaderResult( + protocol=data["protocol"], + token=data["token"], + claims=data.get("claims", {}), + ) + + +@dataclass(frozen=True) +class AcquireTokenOptions: + tenant: Optional[str] = None + force_refresh: Optional[bool] = None + claims: Optional[str] = None + correlation_id: Optional[str] = None + long_running_web_api_session_key: Optional[str] = None + fmi_path: Optional[str] = None + pop_public_key: Optional[str] = None + managed_identity_user_assigned_client_id: Optional[str] = None + + +@dataclass(frozen=True) +class SidecarCallOptions: + scopes: Optional[Sequence[str]] = None + request_app_token: Optional[bool] = None + base_url: Optional[str] = None + relative_path: Optional[str] = None + http_method: Optional[str] = None + accept_header: Optional[str] = None + content_type: Optional[str] = None + acquire_token_options: Optional[AcquireTokenOptions] = None + + +class SidecarError(Exception): + """Raised when the sidecar returns an error response.""" + + def __init__(self, status_code: int, message: str, problem_details: Optional[ProblemDetails] = None) -> None: + super().__init__(message) + self.status_code = status_code + self.problem_details = problem_details + + +class MicrosoftIdentityWebSidecarClient: + """Client for the Microsoft.Identity.Web.Sidecar endpoints.""" + + def __init__( + self, + base_url: str, + *, + session: Optional[requests.Session] = None, + default_headers: Optional[Mapping[str, str]] = None, + timeout: Optional[float] = 30.0, + ) -> None: + self._base_url = base_url.rstrip("/") + "/" + self._session = session or requests.Session() + self._owns_session = session is None + self._default_headers: Dict[str, str] = dict(default_headers or {}) + self._timeout = timeout + + def close(self) -> None: + if self._owns_session: + self._session.close() + + def __enter__(self) -> "MicrosoftIdentityWebSidecarClient": + return self + + def __exit__(self, exc_type, exc_value, traceback) -> None: # type: ignore[override] + self.close() + + def validate_authorization_header(self, authorization_header: str) -> ValidateAuthorizationHeaderResult: + response_data = self._send_json( + method="GET", + path="Validate", + headers={"Authorization": authorization_header}, + ) + return ValidateAuthorizationHeaderResult.from_dict(response_data) + + def get_authorization_header( + self, + api_name: str, + authorization_header: str, + *, + agent_identity: Optional[str] = None, + agent_username: Optional[str] = None, + options: Optional[SidecarCallOptions] = None, + ) -> AuthorizationHeaderResult: + params = self._build_query_parameters(agent_identity, agent_username, options) + response_data = self._send_json( + method="GET", + path=f"AuthorizationHeader/{api_name}", + headers={"Authorization": authorization_header}, + params=params, + ) + return AuthorizationHeaderResult.from_dict(response_data) + + def get_authorization_header_unauthenticated( + self, + api_name: str, + *, + agent_identity: Optional[str] = None, + agent_username: Optional[str] = None, + options: Optional[SidecarCallOptions] = None, + ) -> AuthorizationHeaderResult: + params = self._build_query_parameters(agent_identity, agent_username, options) + response_data = self._send_json( + method="GET", + path=f"AuthorizationHeaderUnauthenticated/{api_name}", + params=params, + ) + return AuthorizationHeaderResult.from_dict(response_data) + + def invoke_downstream_api( + self, + api_name: str, + authorization_header: str, + *, + agent_identity: Optional[str] = None, + agent_username: Optional[str] = None, + options: Optional[SidecarCallOptions] = None, + json_body: Any = None, + ) -> DownstreamApiResult: + params = self._build_query_parameters(agent_identity, agent_username, options) + response_data = self._send_json( + method="POST", + path=f"DownstreamApi/{api_name}", + headers={"Authorization": authorization_header}, + params=params, + json=json_body, + ) + return DownstreamApiResult.from_dict(response_data) + + def invoke_downstream_api_unauthenticated( + self, + api_name: str, + *, + agent_identity: Optional[str] = None, + agent_username: Optional[str] = None, + options: Optional[SidecarCallOptions] = None, + json_body: Any = None, + ) -> DownstreamApiResult: + params = self._build_query_parameters(agent_identity, agent_username, options) + response_data = self._send_json( + method="POST", + path=f"DownstreamApiUnauthenticated/{api_name}", + params=params, + json=json_body, + ) + return DownstreamApiResult.from_dict(response_data) + + def with_default_authorization(self, authorization_header: str) -> "MicrosoftIdentityWebSidecarClient": + """Return a new client instance that always sends the given Authorization header.""" + + headers = dict(self._default_headers) + headers["Authorization"] = authorization_header + return MicrosoftIdentityWebSidecarClient( + self._base_url, + session=self._session, + default_headers=headers, + timeout=self._timeout, + ) + + def _build_query_parameters( + self, + agent_identity: Optional[str], + agent_username: Optional[str], + options: Optional[SidecarCallOptions], + ) -> Dict[str, Any]: + params: Dict[str, Any] = {} + if agent_identity: + params["AgentIdentity"] = agent_identity + if agent_username: + params["AgentUsername"] = agent_username + + if options: + if options.scopes: + params["optionsOverride.Scopes"] = list(options.scopes) + if options.request_app_token is not None: + params["optionsOverride.RequestAppToken"] = _to_bool_str(options.request_app_token) + if options.base_url: + params["optionsOverride.BaseUrl"] = options.base_url + if options.relative_path: + params["optionsOverride.RelativePath"] = options.relative_path + if options.http_method: + params["optionsOverride.HttpMethod"] = options.http_method + if options.accept_header: + params["optionsOverride.AcceptHeader"] = options.accept_header + if options.content_type: + params["optionsOverride.ContentType"] = options.content_type + + if options.acquire_token_options: + acquire_options = options.acquire_token_options + if acquire_options.tenant: + params["optionsOverride.AcquireTokenOptions.Tenant"] = acquire_options.tenant + if acquire_options.force_refresh is not None: + params[ + "optionsOverride.AcquireTokenOptions.ForceRefresh" + ] = _to_bool_str(acquire_options.force_refresh) + if acquire_options.claims: + params["optionsOverride.AcquireTokenOptions.Claims"] = acquire_options.claims + if acquire_options.correlation_id: + params[ + "optionsOverride.AcquireTokenOptions.CorrelationId" + ] = acquire_options.correlation_id + if acquire_options.long_running_web_api_session_key: + params[ + "optionsOverride.AcquireTokenOptions.LongRunningWebApiSessionKey" + ] = acquire_options.long_running_web_api_session_key + if acquire_options.fmi_path: + params["optionsOverride.AcquireTokenOptions.FmiPath"] = acquire_options.fmi_path + if acquire_options.pop_public_key: + params[ + "optionsOverride.AcquireTokenOptions.PopPublicKey" + ] = acquire_options.pop_public_key + if acquire_options.managed_identity_user_assigned_client_id: + params[ + "optionsOverride.AcquireTokenOptions.ManagedIdentity.UserAssignedClientId" + ] = acquire_options.managed_identity_user_assigned_client_id + return params + + def _send_json( + self, + *, + method: str, + path: str, + headers: Optional[Mapping[str, str]] = None, + params: Optional[Mapping[str, Any]] = None, + json: Any = None, + ) -> JsonDict: + response = self._send( + method=method, + path=path, + headers=headers, + params=params, + json=json, + ) + try: + return response.json() + except ValueError as exc: + raise SidecarError(response.status_code, "Expected JSON response from sidecar") from exc + + def _send( + self, + *, + method: str, + path: str, + headers: Optional[Mapping[str, str]] = None, + params: Optional[Mapping[str, Any]] = None, + json: Any = None, + ) -> requests.Response: + url = urljoin(self._base_url, path) + request_headers: MutableMapping[str, str] = dict(self._default_headers) + if headers: + request_headers.update(headers) + + prepared_params = _prepare_params(params) + + response = self._session.request( + method=method, + url=url, + headers=request_headers, + params=prepared_params, + json=json, + timeout=self._timeout, + ) + if response.status_code >= 400: + self._raise_sidecar_error(response) + return response + + def _raise_sidecar_error(self, response: requests.Response) -> None: + problem_details: Optional[ProblemDetails] = None + message = f"Sidecar request failed with status code {response.status_code}" + try: + data = response.json() + except ValueError: + pass + else: + if isinstance(data, Mapping): + problem_details = ProblemDetails.from_dict(data) + detail = problem_details.detail or problem_details.title + if detail: + message = detail + raise SidecarError(response.status_code, message, problem_details) + + +def _prepare_params(params: Optional[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: + if not params: + return params + prepared: Dict[str, Any] = {} + for key, value in params.items(): + if isinstance(value, Iterable) and not isinstance(value, (str, bytes)): + prepared[key] = list(value) + else: + prepared[key] = value + return prepared + + +def _to_bool_str(value: bool) -> str: + return "true" if value else "false" diff --git a/tests/DevApps/SidecarAdapter/python/README.md b/tests/DevApps/SidecarAdapter/python/README.md new file mode 100644 index 000000000..902e2a3c3 --- /dev/null +++ b/tests/DevApps/SidecarAdapter/python/README.md @@ -0,0 +1,40 @@ +# Sidecar Adapter Python + +This folder contains helper scripts for interacting with the Microsoft Identity Web Sidecar from Python. + +## Contents + +- `MicrosoftIdentityWebSidecarClient.py` – Typed client covering the Sidecar's `/Validate`, `/AuthorizationHeader`, and `/DownstreamApi` endpoints. +- `main.py` – Command-line harness that exercises the client and prints JSON responses. +- `get_token.py` – Helper for obtaining a user token via MSAL. +``` + +## Usage + +Display the available commands: + +```sh +uv run --with requests main.py --help +``` + +Example: validate an authorization header returned by `get_token.py`: + +```sh +$token = uv run --with msal get_token.py +uv run --with requests main.py --base-url https://localhost:5001/sidecar --authorization-header "Bearer $token" validate +``` + +Invoke a downstream API by name, supplying an override scope and a JSON payload stored in `body.json`: + +```sh +uv run --with requests main.py --base-url https://localhost:5001/sidecar --authorization-header "Bearer $token" ` + --scope User.Read invoke-downstream graphApi --body-file body.json +``` + +Invoke a downstream API by name, use the credentials configured by the application: + +```sh +uv run --with requests main.py --base-url=http://localhost:5178 --agent-username=username --agent-identity=id invoke-downstream-unauth me +``` + +For client-credential flows, omit `--authorization-header` and use the unauthenticated commands such as `get-auth-header-unauth` or `invoke-downstream-unauth`. diff --git a/tests/DevApps/SidecarAdapter/python/get_token.py b/tests/DevApps/SidecarAdapter/python/get_token.py new file mode 100644 index 000000000..e164f6a56 --- /dev/null +++ b/tests/DevApps/SidecarAdapter/python/get_token.py @@ -0,0 +1,38 @@ +from msal import PublicClientApplication, SerializableTokenCache +import os + +# Persistent token cache +cache = SerializableTokenCache() + +# Load cache from file if exists +if os.path.exists("token_cache.bin"): + cache.deserialize(open("token_cache.bin", "r").read()) + +app = PublicClientApplication( + client_id="f79f9db9-c582-4b7b-9d4c-0e8fd40623f0", + authority="https://login.microsoftonline.com/f645ad92-e38d-4d1a-b510-d1b09a74a8ca", + token_cache=cache +) + +# Try silent acquisition first +accounts = app.get_accounts() +result = None + +if accounts: + result = app.acquire_token_silent( + scopes=["api://556d438d-2f4b-4add-9713-ede4e5f5d7da/access_as_user"], + account=accounts[0] + ) + +if not result: + result = app.acquire_token_interactive(scopes=["api://556d438d-2f4b-4add-9713-ede4e5f5d7da/access_as_user"]) + +# Save cache after acquisition +if cache.has_state_changed: + with open("token_cache.bin", "w") as cache_file: + cache_file.write(cache.serialize()) + +if (result): + print("Access token acquired:", result["access_token"]) +else: + print("Failed to acquire token:", result.get("error"), result.get("error_description")) diff --git a/tests/DevApps/SidecarAdapter/python/main.py b/tests/DevApps/SidecarAdapter/python/main.py new file mode 100644 index 000000000..eb6aaa6c8 --- /dev/null +++ b/tests/DevApps/SidecarAdapter/python/main.py @@ -0,0 +1,343 @@ +import argparse +import json +from pathlib import Path +from typing import Any, Optional + +from MicrosoftIdentityWebSidecarClient import ( + AcquireTokenOptions, + MicrosoftIdentityWebSidecarClient, + SidecarCallOptions, + SidecarError, +) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Exercise the Microsoft Identity Web Sidecar client against a running sidecar instance.", + ) + parser.add_argument( + "--base-url", + required=True, + help="Fully qualified base URL for the sidecar (e.g. https://localhost:5001/sidecar).", + ) + parser.add_argument( + "--authorization-header", + help="Authorization header to send for authenticated endpoints (e.g. 'Bearer ').", + ) + parser.add_argument( + "--agent-identity", + help="Optional AgentIdentity query parameter for the sidecar call.", + ) + parser.add_argument( + "--agent-username", + help="Optional AgentUsername query parameter for the sidecar call.", + ) + + subparsers = parser.add_subparsers(dest="command", required=True) + + subparsers.add_parser("validate", help="Validate an authorization header using the /Validate endpoint.") + + auth_header_parser = subparsers.add_parser( + "get-auth-header", + help="Call /AuthorizationHeader/{apiName}.", + ) + auth_header_parser.add_argument("api_name", help="Configured API name defined in the sidecar configuration.") + _augment_with_options_override(auth_header_parser) + + auth_header_unauth_parser = subparsers.add_parser( + "get-auth-header-unauth", help="Call /AuthorizationHeaderUnauthenticated/{apiName}." + ) + auth_header_unauth_parser.add_argument( + "api_name", help="Configured API name defined in the sidecar configuration." + ) + _augment_with_options_override(auth_header_unauth_parser) + + downstream_parser = subparsers.add_parser( + "invoke-downstream", + help="Call /DownstreamApi/{apiName} with an optional JSON body.", + ) + downstream_parser.add_argument("api_name", help="Configured API name defined in the sidecar configuration.") + downstream_parser.add_argument( + "--body-json", + help="Inline JSON payload to POST to the downstream API.", + ) + downstream_parser.add_argument( + "--body-file", + type=Path, + help="Path to a JSON file to POST to the downstream API.", + ) + _augment_with_options_override(downstream_parser) + + downstream_unauth_parser = subparsers.add_parser( + "invoke-downstream-unauth", + help="Call /DownstreamApiUnauthenticated/{apiName} with an optional JSON body.", + ) + downstream_unauth_parser.add_argument("api_name", help="Configured API name defined in the sidecar configuration.") + downstream_unauth_parser.add_argument( + "--body-json", + help="Inline JSON payload to POST to the downstream API.", + ) + downstream_unauth_parser.add_argument( + "--body-file", + type=Path, + help="Path to a JSON file to POST to the downstream API.", + ) + _augment_with_options_override(downstream_unauth_parser) + + return parser.parse_args() + + +def _augment_with_options_override(subparser: argparse.ArgumentParser) -> None: + subparser.add_argument( + "--scope", + dest="scopes", + action="append", + help="Repeatable. Adds a scope to optionsOverride.Scopes.", + ) + subparser.add_argument( + "--request-app-token", + dest="request_app_token", + action="store_const", + const=True, + default=None, + help="Set optionsOverride.RequestAppToken=true.", + ) + subparser.add_argument( + "--base-url-override", + dest="override_base_url", + help="Sets optionsOverride.BaseUrl.", + ) + subparser.add_argument( + "--relative-path", + dest="relative_path", + help="Sets optionsOverride.RelativePath.", + ) + subparser.add_argument( + "--http-method", + dest="http_method", + help="Sets optionsOverride.HttpMethod.", + ) + subparser.add_argument( + "--accept-header", + dest="accept_header", + help="Sets optionsOverride.AcceptHeader.", + ) + subparser.add_argument( + "--content-type", + dest="content_type", + help="Sets optionsOverride.ContentType.", + ) + subparser.add_argument( + "--tenant", + dest="tenant", + help="Sets optionsOverride.AcquireTokenOptions.Tenant.", + ) + subparser.add_argument( + "--force-refresh", + dest="force_refresh", + action="store_const", + const=True, + default=None, + help="Sets optionsOverride.AcquireTokenOptions.ForceRefresh=true.", + ) + subparser.add_argument( + "--claims", + dest="claims", + help="Sets optionsOverride.AcquireTokenOptions.Claims.", + ) + subparser.add_argument( + "--correlation-id", + dest="correlation_id", + help="Sets optionsOverride.AcquireTokenOptions.CorrelationId.", + ) + subparser.add_argument( + "--long-running-session-key", + dest="long_running_session_key", + help="Sets optionsOverride.AcquireTokenOptions.LongRunningWebApiSessionKey.", + ) + subparser.add_argument( + "--fmi-path", + dest="fmi_path", + help="Sets optionsOverride.AcquireTokenOptions.FmiPath.", + ) + subparser.add_argument( + "--pop-public-key", + dest="pop_public_key", + help="Sets optionsOverride.AcquireTokenOptions.PopPublicKey.", + ) + subparser.add_argument( + "--managed-identity-client-id", + dest="managed_identity_client_id", + help="Sets optionsOverride.AcquireTokenOptions.ManagedIdentity.UserAssignedClientId.", + ) + + +def build_call_options(args: argparse.Namespace) -> Optional[SidecarCallOptions]: + if not any( + getattr(args, attr, None) + for attr in ( + "scopes", + "request_app_token", + "override_base_url", + "relative_path", + "http_method", + "accept_header", + "content_type", + "tenant", + "force_refresh", + "claims", + "correlation_id", + "long_running_session_key", + "fmi_path", + "pop_public_key", + "managed_identity_client_id", + ) + ): + return None + + acquire_options = AcquireTokenOptions( + tenant=getattr(args, "tenant", None), + force_refresh=getattr(args, "force_refresh", None), + claims=getattr(args, "claims", None), + correlation_id=getattr(args, "correlation_id", None), + long_running_web_api_session_key=getattr(args, "long_running_session_key", None), + fmi_path=getattr(args, "fmi_path", None), + pop_public_key=getattr(args, "pop_public_key", None), + managed_identity_user_assigned_client_id=getattr(args, "managed_identity_client_id", None), + ) + + if not any( + getattr(acquire_options, field) + is not None + for field in ( + "tenant", + "force_refresh", + "claims", + "correlation_id", + "long_running_web_api_session_key", + "fmi_path", + "pop_public_key", + "managed_identity_user_assigned_client_id", + ) + ): + acquire_options = None + + return SidecarCallOptions( + scopes=getattr(args, "scopes", None), + request_app_token=getattr(args, "request_app_token", None), + base_url=getattr(args, "override_base_url", None), + relative_path=getattr(args, "relative_path", None), + http_method=getattr(args, "http_method", None), + accept_header=getattr(args, "accept_header", None), + content_type=getattr(args, "content_type", None), + acquire_token_options=acquire_options, + ) + + +def _resolve_json_body(args: argparse.Namespace) -> Optional[Any]: + if getattr(args, "body_json", None): + return json.loads(args.body_json) + if getattr(args, "body_file", None): + data = args.body_file.read_text(encoding="utf-8") + return json.loads(data) + return None + + +def ensure_authorization_header(args: argparse.Namespace) -> str: + if not args.authorization_header: + raise SystemExit("This command requires --authorization-header.") + return args.authorization_header + + +def main() -> None: + args = parse_args() + options = build_call_options(args) + + try: + with MicrosoftIdentityWebSidecarClient(args.base_url) as client: + if args.command == "validate": + authorization_header = ensure_authorization_header(args) + result = client.validate_authorization_header(authorization_header) + _print_json({ + "protocol": result.protocol, + "token": result.token, + "claims": result.claims, + }) + elif args.command == "get-auth-header": + authorization_header = ensure_authorization_header(args) + result = client.get_authorization_header( + args.api_name, + authorization_header, + agent_identity=args.agent_identity, + agent_username=args.agent_username, + options=options, + ) + _print_json({"authorizationHeader": result.authorization_header}) + elif args.command == "get-auth-header-unauth": + result = client.get_authorization_header_unauthenticated( + args.api_name, + agent_identity=args.agent_identity, + agent_username=args.agent_username, + options=options, + ) + _print_json({"authorizationHeader": result.authorization_header}) + elif args.command == "invoke-downstream": + authorization_header = ensure_authorization_header(args) + body = _resolve_json_body(args) + result = client.invoke_downstream_api( + args.api_name, + authorization_header, + agent_identity=args.agent_identity, + agent_username=args.agent_username, + options=options, + json_body=body, + ) + _print_json({ + "statusCode": result.status_code, + "headers": result.headers, + "content": result.content, + }) + elif args.command == "invoke-downstream-unauth": + body = _resolve_json_body(args) + result = client.invoke_downstream_api_unauthenticated( + args.api_name, + agent_identity=args.agent_identity, + agent_username=args.agent_username, + options=options, + json_body=body, + ) + _print_json({ + "statusCode": result.status_code, + "headers": result.headers, + "content": result.content, + }) + else: + raise SystemExit(f"Unsupported command: {args.command}") + except SidecarError as sidecar_error: + _handle_sidecar_error(sidecar_error) + + +def _print_json(payload: Any) -> None: + print(json.dumps(payload, indent=2, ensure_ascii=False)) + + +def _handle_sidecar_error(error: SidecarError) -> None: + details: dict[str, Any] = { + "statusCode": error.status_code, + } + if error.problem_details: + details["problemDetails"] = { + "type": error.problem_details.type, + "title": error.problem_details.title, + "status": error.problem_details.status, + "detail": error.problem_details.detail, + "instance": error.problem_details.instance, + } + else: + details["message"] = str(error) + print(json.dumps(details, indent=2, ensure_ascii=False)) + raise SystemExit(1) + + +if __name__ == "__main__": + main() From 59260a965b8520fdd6a7981a8083ff6138bf53fb Mon Sep 17 00:00:00 2001 From: Keegan Caruso Date: Wed, 1 Oct 2025 14:16:24 -0700 Subject: [PATCH 2/2] feedback --- tests/DevApps/SidecarAdapter/python/README.md | 20 +++++++--- .../SidecarAdapter/python/get_token.py | 38 ++++++++++++++++--- 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/tests/DevApps/SidecarAdapter/python/README.md b/tests/DevApps/SidecarAdapter/python/README.md index 902e2a3c3..1ff45fd38 100644 --- a/tests/DevApps/SidecarAdapter/python/README.md +++ b/tests/DevApps/SidecarAdapter/python/README.md @@ -2,6 +2,10 @@ This folder contains helper scripts for interacting with the Microsoft Identity Web Sidecar from Python. +## Requirements + +- [Install UV](https://astral.sh/uv) + ## Contents - `MicrosoftIdentityWebSidecarClient.py` – Typed client covering the Sidecar's `/Validate`, `/AuthorizationHeader`, and `/DownstreamApi` endpoints. @@ -17,24 +21,30 @@ Display the available commands: uv run --with requests main.py --help ``` +The examples depend on setting these variables + +```sh +$side_car_url = "" +# Example values, use appropriate values for the token you want to request. +$token = uv run --with msal get_token.py --client-id "f79f9db9-c582-4b7b-9d4c-0e8fd40623f0" --authority "https://login.microsoftonline.com/f645ad92-e38d-4d1a-b510-d1b09a74a8ca" --scope "api://556d438d-2f4b-4add-9713-ede4e5f5d7da/access_as_user" +``` + Example: validate an authorization header returned by `get_token.py`: ```sh -$token = uv run --with msal get_token.py -uv run --with requests main.py --base-url https://localhost:5001/sidecar --authorization-header "Bearer $token" validate +uv run --with requests main.py --base-url $side_car_url --authorization-header "Bearer $token" validate ``` Invoke a downstream API by name, supplying an override scope and a JSON payload stored in `body.json`: ```sh -uv run --with requests main.py --base-url https://localhost:5001/sidecar --authorization-header "Bearer $token" ` - --scope User.Read invoke-downstream graphApi --body-file body.json +uv run --with requests main.py --base-url $side_car_url --authorization-header "Bearer $token" --scope invoke-downstream --body-file ``` Invoke a downstream API by name, use the credentials configured by the application: ```sh -uv run --with requests main.py --base-url=http://localhost:5178 --agent-username=username --agent-identity=id invoke-downstream-unauth me +uv run --with requests main.py --base-url=$side_car_url --agent-username= --agent-identity= invoke-downstream-unauth me ``` For client-credential flows, omit `--authorization-header` and use the unauthenticated commands such as `get-auth-header-unauth` or `invoke-downstream-unauth`. diff --git a/tests/DevApps/SidecarAdapter/python/get_token.py b/tests/DevApps/SidecarAdapter/python/get_token.py index e164f6a56..5fae1ecd4 100644 --- a/tests/DevApps/SidecarAdapter/python/get_token.py +++ b/tests/DevApps/SidecarAdapter/python/get_token.py @@ -1,6 +1,8 @@ -from msal import PublicClientApplication, SerializableTokenCache +import argparse import os +from msal import PublicClientApplication, SerializableTokenCache + # Persistent token cache cache = SerializableTokenCache() @@ -8,9 +10,33 @@ if os.path.exists("token_cache.bin"): cache.deserialize(open("token_cache.bin", "r").read()) +parser = argparse.ArgumentParser( + description="Acquire a token using MSAL with a persistent cache." +) +parser.add_argument( + "--client-id", + required=True, + help="The application (client) ID registered in Azure AD." +) +parser.add_argument( + "--authority", + required=True, + help="The authority URL, e.g. https://login.microsoftonline.com/." +) +parser.add_argument( + "--scope", + required=True, + help="The scope for the access token." +) +args = parser.parse_args() + +client_id = args.client_id +authority = args.authority +scope = args.scope + app = PublicClientApplication( - client_id="f79f9db9-c582-4b7b-9d4c-0e8fd40623f0", - authority="https://login.microsoftonline.com/f645ad92-e38d-4d1a-b510-d1b09a74a8ca", + client_id=client_id, + authority=authority, token_cache=cache ) @@ -20,12 +46,12 @@ if accounts: result = app.acquire_token_silent( - scopes=["api://556d438d-2f4b-4add-9713-ede4e5f5d7da/access_as_user"], + scopes=[scope], account=accounts[0] ) if not result: - result = app.acquire_token_interactive(scopes=["api://556d438d-2f4b-4add-9713-ede4e5f5d7da/access_as_user"]) + result = app.acquire_token_interactive(scopes=[scope]) # Save cache after acquisition if cache.has_state_changed: @@ -33,6 +59,6 @@ cache_file.write(cache.serialize()) if (result): - print("Access token acquired:", result["access_token"]) + print(result["access_token"]) else: print("Failed to acquire token:", result.get("error"), result.get("error_description"))