feat(state): track per-skill cron invocations + EMA query for labyrinth#19507

Brecht-H wants to merge 2 commits into
Conversation
Adds a `skill_invocations` table to state.db and writes one row per skill listed on a cron job at completion (success or failure paths). Tokens, cost and duration are sourced from the existing session row. Includes a `skill_stats_daily` view that buckets invocations by day and (skill_name, model, provider), and a new `SessionDB.query_skill_ema()` method that applies exponential weighting (default alpha=0.3, ~5d half-life) so the dashboard can A/B local Qwen against external models once analyzer crons start firing.

SCHEMA_VERSION 11 → 12. Pure additive: existing rows untouched, new table created on next connection-open via the existing executescript() path. No Alembic.

Known v1 limits: slash-command and ad-hoc skill_view invocations are NOT tracked, and multi-skill crons over-account (each skill in `job["skills"]` gets the full session cost). Both are acceptable for the analyzer-cron use case (1 skill per cron) and can be iterated on later.

Constraint: no new external dependencies — uses sqlite3 + stdlib only.

Rejected: per-skill cost split (would require model attribution inside a single agent run, which Hermes does not currently track). Reason: defer to v2 once NousResearch#87 surfaces real-world skew.

Confidence: high (smoke-tested end-to-end on tmp DB)
Scope-risk: narrow (additive table, no existing-row touchpoints)
Not-tested: live cron fire (validated by Step 3 — Pass 2 v2 handover)
Machine: orion-terminal

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Superseded by a clean rebase onto upstream main — see new PR (link below). The original branch was based on local main, which carries an unrelated TUI-truncation commit also in PR #18548; the clean rebase keeps this PR scoped to the 2 files actually changed by Step 1.
Pull request overview
Adds new persistence and query primitives in state.db to support per-skill cron observability (including an EMA query intended for dashboards), and updates the cron runner to record one invocation row per configured skill. Also introduces a TUI gateway safety valve that truncates oversized tool outputs before assembling the LLM payload.
Changes:
- Add `skill_invocations` table + `skill_stats_daily` view, bump `SCHEMA_VERSION` to 12, and introduce `SessionDB.record_skill_invocation()` / `SessionDB.query_skill_ema()`.
- Update `cron/scheduler.py` to persist per-skill invocation rows for each cron run (success/failure).
- Add gateway-side per-turn truncation of tool outputs, with focused tests.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| `hermes_state.py` | Adds schema objects and new write/read APIs for skill invocation tracking and EMA aggregation. |
| `cron/scheduler.py` | Hooks cron completion/failure paths to record skill invocation rows sourced from the session row usage/cost fields. |
| `tui_gateway/server.py` | Truncates oversized tool outputs in conversation history before agent payload assembly. |
| `tests/tui_gateway/test_tool_output_truncation.py` | Adds unit tests for the new truncation helper. |
```python
    messages: list[dict],
    max_chars: int = _MAX_TOOL_CHARS,
) -> list[dict]:
    """Return a shallow copy of *messages* with oversized tool texts truncated.

    Only tool messages, or flat role-less messages that look like normalized
    tool output, are rewritten. A trailing ``... [truncated, N chars total]``
    banner is appended so the model still knows data was dropped.
    """
    out: list[dict] = []
```
```python
    # Safety valve: truncate oversized tool outputs before the agent
    # assembles the LLM payload, preventing mid-turn context-window
    # stalls when parallel tools dump large stdout.
    safe_history = _truncate_tool_messages(history)

    result = agent.run_conversation(
        run_message,
        conversation_history=safe_history,
```
```sql
    AVG(duration_seconds) AS avg_duration_s,
    SUM(estimated_cost_usd) AS total_cost_usd,
    AVG(estimated_cost_usd) AS avg_cost_usd,
    SUM(input_tokens) AS total_input_tokens,
    SUM(output_tokens) AS total_output_tokens,
    AVG(quality_score) AS avg_quality_score,
```
```python
DEFAULT_DB_PATH = get_hermes_home() / "state.db"

SCHEMA_VERSION = 12  # bumped from 11
```

```sql
CREATE TABLE IF NOT EXISTS skill_invocations (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT REFERENCES sessions(id),
```
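As a sanity check, here is a hypothetical reconstruction of the full DDL run through the same executescript() path; the column list follows the INSERT statement in `record_skill_invocation()`, but the exact column types and the stub `sessions` table are assumptions:

```python
import sqlite3

# Hypothetical DDL sketch; column names mirror record_skill_invocation(),
# types and the stub sessions table are assumptions.
DDL = """
CREATE TABLE IF NOT EXISTS sessions (id TEXT PRIMARY KEY);
CREATE TABLE IF NOT EXISTS skill_invocations (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT REFERENCES sessions(id),
    cron_id TEXT,
    skill_name TEXT NOT NULL,
    skill_version TEXT,
    invoked_at REAL NOT NULL,
    completed_at REAL,
    model TEXT,
    provider TEXT,
    duration_seconds REAL,
    input_tokens INTEGER DEFAULT 0,
    output_tokens INTEGER DEFAULT 0,
    cache_read_tokens INTEGER DEFAULT 0,
    cache_write_tokens INTEGER DEFAULT 0,
    estimated_cost_usd REAL,
    success INTEGER,
    end_reason TEXT,
    quality_score REAL
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(DDL)  # additive: IF NOT EXISTS is a no-op on re-open
conn.execute(
    "INSERT INTO skill_invocations (skill_name, invoked_at, success) VALUES (?, ?, ?)",
    ("analyze:example", 1714700000.0, 1),
)
row = conn.execute("SELECT skill_name, success FROM skill_invocations").fetchone()
```

Because every statement uses `IF NOT EXISTS`, re-running the script on an existing database leaves prior rows untouched, which is the migration property the PR relies on.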
```python
def record_skill_invocation(
    self,
    *,
    skill_name: str,
    invoked_at: float,
    session_id: Optional[str] = None,
    cron_id: Optional[str] = None,
    skill_version: Optional[str] = None,
    completed_at: Optional[float] = None,
    model: Optional[str] = None,
    provider: Optional[str] = None,
    duration_seconds: Optional[float] = None,
    input_tokens: int = 0,
    output_tokens: int = 0,
    cache_read_tokens: int = 0,
    cache_write_tokens: int = 0,
    estimated_cost_usd: Optional[float] = None,
    success: Optional[bool] = None,
    end_reason: Optional[str] = None,
    quality_score: Optional[float] = None,
) -> int:
    """Insert one ``skill_invocations`` row and return the new id.

    One row per (skill_name, cron run) — see ``cron/scheduler.py`` for the
    cron-side caller. Slash-command and ad-hoc skill_view invocations are
    not tracked here in v1.

    ``success`` is stored as 0/1 to match the integer-boolean convention
    used elsewhere in the schema; ``None`` is left as NULL.
    """
    sql = """INSERT INTO skill_invocations (
        session_id, cron_id, skill_name, skill_version,
        invoked_at, completed_at, model, provider,
        duration_seconds,
        input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
        estimated_cost_usd, success, end_reason, quality_score
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
    success_int: Optional[int]
    if success is None:
        success_int = None
    else:
        success_int = 1 if success else 0
    params = (
        session_id, cron_id, skill_name, skill_version,
        invoked_at, completed_at, model, provider,
        duration_seconds,
        input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
        estimated_cost_usd, success_int, end_reason, quality_score,
    )

    new_id_holder: Dict[str, int] = {}

    def _do(conn):
        cur = conn.execute(sql, params)
        new_id_holder["id"] = int(cur.lastrowid or 0)

    self._execute_write(_do)
    return new_id_holder.get("id", 0)

def query_skill_ema(
    self,
    window_days: int = 14,
    alpha: float = 0.3,
) -> List[Dict[str, Any]]:
    """Per-(skill_name, model) exponentially-weighted moving averages.

    Reads ``skill_stats_daily`` for the last *window_days* and applies
    exponential weighting where the most recent day has weight ``alpha``,
    the next has ``alpha * (1-alpha)``, and so on. The default
    ``alpha=0.3`` gives roughly a 5-day half-life — recent behaviour
    dominates without dropping older data on a hard window.

    Returns a list of dicts (one per (skill_name, model) bucket) with
    keys::

        skill_name, model, provider,
        sample_count, success_count, failure_count,
        ema_success_rate, ema_duration_s, ema_cost_per_call,
        days_with_data, last_invoked_at

    Sorted by ``last_invoked_at`` descending so the dashboard surfaces
    currently-active skills first. Buckets with zero rows in the window
    are silently omitted.
    """
    if window_days <= 0 or alpha <= 0 or alpha >= 1:
        return []
    cutoff_ts = time.time() - window_days * 86400.0
    sql = (
        "SELECT skill_name, model, provider, day, "
        "invocation_count, success_count, failure_count, "
        "avg_duration_s, avg_cost_usd, avg_quality_score, last_invoked_at "
        "FROM skill_stats_daily "
        "WHERE last_invoked_at >= ? "
        "ORDER BY skill_name, model, day"
    )
    with self._lock:
        cur = self._conn.execute(sql, (cutoff_ts,))
        rows = [dict(r) for r in cur.fetchall()]

    groups: Dict[Tuple[str, Optional[str]], List[Dict[str, Any]]] = {}
    for r in rows:
        key = (r["skill_name"], r["model"])
        groups.setdefault(key, []).append(r)

    out: List[Dict[str, Any]] = []
    for (skill_name, model), rs in groups.items():
        rs.sort(key=lambda r: r["day"])
        n = len(rs)
        raw_weights = [alpha * (1 - alpha) ** (n - 1 - i) for i in range(n)]
        wsum = sum(raw_weights)
        weights = (
            [w / wsum for w in raw_weights]
            if wsum > 0
            else [1.0 / n] * n
        )

        sample_count = sum(int(r["invocation_count"] or 0) for r in rs)
        success_count = sum(int(r["success_count"] or 0) for r in rs)
        failure_count = sum(int(r["failure_count"] or 0) for r in rs)

        ema_success_rate = sum(
            w * (
                int(r["success_count"] or 0) / max(1, int(r["invocation_count"] or 0))
            )
            for w, r in zip(weights, rs)
        )
        ema_duration = sum(
            w * float(r["avg_duration_s"] or 0.0)
            for w, r in zip(weights, rs)
        )
        ema_cost = sum(
            w * float(r["avg_cost_usd"] or 0.0)
            for w, r in zip(weights, rs)
        )
        last_invoked = max(float(r["last_invoked_at"] or 0.0) for r in rs)
        provider_latest = rs[-1].get("provider")

        out.append({
            "skill_name": skill_name,
            "model": model,
            "provider": provider_latest,
            "sample_count": sample_count,
            "success_count": success_count,
            "failure_count": failure_count,
            "ema_success_rate": ema_success_rate,
            "ema_duration_s": ema_duration,
            "ema_cost_per_call": ema_cost,
            "days_with_data": n,
            "last_invoked_at": last_invoked,
        })

    out.sort(key=lambda x: x["last_invoked_at"], reverse=True)
    return out
```
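The normalized weighting used in `query_skill_ema()` can be sanity-checked in isolation; the daily success rates below are illustrative, not from the PR's smoke test:

```python
def ema_weights(n: int, alpha: float = 0.3) -> list[float]:
    """Normalized exponential weights: index 0 is the oldest day,
    index n-1 (the most recent) carries the largest weight."""
    raw = [alpha * (1 - alpha) ** (n - 1 - i) for i in range(n)]
    total = sum(raw)
    # Same fallback as the method: uniform weights if the sum degenerates.
    return [w / total for w in raw] if total > 0 else [1.0 / n] * n


# Five days of per-day success rates, oldest first (illustrative values):
rates = [1.0, 1.0, 1.0, 0.0, 1.0]
weights = ema_weights(len(rates))
ema_success_rate = sum(w * r for w, r in zip(weights, rates))
```

With `alpha=0.3`, the single failing day four days in (weight roughly 0.25 after normalization) pulls the EMA to about 0.75, whereas a plain average over the window would give 0.80; a failure on the most recent day would pull it down harder.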
```python
def test_env_override_changes_default(self) -> None:
    old = os.environ.get("HERMES_TUI_MAX_TOOL_CHARS")
    try:
        os.environ["HERMES_TUI_MAX_TOOL_CHARS"] = "1234"
        # Force re-import of the module-level constant by re-reading
        max_chars = int(os.environ.get("HERMES_TUI_MAX_TOOL_CHARS", "8000"))
        big = "z" * 5000
        msgs = [{"role": "tool", "content": big}]
        result = server._truncate_tool_messages(msgs, max_chars=max_chars)
        assert len(result[0]["content"]) <= 1300
```
```python
# Default threshold is tuned for qwen3.6-27b (~32k context). Each tool
# message is capped at 8k chars (≈2k tokens), which leaves room for system
# prompt + prior history + response headroom.
_MAX_TOOL_CHARS = int(os.environ.get("HERMES_TUI_MAX_TOOL_CHARS", "8000"))
```
Summary
Adds a `skill_invocations` table to `state.db` (SQLite), wires the cron runner to write one row per skill listed on a cron job at completion (success and failure paths), and exposes a `SessionDB.query_skill_ema()` method so observability dashboards can A/B local Qwen against external models.

This is the data layer for Hermes-platform per-skill EMA tracking (build #87) — needed before installing analyzer crons that route to paid providers (DeepSeek, Kimi-via-OpenRouter, etc.) without flying blind on quality vs cost.
Changes
Schema (`hermes_state.py`)
- New table `skill_invocations` (session_id, cron_id, skill_name, model, provider, duration, tokens, cost, success, end_reason, …)
- New view `skill_stats_daily` aggregating counts/cost/duration per (skill_name, model, provider, day)
- `SCHEMA_VERSION` bumped 11 → 12. Pure additive — no Alembic, no destructive ops on existing rows.

Writer (`hermes_state.py`)
- `SessionDB.record_skill_invocation(...)` keyword-only API, uses the existing `_execute_write` WAL-safe helper.

Cron hook (`cron/scheduler.py`)
- `run_job()` now records timestamps around the agent run and writes one `skill_invocations` row per name in `job["skills"]` after the agent exits (both success and failure paths). Tokens / cost / duration are sourced from the existing `sessions` row, so no new instrumentation in the agent loop.

Read API (`hermes_state.py`)
- `SessionDB.query_skill_ema(window_days=14, alpha=0.3)` returns per-(skill_name, model) exponentially-weighted moving averages for success rate, cost-per-call, and duration. Default α ≈ 5d half-life.

Smoke test (local, off-tree)
Inserts validated for success=True / success=False / success=None paths. View aggregates cleanly. EMA query returns expected exponential weighting (verified 8-row dataset across 7 days).
v1 scope notes
- `skill_view()` calls are not tracked yet — out of scope for v1, additive in v2 if needed.
- Multi-skill crons over-account: each skill in `job["skills"]` gets the full session cost. The current analyzer pattern is one skill per cron (Pass 2 v2 Steps 3–5), so this corner case is rare. Per-skill cost split would require model-attributed instrumentation inside the agent run, which Hermes does not currently track.
- Only the data layer (`query_skill_ema`) lands here. The plumbing in `plugins/hermes-labyrinth/dashboard/plugin_api.py` (a 5-line `@router.get('/skills/ema')` wrapper) is a follow-up because labyrinth is upstreamed at `stainlu/hermes-labyrinth` — no Brecht-H fork yet, no clean place for the change tonight.
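The over-accounting caveat is easy to see with a stub recorder (`FakeDB` and `record_cron_skills` below are hypothetical stand-ins, not the scheduler's actual code): a two-skill cron whose session cost $0.10 records $0.20 in total.

```python
class FakeDB:
    """Stand-in for SessionDB that just captures the rows it is asked to write."""
    def __init__(self) -> None:
        self.rows: list[dict] = []

    def record_skill_invocation(self, **kw) -> None:
        self.rows.append(kw)


def record_cron_skills(db, job: dict, session_cost: float, invoked_at: float) -> None:
    # v1 behaviour: every skill on the cron is charged the FULL session cost.
    for skill in job["skills"]:
        db.record_skill_invocation(
            skill_name=skill,
            invoked_at=invoked_at,
            estimated_cost_usd=session_cost,
        )


db = FakeDB()
record_cron_skills(db, {"skills": ["a", "b"]}, session_cost=0.10, invoked_at=0.0)
total_recorded = sum(r["estimated_cost_usd"] for r in db.rows)
```

Summing `estimated_cost_usd` across skills therefore double-counts multi-skill crons; with one skill per cron (the current analyzer pattern) the recorded and real totals coincide.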
- `record_skill_invocation()` insert + `skill_stats_daily` aggregation
- `query_skill_ema()` returns expected EMA values
- Live cron fire (`analyze:derivatives-anomaly`)

Pass 2 v2 cross-reference
- `~/hermes-plan/handovers/2026-05-03-HANDOVER-orion-research-pass2-v2-corrected.md`
- `~/hermes-plan/PASS2_STEP0_PROVIDER_AUDIT_2026-05-03.md` — confirmed raw-SQL migration path (no Alembic) before this PR.

🤖 Generated with Claude Code