diff --git a/packages/agentmesh-integrations/sb-runtime-skill/README.md b/packages/agentmesh-integrations/sb-runtime-skill/README.md new file mode 100644 index 000000000..8582ef01c --- /dev/null +++ b/packages/agentmesh-integrations/sb-runtime-skill/README.md @@ -0,0 +1,137 @@ +# sb-runtime + AgentMesh Governance Skill + +**Public Preview.** Governance skill that evaluates policy and emits Ed25519-signed decision receipts in the [Veritas Acta receipt format](https://datatracker.ietf.org/doc/draft-farley-acta-signed-receipts/). Parallel to `openshell-skill`: same policy contract, drop-in replacement at the governance layer, with receipts added. + +> sb-runtime is one implementation of the Veritas Acta receipt format. This skill is its AgentMesh integration point. See [docs/integrations/sb-runtime.md](../../../docs/integrations/sb-runtime.md) for the architecture overview and for guidance on composing with [nono](https://github.com/always-further/nono) as the Linux sandbox primitive. + +## Install + +```bash +pip install sb-runtime-agentmesh +``` + +## What the skill adds over `openshell-skill` + +| Capability | openshell-skill | sb-runtime-skill | +|---|:---:|:---:| +| YAML policy loading | yes | yes (same schema) | +| Trust score tracking | yes | yes | +| Audit log | yes | yes | +| Ed25519-signed decision receipts | no | yes | +| Receipt chain linkage (`previousReceiptHash`) | no | yes | +| Policy digest pinned into receipts | no | yes (`sha256:...`) | +| Sandbox backend recorded in receipt | no | yes (`nono` \| `openshell` \| `sb_runtime_builtin` \| `none`) | +| Offline verification (`@veritasacta/verify`) | no | yes | + +## Usage + +### As a library + +```python +from pathlib import Path +from sb_runtime_agentmesh import GovernanceSkill, SandboxBackend + +skill = GovernanceSkill( + policy_dir=Path("./policies"), + sandbox_backend=SandboxBackend.NONO, # wrap the agent in a nono sandbox + ring=2, # sb-runtime does policy + receipts; nono does the sandbox +) + +decision = skill.check_policy( + action="shell:curl https://api.github.com/repos/org/repo/issues", + context={"agent_did": "did:agent:researcher-1"}, +) + +if decision.allowed: + # Ring 2: execute inside the nono capability set + ... + +# The signed receipt is on decision.receipt - Veritas Acta format. +import json +print(json.dumps(decision.receipt, indent=2)) +``` + +### As a CLI + +```bash +# Generate an operator key once +python -c "from sb_runtime_agentmesh import Signer; print(Signer.generate().private_pem().decode())" > operator.pem + +# Evaluate policy, sign receipt, write to disk +sb-runtime-governance check-policy \ + --action shell:python \ + --policy-dir ./policies \ + --sandbox-backend nono \ + --ring 2 \ + --key operator.pem \ + --receipts-dir ./receipts + +# Verify a written receipt +sb-runtime-governance verify ./receipts/20260419T133001123456Z.json \ + --public-key operator-public.pem +``` + +## Deployment modes + +### Standalone sb-runtime (Ring 3) + +One binary owns everything: Cedar evaluation, Landlock + seccomp sandbox, receipt signing. Set `sandbox_backend=SandboxBackend.SB_RUNTIME_BUILTIN` and `ring=3`. + +### sb-runtime + nono composition (recommended on Linux) + +nono owns the sandbox layer; this skill contributes only Cedar + signed receipts. Set `sandbox_backend=SandboxBackend.NONO` and `ring=2`. Wrap your agent process in nono externally: + +```bash +nono run --policy ./nono-capabilities.yaml -- \ + python -m your_agent # calls the skill in-process +``` + +The receipt's `payload.sandbox_backend == "nono"` field makes the composition visible to auditors. + +### sb-runtime + OpenShell composition + +OpenShell owns the container boundary; this skill contributes Cedar + receipts. Set `sandbox_backend=SandboxBackend.OPENSHELL` and `ring=2`. + +## Receipt format + +Every decision produces an envelope of the form: + +```json +{ + "payload": { + "type": "sb-runtime:decision", + "agent_id": "did:agent:researcher-1", + "action": "shell:python", + "decision": "allow", + "ring": 2, + "sandbox_backend": "nono", + "policy_id": "allow-shell", + "policy_digest": "sha256:...", + "trust_score": 1.0, + "issuer_id": "sb:issuer:...", + "issued_at": "2026-04-19T13:30:01.123Z", + "previousReceiptHash": "..." + }, + "signature": { + "alg": "EdDSA", + "kid": "...", + "sig": "..." + } +} +``` + +The canonical form is JCS-RFC 8785 with ASCII-only keys per [AIP-0001](https://github.com/VeritasActa/Acta/blob/main/specs/aip/AIP-0001.md). Verification does not depend on this skill, sb-runtime, or AgentMesh: + +```bash +npx @veritasacta/verify receipt.json --key operator-public.pem +``` + +## Spec alignment + +- [draft-farley-acta-signed-receipts-02](https://datatracker.ietf.org/doc/draft-farley-acta-signed-receipts/) +- [AIP-0001](https://github.com/VeritasActa/Acta) (receipt format, ASCII-only JCS) +- [VeritasActa/agt-integration-profile](https://github.com/VeritasActa/agt-integration-profile) (AGT to Veritas Acta normative field mapping) + +## License + +MIT. See `LICENSE` at the repo root. diff --git a/packages/agentmesh-integrations/sb-runtime-skill/pyproject.toml b/packages/agentmesh-integrations/sb-runtime-skill/pyproject.toml new file mode 100644 index 000000000..056224859 --- /dev/null +++ b/packages/agentmesh-integrations/sb-runtime-skill/pyproject.toml @@ -0,0 +1,35 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "sb_runtime_agentmesh" +version = "0.1.0" +description = "Public Preview - sb-runtime governance skill with signed receipts (Veritas Acta format)" +readme = "README.md" +license = {text = "MIT"} +requires-python = ">=3.10" +authors = [{ name = "Tom Farley (ScopeBlind)", email = "tommy@scopeblind.com" }] +dependencies = [ + "pyyaml>=6.0,<7.0", + "cryptography>=41.0,<47.0", +] + +[project.optional-dependencies] +agentmesh = ["agentmesh-platform>=3.0,<4.0"] +dev = ["pytest>=7.0"] + +[project.urls] +Homepage = "https://github.com/microsoft/agent-governance-toolkit" +Documentation = "https://github.com/microsoft/agent-governance-toolkit/blob/main/docs/integrations/sb-runtime.md" +"sb-runtime source" = "https://github.com/ScopeBlind/sb-runtime" +"Veritas Acta draft" = "https://datatracker.ietf.org/doc/draft-farley-acta-signed-receipts/" + +[project.scripts] +sb-runtime-governance = "sb_runtime_agentmesh.cli:main" + +[tool.hatch.build.targets.wheel] +packages = ["sb_runtime_agentmesh"] + +[tool.pytest.ini_options] +testpaths = ["tests"] diff --git a/packages/agentmesh-integrations/sb-runtime-skill/sb_runtime_agentmesh/__init__.py b/packages/agentmesh-integrations/sb-runtime-skill/sb_runtime_agentmesh/__init__.py new file mode 100644 index 000000000..4c9ce7aee --- /dev/null +++ b/packages/agentmesh-integrations/sb-runtime-skill/sb_runtime_agentmesh/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2026 Tom Farley (ScopeBlind). +# Licensed under the MIT License. +"""sb-runtime governance skill: policy evaluation + Ed25519-signed decision receipts (Veritas Acta format).""" + +from sb_runtime_agentmesh.skill import GovernanceSkill, PolicyDecision, SandboxBackend +from sb_runtime_agentmesh.receipts import Signer, sign_receipt, verify_receipt + +__all__ = [ + "GovernanceSkill", + "PolicyDecision", + "SandboxBackend", + "Signer", + "sign_receipt", + "verify_receipt", +] +__version__ = "0.1.0" diff --git a/packages/agentmesh-integrations/sb-runtime-skill/sb_runtime_agentmesh/cli.py b/packages/agentmesh-integrations/sb-runtime-skill/sb_runtime_agentmesh/cli.py new file mode 100644 index 000000000..c0fe6b06f --- /dev/null +++ b/packages/agentmesh-integrations/sb-runtime-skill/sb_runtime_agentmesh/cli.py @@ -0,0 +1,122 @@ +# Copyright (c) 2026 Tom Farley (ScopeBlind). +# Licensed under the MIT License. +"""CLI entry point for the sb-runtime governance skill.""" + +from __future__ import annotations + +import argparse +import json +import sys +from pathlib import Path + +from sb_runtime_agentmesh.receipts import Signer, verify_receipt +from sb_runtime_agentmesh.skill import GovernanceSkill, SandboxBackend + + +def _load_signer(key_path: Path | None) -> Signer: + if key_path is None: + return Signer.generate() + return Signer.from_pem(key_path.read_bytes()) + + +def main(argv=None) -> int: + parser = argparse.ArgumentParser(prog="sb-runtime-governance") + sub = parser.add_subparsers(dest="command") + + cp = sub.add_parser( + "check-policy", + help="Evaluate policy and emit a signed decision receipt", + ) + cp.add_argument("--action", required=True) + cp.add_argument("--context", default="{}") + cp.add_argument("--policy-dir", required=True) + cp.add_argument( + "--sandbox-backend", + choices=[b.value for b in SandboxBackend], + default=SandboxBackend.SB_RUNTIME_BUILTIN.value, + ) + cp.add_argument("--ring", type=int, default=3) + cp.add_argument("--key", type=Path, default=None, help="Operator Ed25519 key (PEM)") + cp.add_argument("--no-sign", action="store_true", help="Skip receipt signing") + cp.add_argument( + "--receipts-dir", + type=Path, + default=None, + help="If set, write the signed receipt to /.json", + ) + + ts = sub.add_parser("trust-score") + ts.add_argument("--agent-did", required=True) + + vf = sub.add_parser("verify", help="Verify a receipt file against an Ed25519 public key (PEM)") + vf.add_argument("receipt", type=Path) + vf.add_argument("--public-key", type=Path, required=True) + + pk = sub.add_parser("public-key", help="Print operator public key (PEM)") + pk.add_argument("--key", type=Path, default=None) + + args = parser.parse_args(argv) + + if args.command == "check-policy": + signer = _load_signer(args.key) + skill = GovernanceSkill( + policy_dir=Path(args.policy_dir), + signer=signer, + sandbox_backend=SandboxBackend(args.sandbox_backend), + ring=args.ring, + ) + ctx = json.loads(args.context) if args.context else {} + decision = skill.check_policy(args.action, ctx, sign=not args.no_sign) + output = { + "allowed": decision.allowed, + "action": decision.action, + "reason": decision.reason, + "policy_name": decision.policy_name, + "policy_digest": decision.policy_digest, + "ring": decision.ring, + "sandbox_backend": decision.sandbox_backend.value, + "receipt": decision.receipt, + } + if args.receipts_dir and decision.receipt is not None: + args.receipts_dir.mkdir(parents=True, exist_ok=True) + from datetime import datetime, timezone + + stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ") + out_path = args.receipts_dir / f"{stamp}.json" + out_path.write_text(json.dumps(decision.receipt, indent=2, sort_keys=True)) + output["receipt_path"] = str(out_path) + print(json.dumps(output, indent=2, sort_keys=True)) + return 0 if decision.allowed else 1 + + if args.command == "trust-score": + skill = GovernanceSkill() + print( + json.dumps( + { + "agent_did": args.agent_did, + "trust_score": skill.get_trust_score(args.agent_did), + } + ) + ) + return 0 + + if args.command == "verify": + from cryptography.hazmat.primitives import serialization + + pub = serialization.load_pem_public_key(args.public_key.read_bytes()) + envelope = json.loads(args.receipt.read_text()) + ok = verify_receipt(envelope, pub) + print(json.dumps({"verified": ok, "kid": envelope.get("signature", {}).get("kid")})) + return 0 if ok else 1 + + if args.command == "public-key": + signer = _load_signer(args.key) + sys.stdout.write(signer.public_pem().decode("ascii")) + return 0 + + parser.print_help() + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/packages/agentmesh-integrations/sb-runtime-skill/sb_runtime_agentmesh/receipts.py b/packages/agentmesh-integrations/sb-runtime-skill/sb_runtime_agentmesh/receipts.py new file mode 100644 index 000000000..4fd7246cb --- /dev/null +++ b/packages/agentmesh-integrations/sb-runtime-skill/sb_runtime_agentmesh/receipts.py @@ -0,0 +1,189 @@ +# Copyright (c) 2026 Tom Farley (ScopeBlind). +# Licensed under the MIT License. +"""Ed25519 signing + JCS canonical JSON for sb-runtime decision receipts. + +Produces receipts in the format specified by +draft-farley-acta-signed-receipts (Veritas Acta). The output is +bit-compatible with receipts emitted by the sb-runtime Rust binary and +verifies with the @veritasacta/verify reference CLI. +""" + +from __future__ import annotations + +import base64 +import hashlib +import json +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, Mapping, Optional + +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, +) + + +def _canonicalize(obj: Any) -> bytes: + """Produce a JCS-conformant canonical byte string. + + Conforms to RFC 8785 for the payload shapes used by Veritas Acta + receipts (string keys, string/int/float/bool/null values, nested + objects and arrays). ASCII-only key enforcement per AIP-0001 is + applied at the object level. + """ + _assert_ascii_keys(obj) + return json.dumps( + obj, + sort_keys=True, + ensure_ascii=True, + separators=(",", ":"), + allow_nan=False, + ).encode("utf-8") + + +def _assert_ascii_keys(obj: Any, path: str = "$") -> None: + if isinstance(obj, Mapping): + for key in obj.keys(): + if not isinstance(key, str): + raise ValueError(f"Non-string key at {path}: {key!r}") + try: + key.encode("ascii") + except UnicodeEncodeError as exc: + raise ValueError( + f"Non-ASCII key at {path}.{key!r} violates AIP-0001" + ) from exc + _assert_ascii_keys(obj[key], f"{path}.{key}") + elif isinstance(obj, list): + for i, item in enumerate(obj): + _assert_ascii_keys(item, f"{path}[{i}]") + + +def _b64url(data: bytes) -> str: + return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii") + + +def _b64url_decode(s: str) -> bytes: + pad = 4 - (len(s) % 4) + if pad != 4: + s = s + ("=" * pad) + return base64.urlsafe_b64decode(s) + + +def _jwk_thumbprint(public_key: Ed25519PublicKey) -> str: + raw = public_key.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + jwk = {"crv": "Ed25519", "kty": "OKP", "x": _b64url(raw)} + digest = hashlib.sha256(_canonicalize(jwk)).digest() + return _b64url(digest) + + +@dataclass +class Signer: + """Thin wrapper around an Ed25519 private key. + + Use ``Signer.generate()`` for ephemeral keys in tests or + ``Signer.from_pem(pem_bytes)`` to load an operator key. + """ + + private_key: Ed25519PrivateKey + kid: str + + @classmethod + def generate(cls, kid: Optional[str] = None) -> "Signer": + pk = Ed25519PrivateKey.generate() + resolved_kid = kid or _jwk_thumbprint(pk.public_key()) + return cls(private_key=pk, kid=resolved_kid) + + @classmethod + def from_pem(cls, pem: bytes, kid: Optional[str] = None) -> "Signer": + pk = serialization.load_pem_private_key(pem, password=None) + if not isinstance(pk, Ed25519PrivateKey): + raise ValueError("PEM must contain an Ed25519 private key") + resolved_kid = kid or _jwk_thumbprint(pk.public_key()) + return cls(private_key=pk, kid=resolved_kid) + + def public_pem(self) -> bytes: + return self.private_key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + + def private_pem(self) -> bytes: + return self.private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + + +def sign_receipt( + payload: Mapping[str, Any], + signer: Signer, + previous_receipt_hash: Optional[str] = None, +) -> dict: + """Sign a receipt payload and return the full signed envelope. + + The envelope matches draft-farley-acta-signed-receipts section 2: + ``{payload: {...}, signature: {alg, kid, sig}}``. If + ``previous_receipt_hash`` is provided it is inserted into the + payload before signing, establishing chain linkage. + """ + final_payload = dict(payload) + final_payload.setdefault( + "issued_at", datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") + ) + if previous_receipt_hash is not None: + final_payload["previousReceiptHash"] = previous_receipt_hash + + canonical = _canonicalize(final_payload) + signature = signer.private_key.sign(canonical) + + return { + "payload": final_payload, + "signature": { + "alg": "EdDSA", + "kid": signer.kid, + "sig": _b64url(signature), + }, + } + + +def verify_receipt(envelope: Mapping[str, Any], public_key: Ed25519PublicKey) -> bool: + """Verify a signed receipt envelope against the provided public key. + + Returns True on success; False on any signature or structural + failure. Raises ValueError for malformed envelopes (missing fields). + """ + if not isinstance(envelope, Mapping): + raise ValueError("Envelope must be a mapping") + payload = envelope.get("payload") + signature = envelope.get("signature") + if payload is None or signature is None: + raise ValueError("Envelope must contain payload and signature") + alg = signature.get("alg") + if alg != "EdDSA": + return False + sig_b64 = signature.get("sig") + if not isinstance(sig_b64, str): + return False + + canonical = _canonicalize(payload) + try: + public_key.verify(_b64url_decode(sig_b64), canonical) + except InvalidSignature: + return False + return True + + +def receipt_hash(envelope: Mapping[str, Any]) -> str: + """Compute the chain-linkage hash of a signed receipt. + + Matches the definition used by @veritasacta/verify: SHA-256 of the + canonical form of the full envelope, base64url-encoded. + """ + digest = hashlib.sha256(_canonicalize(envelope)).digest() + return _b64url(digest) diff --git a/packages/agentmesh-integrations/sb-runtime-skill/sb_runtime_agentmesh/skill.py b/packages/agentmesh-integrations/sb-runtime-skill/sb_runtime_agentmesh/skill.py new file mode 100644 index 000000000..f6f19691b --- /dev/null +++ b/packages/agentmesh-integrations/sb-runtime-skill/sb_runtime_agentmesh/skill.py @@ -0,0 +1,314 @@ +# Copyright (c) 2026 Tom Farley (ScopeBlind). +# Licensed under the MIT License. +"""Governance skill for sb-runtime: policy evaluation + Ed25519-signed receipts.""" + +from __future__ import annotations + +import hashlib +import re +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from pathlib import Path +from typing import Any, Optional + +import yaml + +from sb_runtime_agentmesh.receipts import Signer, receipt_hash, sign_receipt + + +class SandboxBackend(str, Enum): + """Sandbox layer that wraps the process sb-runtime governs. + + ``none`` + No sandbox wrapping. Only appropriate for test harnesses. + ``sb_runtime_builtin`` + sb-runtime's own Landlock + seccomp (Ring 3). Suitable for + single-binary deployments where sb-runtime owns the entire + security boundary. + ``nono`` + A nono capability set wraps the agent process, and sb-runtime + runs in Ring 2 (policy + receipts only). Recommended for Linux + operators who treat the sandbox layer as separate from the + receipts layer. See docs/integrations/sb-runtime.md section + "Composing sb-runtime with nono". + ``openshell`` + OpenShell container wraps the agent process; sb-runtime runs + in Ring 2. Recommended for container-based deployments. + """ + + NONE = "none" + SB_RUNTIME_BUILTIN = "sb_runtime_builtin" + NONO = "nono" + OPENSHELL = "openshell" + + +@dataclass +class PolicyDecision: + allowed: bool + action: str + reason: str + policy_name: Optional[str] = None + policy_digest: Optional[str] = None + trust_score: float = 0.0 + ring: int = 2 + sandbox_backend: SandboxBackend = SandboxBackend.NONE + receipt: Optional[dict] = None + + +@dataclass +class _PolicyRule: + name: str + field: str + operator: str + value: Any + action: str + priority: int = 0 + message: str = "" + + +@dataclass +class _PolicyDigestBundle: + digest: str = "" + ruleset: list[dict] = field(default_factory=list) + + +class GovernanceSkill: + """Policy + receipts skill that mirrors the OpenShell skill contract. + + Accepts the same policy YAML shape as ``openshell_agentmesh``. + Adds: + + - Ed25519-signed decision receipts (Veritas Acta format) attached + to each ``PolicyDecision``. + - Chain linkage via ``previousReceiptHash`` across successive + decisions. + - Sandbox-backend field recording which layer wrapped the process + (``sb_runtime_builtin`` | ``nono`` | ``openshell`` | ``none``). + """ + + def __init__( + self, + policy_dir: Optional[Path] = None, + trust_threshold: float = 0.5, + signer: Optional[Signer] = None, + sandbox_backend: SandboxBackend = SandboxBackend.SB_RUNTIME_BUILTIN, + ring: int = 3, + issuer_id: Optional[str] = None, + ) -> None: + self._rules: list[_PolicyRule] = [] + self._trust_scores: dict[str, float] = {} + self._audit_log: list[dict] = [] + self._trust_threshold = trust_threshold + self._policy_bundle = _PolicyDigestBundle() + self._signer = signer or Signer.generate() + self._sandbox_backend = SandboxBackend(sandbox_backend) + self._ring = int(ring) + self._issuer_id = issuer_id or f"sb:issuer:{self._signer.kid[:12]}" + self._previous_receipt_hash: Optional[str] = None + if policy_dir: + self.load_policies(policy_dir) + + # ------------------------------------------------------------------ + # Policy loading + # ------------------------------------------------------------------ + + def load_policies(self, policy_dir: Path) -> int: + policy_dir = Path(policy_dir) + if not policy_dir.is_dir(): + raise FileNotFoundError(f"Policy directory not found: {policy_dir}") + self._rules.clear() + ruleset = [] + for yaml_file in sorted(policy_dir.glob("*.yaml")): + with open(yaml_file, encoding="utf-8") as f: + doc = yaml.safe_load(f) + if not doc: + continue + for rd in doc.get("rules", []): + cond = rd.get("condition", {}) + rule = _PolicyRule( + name=rd.get("name", yaml_file.stem), + field=cond.get("field", "action"), + operator=cond.get("operator", "equals"), + value=cond.get("value", ""), + action=rd.get("action", "deny"), + priority=rd.get("priority", 0), + message=rd.get("message", ""), + ) + self._rules.append(rule) + ruleset.append( + { + "name": rule.name, + "field": rule.field, + "operator": rule.operator, + "value": rule.value, + "action": rule.action, + "priority": rule.priority, + } + ) + self._rules.sort(key=lambda r: r.priority, reverse=True) + # Policy digest binds the entire active ruleset into receipts + canonical_ruleset = sorted( + ruleset, + key=lambda r: (-int(r["priority"]), str(r["name"])), + ) + digest_material = repr(canonical_ruleset).encode("utf-8") + self._policy_bundle = _PolicyDigestBundle( + digest="sha256:" + hashlib.sha256(digest_material).hexdigest(), + ruleset=canonical_ruleset, + ) + return len(self._rules) + + # ------------------------------------------------------------------ + # Policy evaluation + receipt signing + # ------------------------------------------------------------------ + + def check_policy( + self, + action: str, + context: Optional[dict] = None, + *, + sign: bool = True, + ) -> PolicyDecision: + context = context or {} + agent_did = context.get("agent_did", "unknown") + trust = self.get_trust_score(agent_did) + + matched: Optional[_PolicyRule] = None + for rule in self._rules: + target = action if rule.field == "action" else context.get(rule.field, "") + if self._match(rule.operator, target, rule.value): + matched = rule + break + + if matched is not None: + allowed = matched.action == "allow" + reason = matched.message or ( + ("Allowed" if allowed else "Denied") + " by rule: " + matched.name + ) + decision_outcome = "allow" if allowed else "deny" + policy_name = matched.name + else: + allowed = False + reason = "No matching rule - default deny" + decision_outcome = "deny" + policy_name = None + + decision = PolicyDecision( + allowed=allowed, + action=action, + reason=reason, + policy_name=policy_name, + policy_digest=self._policy_bundle.digest or None, + trust_score=trust, + ring=self._ring, + sandbox_backend=self._sandbox_backend, + ) + + if sign: + decision.receipt = self._sign_decision( + decision=decision, + agent_did=agent_did, + decision_outcome=decision_outcome, + ) + + self.log_action(action, decision_outcome, agent_did, context) + return decision + + def _sign_decision( + self, + decision: PolicyDecision, + agent_did: str, + decision_outcome: str, + ) -> dict: + payload = { + "type": "sb-runtime:decision", + "agent_id": agent_did, + "action": decision.action, + "decision": decision_outcome, + "ring": decision.ring, + "sandbox_backend": decision.sandbox_backend.value, + "policy_id": decision.policy_name or "default_deny", + "policy_digest": decision.policy_digest or "", + "trust_score": round(decision.trust_score, 6), + "issuer_id": self._issuer_id, + } + envelope = sign_receipt( + payload=payload, + signer=self._signer, + previous_receipt_hash=self._previous_receipt_hash, + ) + self._previous_receipt_hash = receipt_hash(envelope) + return envelope + + # ------------------------------------------------------------------ + # Trust + audit + # ------------------------------------------------------------------ + + def get_trust_score(self, agent_did: str) -> float: + return self._trust_scores.get(agent_did, 1.0) + + def adjust_trust(self, agent_did: str, delta: float) -> float: + current = self.get_trust_score(agent_did) + new_score = max(0.0, min(1.0, current + delta)) + self._trust_scores[agent_did] = new_score + return new_score + + def log_action( + self, + action: str, + decision: str, + agent_did: str = "unknown", + context: Optional[dict] = None, + ) -> dict: + entry = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "action": action, + "decision": decision, + "agent_did": agent_did, + "trust_score": self.get_trust_score(agent_did), + "context": context or {}, + } + self._audit_log.append(entry) + return entry + + def get_audit_log(self, limit: int = 50) -> list[dict]: + return self._audit_log[-limit:] + + # ------------------------------------------------------------------ + # Public accessors + # ------------------------------------------------------------------ + + @property + def signer(self) -> Signer: + return self._signer + + @property + def policy_digest(self) -> str: + return self._policy_bundle.digest + + @property + def sandbox_backend(self) -> SandboxBackend: + return self._sandbox_backend + + @property + def ring(self) -> int: + return self._ring + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + @staticmethod + def _match(operator: str, target: Any, value: Any) -> bool: + if operator == "equals": + return target == value + if operator == "starts_with": + return isinstance(target, str) and target.startswith(str(value)) + if operator == "contains": + return str(value) in str(target) + if operator == "matches": + return isinstance(target, str) and bool(re.search(str(value), target)) + if operator == "in": + return target in (value if isinstance(value, list) else [value]) + return False diff --git a/packages/agentmesh-integrations/sb-runtime-skill/tests/test_skill.py b/packages/agentmesh-integrations/sb-runtime-skill/tests/test_skill.py new file mode 100644 index 000000000..2bf387878 --- /dev/null +++ b/packages/agentmesh-integrations/sb-runtime-skill/tests/test_skill.py @@ -0,0 +1,215 @@ +# Copyright (c) 2026 Tom Farley (ScopeBlind). +# Licensed under the MIT License. +import pytest +import yaml +from cryptography.hazmat.primitives import serialization + +from sb_runtime_agentmesh.receipts import Signer, receipt_hash, verify_receipt +from sb_runtime_agentmesh.skill import GovernanceSkill, SandboxBackend + +SAMPLE = { + "apiVersion": "governance.toolkit/v1", + "rules": [ + { + "name": "allow-read", + "condition": {"field": "action", "operator": "starts_with", "value": "file:read"}, + "action": "allow", + "priority": 90, + }, + { + "name": "allow-shell", + "condition": {"field": "action", "operator": "in", "value": ["shell:ls", "shell:python", "shell:git"]}, + "action": "allow", + "priority": 80, + }, + { + "name": "block-danger", + "condition": {"field": "action", "operator": "matches", "value": "shell:(rm|dd|curl)"}, + "action": "deny", + "priority": 100, + "message": "Blocked", + }, + ], +} + + +@pytest.fixture +def policy_dir(tmp_path): + with open(tmp_path / "p.yaml", "w", encoding="utf-8") as f: + yaml.dump(SAMPLE, f) + return tmp_path + + +class TestPolicyContract: + """Mirrors the openshell-skill test suite so the contract is identical.""" + + def test_allow_read(self, policy_dir): + assert GovernanceSkill(policy_dir=policy_dir).check_policy("file:read:/workspace/main.py").allowed + + def test_allow_shell(self, policy_dir): + assert GovernanceSkill(policy_dir=policy_dir).check_policy("shell:python").allowed + + def test_deny(self, policy_dir): + decision = GovernanceSkill(policy_dir=policy_dir).check_policy("shell:rm -rf /") + assert not decision.allowed + + def test_default_deny(self, policy_dir): + assert not GovernanceSkill(policy_dir=policy_dir).check_policy("unknown").allowed + + def test_trust(self): + skill = GovernanceSkill() + assert skill.get_trust_score("x") == 1.0 + skill.adjust_trust("x", -0.3) + assert skill.get_trust_score("x") == pytest.approx(0.7) + + def test_audit(self, policy_dir): + skill = GovernanceSkill(policy_dir=policy_dir) + skill.check_policy("file:read:/t") + skill.check_policy("shell:rm /") + log = skill.get_audit_log() + assert len(log) == 2 + assert log[0]["decision"] == "allow" + + def test_load(self, policy_dir): + assert GovernanceSkill().load_policies(policy_dir) == 3 + + def test_missing(self): + with pytest.raises(FileNotFoundError): + GovernanceSkill(policy_dir="/nope") # type: ignore[arg-type] + + def test_priority(self, policy_dir): + decision = GovernanceSkill(policy_dir=policy_dir).check_policy("shell:rm") + assert not decision.allowed + assert decision.policy_name == "block-danger" + + +class TestReceipts: + """Receipt signing is the material addition over openshell-skill.""" + + def test_receipt_attached_on_allow(self, policy_dir): + decision = GovernanceSkill(policy_dir=policy_dir).check_policy("shell:python") + assert decision.receipt is not None + payload = decision.receipt["payload"] + assert payload["decision"] == "allow" + assert payload["action"] == "shell:python" + assert payload["type"] == "sb-runtime:decision" + assert "policy_digest" in payload + assert payload["policy_digest"].startswith("sha256:") + + def test_receipt_attached_on_deny(self, policy_dir): + decision = GovernanceSkill(policy_dir=policy_dir).check_policy("shell:rm /") + assert decision.receipt is not None + assert decision.receipt["payload"]["decision"] == "deny" + # The denial is the receipt - deny decisions MUST produce a receipt too, + # otherwise there is no tamper-evident proof that the block occurred. + + def test_receipt_verifies_with_public_key(self, policy_dir): + skill = GovernanceSkill(policy_dir=policy_dir) + decision = skill.check_policy("file:read:/x") + pub_pem = skill.signer.public_pem() + pub = serialization.load_pem_public_key(pub_pem) + assert verify_receipt(decision.receipt, pub) + + def test_receipt_tampering_fails_verification(self, policy_dir): + skill = GovernanceSkill(policy_dir=policy_dir) + decision = skill.check_policy("file:read:/x") + pub = serialization.load_pem_public_key(skill.signer.public_pem()) + # Flip a field + tampered = { + "payload": {**decision.receipt["payload"], "decision": "allow_forged"}, + "signature": decision.receipt["signature"], + } + assert not verify_receipt(tampered, pub) + + def test_chain_linkage(self, policy_dir): + """Successive decisions link via previousReceiptHash.""" + skill = GovernanceSkill(policy_dir=policy_dir) + first = skill.check_policy("file:read:/a") + second = skill.check_policy("file:read:/b") + assert "previousReceiptHash" not in first.receipt["payload"] + assert second.receipt["payload"]["previousReceiptHash"] == receipt_hash(first.receipt) + + def test_no_sign_flag_skips_receipt(self, policy_dir): + decision = GovernanceSkill(policy_dir=policy_dir).check_policy("file:read:/x", sign=False) + assert decision.receipt is None + + +class TestSandboxBackend: + """Receipt payload records which sandbox layer wrapped the process.""" + + def test_default_is_sb_runtime_builtin(self, policy_dir): + decision = GovernanceSkill(policy_dir=policy_dir).check_policy("file:read:/x") + assert decision.sandbox_backend == SandboxBackend.SB_RUNTIME_BUILTIN + assert decision.receipt["payload"]["sandbox_backend"] == "sb_runtime_builtin" + + def test_nono_backend_sets_ring_2(self, policy_dir): + skill = GovernanceSkill( + policy_dir=policy_dir, + sandbox_backend=SandboxBackend.NONO, + ring=2, + ) + decision = skill.check_policy("file:read:/x") + assert decision.sandbox_backend == SandboxBackend.NONO + assert decision.ring == 2 + assert decision.receipt["payload"]["sandbox_backend"] == "nono" + assert decision.receipt["payload"]["ring"] == 2 + + def test_openshell_backend(self, policy_dir): + skill = GovernanceSkill( + policy_dir=policy_dir, + sandbox_backend=SandboxBackend.OPENSHELL, + ring=2, + ) + decision = skill.check_policy("file:read:/x") + assert decision.receipt["payload"]["sandbox_backend"] == "openshell" + + def test_sandbox_backend_covered_by_signature(self, policy_dir): + """Forging the sandbox backend after signing breaks verification.""" + skill = GovernanceSkill(policy_dir=policy_dir, sandbox_backend=SandboxBackend.NONO, ring=2) + decision = skill.check_policy("file:read:/x") + pub = serialization.load_pem_public_key(skill.signer.public_pem()) + forged = { + "payload": {**decision.receipt["payload"], "sandbox_backend": "sb_runtime_builtin"}, + "signature": decision.receipt["signature"], + } + assert not verify_receipt(forged, pub) + + +class TestPolicyDigest: + def test_policy_digest_deterministic(self, tmp_path): + yaml.safe_dump(SAMPLE, (tmp_path / "p.yaml").open("w")) + one = GovernanceSkill(policy_dir=tmp_path).policy_digest + two = GovernanceSkill(policy_dir=tmp_path).policy_digest + assert one == two + assert one.startswith("sha256:") + + def test_policy_digest_changes_when_rules_change(self, tmp_path): + yaml.safe_dump(SAMPLE, (tmp_path / "p.yaml").open("w")) + before = GovernanceSkill(policy_dir=tmp_path).policy_digest + + modified = { + "apiVersion": "governance.toolkit/v1", + "rules": SAMPLE["rules"] + [ + { + "name": "extra-rule", + "condition": {"field": "action", "operator": "equals", "value": "file:write"}, + "action": "deny", + "priority": 50, + } + ], + } + (tmp_path / "p.yaml").write_text(yaml.safe_dump(modified)) + after = GovernanceSkill(policy_dir=tmp_path).policy_digest + assert before != after + + +class TestSignerKeyLoading: + def test_generate_produces_deterministic_kid(self, tmp_path): + signer = Signer.generate() + pem = signer.private_pem() + reloaded = Signer.from_pem(pem) + assert signer.kid == reloaded.kid + + def test_explicit_kid_overrides_thumbprint(self): + signer = Signer.generate(kid="sb:issuer:test-fixture") + assert signer.kid == "sb:issuer:test-fixture"