Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 65 additions & 9 deletions hermes_cli/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,12 +985,27 @@ def _load_auth_store(auth_file: Optional[Path] = None) -> Dict[str, Any]:
def _save_auth_store(auth_store: Dict[str, Any]) -> Path:
auth_file = _auth_file_path()
auth_file.parent.mkdir(parents=True, exist_ok=True)
# Tighten parent dir to 0o700 so siblings can't traverse to creds.
# No-op on Windows (POSIX mode bits not enforced); ignore failures.
try:
os.chmod(auth_file.parent, 0o700)
except OSError:
pass
auth_store["version"] = AUTH_STORE_VERSION
auth_store["updated_at"] = datetime.now(timezone.utc).isoformat()
payload = json.dumps(auth_store, indent=2) + "\n"
tmp_path = auth_file.with_name(f"{auth_file.name}.tmp.{os.getpid()}.{uuid.uuid4().hex}")
try:
with tmp_path.open("w", encoding="utf-8") as handle:
# Create with 0o600 atomically via os.open(O_EXCL) + fdopen to close
# the TOCTOU window where default umask (often 0o644) briefly exposed
# OAuth tokens to other local users between open() and chmod().
# Mirrors agent/google_oauth.py (#19673) and tools/mcp_oauth.py (#21148).
fd = os.open(
str(tmp_path),
os.O_WRONLY | os.O_CREAT | os.O_EXCL,
stat.S_IRUSR | stat.S_IWUSR,
)
with os.fdopen(fd, "w", encoding="utf-8") as handle:
handle.write(payload)
handle.flush()
os.fsync(handle.fileno())
Expand Down Expand Up @@ -1554,10 +1569,33 @@ def _read_qwen_cli_tokens() -> Dict[str, Any]:
def _save_qwen_cli_tokens(tokens: Dict[str, Any]) -> Path:
auth_path = _qwen_cli_auth_path()
auth_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = auth_path.with_suffix(".tmp")
tmp_path.write_text(json.dumps(tokens, indent=2, sort_keys=True) + "\n", encoding="utf-8")
os.chmod(tmp_path, stat.S_IRUSR | stat.S_IWUSR)
tmp_path.replace(auth_path)
try:
os.chmod(auth_path.parent, 0o700)
except OSError:
pass
# Per-process random temp suffix avoids collisions between concurrent
# writers and stale leftovers from a crashed prior write.
tmp_path = auth_path.with_name(f"{auth_path.name}.tmp.{os.getpid()}.{uuid.uuid4().hex}")
# Create with 0o600 atomically via os.open(O_EXCL) — closes the TOCTOU
# window where write_text() + post-write chmod briefly exposed tokens
# at process umask (typically 0o644). See #19673, #21148.
fd = os.open(
str(tmp_path),
os.O_WRONLY | os.O_CREAT | os.O_EXCL,
stat.S_IRUSR | stat.S_IWUSR,
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as fh:
fh.write(json.dumps(tokens, indent=2, sort_keys=True) + "\n")
fh.flush()
os.fsync(fh.fileno())
atomic_replace(tmp_path, auth_path)
finally:
try:
if tmp_path.exists():
tmp_path.unlink()
except OSError:
pass
return auth_path


Expand Down Expand Up @@ -2938,13 +2976,31 @@ def _write_shared_nous_state(state: Dict[str, Any]) -> None:
with _nous_shared_store_lock():
path = _nous_shared_store_path()
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(json.dumps(shared, indent=2, sort_keys=True))
try:
os.chmod(tmp, 0o600)
os.chmod(path.parent, 0o700)
except OSError:
pass
os.replace(tmp, path)
tmp = path.with_name(f"{path.name}.tmp.{os.getpid()}.{uuid.uuid4().hex}")
# Create with 0o600 atomically via os.open(O_EXCL) — closes the TOCTOU
# window where write_text() + post-write chmod briefly exposed Nous
# refresh_token at process umask. See #19673, #21148.
fd = os.open(
str(tmp),
os.O_WRONLY | os.O_CREAT | os.O_EXCL,
stat.S_IRUSR | stat.S_IWUSR,
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as fh:
fh.write(json.dumps(shared, indent=2, sort_keys=True))
fh.flush()
os.fsync(fh.fileno())
os.replace(tmp, path)
finally:
try:
if tmp.exists():
tmp.unlink()
except OSError:
pass
_oauth_trace(
"nous_shared_store_written",
path=str(path),
Expand Down
198 changes: 198 additions & 0 deletions tests/hermes_cli/test_auth_toctou_file_modes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
"""Regression tests for TOCTOU-safe credential file writers in ``hermes_cli.auth``.

Background
==========
The three writers below used to create a temp file via ``Path.write_text`` /
``Path.open('w')`` and only ``chmod``'d it to ``0o600`` afterward. Between
create and chmod the file existed at the process umask (typically ``0o644``),
briefly exposing OAuth tokens to other local users on multi-user hosts. The
fix switches them to ``os.open(O_EXCL, mode=0o600)`` + ``os.fdopen`` +
``fsync`` so the file is atomic at ``0o600`` on creation. Mirrors the fixes
shipped for ``agent/google_oauth.py`` (#19673) and ``tools/mcp_oauth.py``
(#21148).

These tests stay green only while the token file and its parent directory
end up at ``0o600`` / ``0o700`` after every write. POSIX-only — the mode-bit
enforcement does not exist on Windows.
"""

from __future__ import annotations

import json
import os
import stat
import sys
from unittest.mock import patch

import pytest


pytestmark = pytest.mark.skipif(
sys.platform.startswith("win"),
reason="POSIX mode bits not enforced on Windows",
)


# ---------------------------------------------------------------------------
# _save_auth_store (~/.hermes/auth.json — every native OAuth provider)
# ---------------------------------------------------------------------------


def test_save_auth_store_writes_0o600_with_0o700_parent(tmp_path, monkeypatch):
"""``_save_auth_store`` must land ``auth.json`` at 0o600 and parent at 0o700."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
old_umask = os.umask(0o022) # make the race observable if it regresses
try:
from hermes_cli import auth as auth_mod

auth_store = {
"version": auth_mod.AUTH_STORE_VERSION,
"providers": {"openai-codex": {"tokens": {"access_token": "secret-x"}}},
"active_provider": "openai-codex",
}
auth_path = auth_mod._save_auth_store(auth_store)
finally:
os.umask(old_umask)

mode = stat.S_IMODE(auth_path.stat().st_mode)
parent_mode = stat.S_IMODE(auth_path.parent.stat().st_mode)

assert mode == 0o600, (
f"auth.json mode 0o{mode:o} != 0o600 — TOCTOU race regressed"
)
assert parent_mode == 0o700, (
f"auth.json parent dir mode 0o{parent_mode:o} != 0o700 — siblings can traverse"
)

# Content survived the rewrite
data = json.loads(auth_path.read_text())
assert data["providers"]["openai-codex"]["tokens"]["access_token"] == "secret-x"


# ---------------------------------------------------------------------------
# _save_qwen_cli_tokens (Qwen CLI OAuth tokens)
# ---------------------------------------------------------------------------


def test_save_qwen_cli_tokens_writes_0o600_with_0o700_parent(tmp_path, monkeypatch):
"""``_save_qwen_cli_tokens`` must land the token file at 0o600 and parent at 0o700."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
# The Qwen CLI auth path lives under $HOME/.qwen by default — isolate it.
monkeypatch.setenv("HOME", str(tmp_path))
old_umask = os.umask(0o022)
try:
from hermes_cli import auth as auth_mod

tokens = {
"access_token": "qwen-secret",
"refresh_token": "qwen-refresh",
"token_type": "Bearer",
"expiry_date": 123,
}
auth_path = auth_mod._save_qwen_cli_tokens(tokens)
finally:
os.umask(old_umask)

mode = stat.S_IMODE(auth_path.stat().st_mode)
parent_mode = stat.S_IMODE(auth_path.parent.stat().st_mode)

assert mode == 0o600, (
f"Qwen token file mode 0o{mode:o} != 0o600 — TOCTOU race regressed"
)
assert parent_mode == 0o700, (
f"Qwen token parent dir mode 0o{parent_mode:o} != 0o700"
)

data = json.loads(auth_path.read_text())
assert data["access_token"] == "qwen-secret"


# ---------------------------------------------------------------------------
# Nous shared-credential store write (inside _write_shared_nous_state)
# ---------------------------------------------------------------------------


def test_shared_nous_store_writes_0o600_with_0o700_parent(tmp_path, monkeypatch):
"""The Nous shared-credential store must land at 0o600 / parent 0o700."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
# _nous_shared_store_path() refuses to touch the real shared store during
# pytest runs; redirect it into tmp_path explicitly.
monkeypatch.setenv("HERMES_SHARED_AUTH_DIR", str(tmp_path / "shared"))
old_umask = os.umask(0o022)
try:
from hermes_cli import auth as auth_mod

state = {
"access_token": "nous-access-xxx",
"refresh_token": "nous-refresh-xxx",
"token_type": "Bearer",
"scope": "openid profile",
"client_id": "test-client",
"obtained_at": "2026-01-01T00:00:00Z",
"expires_at": "2026-01-01T01:00:00Z",
}
auth_mod._write_shared_nous_state(state)
path = auth_mod._nous_shared_store_path()
finally:
os.umask(old_umask)

assert path.exists(), "shared Nous store was not written"
mode = stat.S_IMODE(path.stat().st_mode)
parent_mode = stat.S_IMODE(path.parent.stat().st_mode)

assert mode == 0o600, (
f"Nous shared store mode 0o{mode:o} != 0o600 — TOCTOU race regressed"
)
assert parent_mode == 0o700, (
f"Nous shared store parent dir mode 0o{parent_mode:o} != 0o700"
)

data = json.loads(path.read_text())
assert data["refresh_token"] == "nous-refresh-xxx"


# ---------------------------------------------------------------------------
# Atomicity: verify ``os.open`` is called with an explicit 0o600 mode.
# ---------------------------------------------------------------------------


def test_save_auth_store_uses_os_open_with_0o600_mode(tmp_path, monkeypatch):
"""Regression: the writer must call ``os.open`` with an explicit restricted
mode so the file is created at 0o600 atomically — closing the TOCTOU
window the previous ``Path.open('w')`` left open (fd inherited process
umask and was briefly 0o644 before post-write chmod)."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))

observed_opens: list[tuple[str, int, int]] = []
real_os_open = os.open

def spying_os_open(path, flags, mode=0o777, *args, **kwargs):
observed_opens.append((str(path), flags, mode))
return real_os_open(path, flags, mode, *args, **kwargs)

with patch.object(os, "open", spying_os_open):
from hermes_cli import auth as auth_mod

auth_mod._save_auth_store(
{"version": auth_mod.AUTH_STORE_VERSION, "providers": {}}
)

auth_tmp_opens = [
(p, fl, m) for (p, fl, m) in observed_opens if "auth.json.tmp" in p
]
assert auth_tmp_opens, (
f"os.open was never called for the auth.json temp file; "
f"observed={observed_opens!r}"
)
for path, flags, mode in auth_tmp_opens:
assert flags & os.O_CREAT, f"auth.json temp open missing O_CREAT: path={path}"
assert flags & os.O_EXCL, (
f"auth.json temp open missing O_EXCL — TOCTOU-safe pattern regressed: "
f"path={path}, flags={flags}"
)
# Must be exactly S_IRUSR | S_IWUSR (0o600) — no group/other bits.
expected = stat.S_IRUSR | stat.S_IWUSR
assert mode == expected, (
f"auth.json temp open mode 0o{mode:o} != 0o{expected:o} — "
f"umask would apply and potentially expose tokens"
)
Loading