Skip to content

Commit ffdc581

Browse files
Hermes Sovereign AgentCoreclaude
andcommitted
feat(state): track per-skill cron invocations + EMA query for labyrinth
Adds a `skill_invocations` table to state.db and writes one row per skill listed on a cron job at completion (success or failure paths). Tokens, cost and duration are sourced from the existing session row. Includes a `skill_stats_daily` view that buckets invocations by day and (skill_name, model, provider), and a new `SessionDB.query_skill_ema()` method that applies exponential weighting (default alpha=0.3, i.e. a ~2d half-life / ~5.7d equivalent span) so the dashboard can A/B local Qwen against external models once analyzer crons start firing. SCHEMA_VERSION 11 → 12. Purely additive: existing rows untouched, new table created on next connection-open via the existing executescript() path. No Alembic. Slash-command and ad-hoc skill_view invocations are NOT tracked in v1. Multi-skill crons over-account: each skill in `job["skills"]` gets the full session cost. Both are acceptable for the analyzer-cron use case (1 skill per cron) and can iterate later. Constraint: no new external dependencies — uses sqlite3 + stdlib only. Rejected: per-skill cost split (would require model attribution inside a single agent run, which Hermes does not currently track) | Reason: defer to v2 once #87 surfaces real-world skew. Confidence: high (smoke-tested end-to-end on tmp DB) Scope-risk: narrow (additive table, no existing-row touchpoints) Not-tested: live cron fire (validated by Step 3 — Pass 2 v2 handover) Machine: orion-terminal Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 601e5f1 commit ffdc581

2 files changed

Lines changed: 245 additions & 5 deletions

File tree

cron/scheduler.py

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
except ImportError:
2828
msvcrt = None
2929
from pathlib import Path
30-
from typing import List, Optional
30+
from typing import List, Optional, Tuple
3131

3232
# Add parent directory to path for imports BEFORE repo-level imports.
3333
# Without this, standalone invocations (e.g. after `hermes update` reloads
@@ -999,6 +999,8 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
999999
if prompt is None:
10001000
logger.info("Job '%s': script produced no output, skipping AI call.", job_name)
10011001
return True, "", SILENT_MARKER, None
1002+
_invoked_at: Optional[float] = _hermes_now().timestamp()
1003+
_skill_outcome: Optional[Tuple[bool, Optional[str]]] = None
10021004
origin = _resolve_origin(job)
10031005
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
10041006

@@ -1355,12 +1357,14 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
13551357
"""
13561358

13571359
logger.info("Job '%s' completed successfully", job_name)
1360+
_skill_outcome = (True, "complete")
13581361
return True, output, final_response, None
1359-
1362+
13601363
except Exception as e:
13611364
error_msg = f"{type(e).__name__}: {str(e)}"
13621365
logger.exception("Job '%s' failed: %s", job_name, error_msg)
1363-
1366+
_skill_outcome = (False, error_msg[:200] if error_msg else None)
1367+
13641368
output = f"""# Cron Job: {job_name} (FAILED)
13651369
13661370
**Job ID:** {job_id}
@@ -1393,6 +1397,44 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
13931397
for _var_name in _cron_delivery_vars:
13941398
_VAR_MAP[_var_name].set("")
13951399
if _session_db:
1400+
# Skill-invocation EMA hook (build #87): one row per
1401+
# skill listed on this cron job, with cost/duration/tokens
1402+
# sourced from the agent session row. Slash-command and
1403+
# ad-hoc skill_view calls are not tracked here in v1.
1404+
if _skill_outcome is not None and _invoked_at is not None:
1405+
_job_skills = job.get("skills") or []
1406+
if _job_skills:
1407+
try:
1408+
_completed_at = _hermes_now().timestamp()
1409+
_sess = _session_db.get_session(_cron_session_id) or {}
1410+
_success_flag, _end_reason_val = _skill_outcome
1411+
_duration = _completed_at - _invoked_at
1412+
for _skill_name in _job_skills:
1413+
_sn = str(_skill_name).strip()
1414+
if not _sn:
1415+
continue
1416+
_session_db.record_skill_invocation(
1417+
skill_name=_sn,
1418+
invoked_at=_invoked_at,
1419+
session_id=_cron_session_id,
1420+
cron_id=job_id,
1421+
completed_at=_completed_at,
1422+
duration_seconds=_duration,
1423+
model=_sess.get("model"),
1424+
provider=_sess.get("billing_provider"),
1425+
input_tokens=int(_sess.get("input_tokens") or 0),
1426+
output_tokens=int(_sess.get("output_tokens") or 0),
1427+
cache_read_tokens=int(_sess.get("cache_read_tokens") or 0),
1428+
cache_write_tokens=int(_sess.get("cache_write_tokens") or 0),
1429+
estimated_cost_usd=_sess.get("estimated_cost_usd"),
1430+
success=_success_flag,
1431+
end_reason=_end_reason_val,
1432+
)
1433+
except (Exception, KeyboardInterrupt) as e:
1434+
logger.debug(
1435+
"Job '%s': failed to record skill invocations: %s",
1436+
job_id, e,
1437+
)
13961438
try:
13971439
_session_db.end_session(_cron_session_id, "cron_complete")
13981440
except (Exception, KeyboardInterrupt) as e:

hermes_state.py

Lines changed: 200 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@
2525

2626
from agent.memory_manager import sanitize_context
2727
from hermes_constants import get_hermes_home
28-
from typing import Any, Callable, Dict, List, Optional, TypeVar
28+
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
2929

3030
logger = logging.getLogger(__name__)
3131

3232
T = TypeVar("T")
3333

3434
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
3535

36-
SCHEMA_VERSION = 11
36+
SCHEMA_VERSION = 12
3737

3838
SCHEMA_SQL = """
3939
CREATE TABLE IF NOT EXISTS schema_version (
@@ -98,6 +98,50 @@
9898
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
9999
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
100100
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
101+
102+
CREATE TABLE IF NOT EXISTS skill_invocations (
103+
id INTEGER PRIMARY KEY AUTOINCREMENT,
104+
session_id TEXT REFERENCES sessions(id),
105+
cron_id TEXT,
106+
skill_name TEXT NOT NULL,
107+
skill_version TEXT,
108+
invoked_at REAL NOT NULL,
109+
completed_at REAL,
110+
model TEXT,
111+
provider TEXT,
112+
duration_seconds REAL,
113+
input_tokens INTEGER DEFAULT 0,
114+
output_tokens INTEGER DEFAULT 0,
115+
cache_read_tokens INTEGER DEFAULT 0,
116+
cache_write_tokens INTEGER DEFAULT 0,
117+
estimated_cost_usd REAL,
118+
success INTEGER,
119+
end_reason TEXT,
120+
quality_score REAL
121+
);
122+
123+
CREATE INDEX IF NOT EXISTS idx_skill_invocations_skill ON skill_invocations(skill_name, invoked_at DESC);
124+
CREATE INDEX IF NOT EXISTS idx_skill_invocations_cron ON skill_invocations(cron_id, invoked_at DESC);
125+
CREATE INDEX IF NOT EXISTS idx_skill_invocations_session ON skill_invocations(session_id);
126+
127+
CREATE VIEW IF NOT EXISTS skill_stats_daily AS
128+
SELECT
129+
skill_name,
130+
model,
131+
provider,
132+
DATE(invoked_at, 'unixepoch') AS day,
133+
COUNT(*) AS invocation_count,
134+
SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) AS success_count,
135+
SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) AS failure_count,
136+
AVG(duration_seconds) AS avg_duration_s,
137+
SUM(estimated_cost_usd) AS total_cost_usd,
138+
AVG(estimated_cost_usd) AS avg_cost_usd,
139+
SUM(input_tokens) AS total_input_tokens,
140+
SUM(output_tokens) AS total_output_tokens,
141+
AVG(quality_score) AS avg_quality_score,
142+
MAX(invoked_at) AS last_invoked_at
143+
FROM skill_invocations
144+
GROUP BY skill_name, model, provider, day;
101145
"""
102146

103147
FTS_SQL = """
@@ -677,6 +721,160 @@ def _do(conn):
677721
conn.execute(sql, params)
678722
self._execute_write(_do)
679723

724+
def record_skill_invocation(
    self,
    *,
    skill_name: str,
    invoked_at: float,
    session_id: Optional[str] = None,
    cron_id: Optional[str] = None,
    skill_version: Optional[str] = None,
    completed_at: Optional[float] = None,
    model: Optional[str] = None,
    provider: Optional[str] = None,
    duration_seconds: Optional[float] = None,
    input_tokens: int = 0,
    output_tokens: int = 0,
    cache_read_tokens: int = 0,
    cache_write_tokens: int = 0,
    estimated_cost_usd: Optional[float] = None,
    success: Optional[bool] = None,
    end_reason: Optional[str] = None,
    quality_score: Optional[float] = None,
) -> int:
    """Insert one ``skill_invocations`` row and return the new row id.

    One row per (skill_name, cron run) — see ``cron/scheduler.py`` for the
    cron-side caller. Slash-command and ad-hoc skill_view invocations are
    not tracked here in v1.

    All arguments are keyword-only and map one-to-one onto the table's
    columns. ``success`` is stored as 0/1 (SQLite has no boolean type);
    ``None`` is stored as NULL, meaning "outcome unknown".

    The four token counts are coerced with ``int(x or 0)``. The INSERT
    names every column, so the schema's ``DEFAULT 0`` never applies, and
    a ``None`` passed through from a sparse session row would otherwise
    be stored as NULL.

    Returns:
        The ``lastrowid`` reported by the INSERT cursor, or ``0`` if no
        id was captured.
    """
    sql = """INSERT INTO skill_invocations (
        session_id, cron_id, skill_name, skill_version,
        invoked_at, completed_at, model, provider,
        duration_seconds,
        input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
        estimated_cost_usd, success, end_reason, quality_score
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""

    # Tri-state: NULL (unknown), 0 (failed), 1 (succeeded).
    success_int: Optional[int] = None if success is None else int(bool(success))

    params = (
        session_id, cron_id, skill_name, skill_version,
        invoked_at, completed_at, model, provider,
        duration_seconds,
        int(input_tokens or 0),
        int(output_tokens or 0),
        int(cache_read_tokens or 0),
        int(cache_write_tokens or 0),
        estimated_cost_usd, success_int, end_reason, quality_score,
    )

    # The write callback's return value is not propagated back to us here,
    # so the new row id is smuggled out through a closure-captured dict.
    new_id_holder: Dict[str, int] = {}

    def _do(conn):
        cur = conn.execute(sql, params)
        new_id_holder["id"] = int(cur.lastrowid or 0)

    self._execute_write(_do)
    return new_id_holder.get("id", 0)
782+
783+
def query_skill_ema(
    self,
    window_days: int = 14,
    alpha: float = 0.3,
) -> List[Dict[str, Any]]:
    """Per-(skill_name, model) exponentially-weighted moving averages.

    Reads ``skill_stats_daily`` buckets whose latest invocation falls in
    the last *window_days* and weights them exponentially: the most
    recent day with data gets raw weight ``alpha``, the one before it
    ``alpha * (1 - alpha)``, and so on. Weights are normalised to sum
    to 1. Each step back multiplies the weight by ``1 - alpha``, so the
    default ``alpha=0.3`` halves a day's influence roughly every two
    data-days (``ln 0.5 / ln 0.7 ≈ 1.94``). Steps are counted over days
    that have data; calendar gaps are not penalised.

    The view buckets by (skill_name, model, provider, day) while this
    method reports per (skill_name, model). Rows that share a day but
    differ by provider are therefore pooled into one daily data point,
    weighted by invocation count, before the EMA is applied.

    Days whose average duration or cost is NULL (for example, no cost
    was recorded) are left out of that metric's EMA and the remaining
    weights are renormalised. A metric with no data at all reports
    ``0.0``.

    Args:
        window_days: Look-back window in days; must be positive.
        alpha: Smoothing factor, strictly between 0 and 1.

    Returns:
        A list of dicts (one per (skill_name, model) bucket) with keys::

            skill_name, model, provider,
            sample_count, success_count, failure_count,
            ema_success_rate, ema_duration_s, ema_cost_per_call,
            days_with_data, last_invoked_at

        ``provider`` is taken from the most recently invoked bucket.
        The list is sorted by ``last_invoked_at`` descending so the
        dashboard surfaces currently-active skills first. Buckets with
        no rows in the window are omitted. Invalid *window_days* or
        *alpha* yields an empty list.
    """
    if window_days <= 0 or alpha <= 0 or alpha >= 1:
        return []
    cutoff_ts = time.time() - window_days * 86400.0
    sql = (
        "SELECT skill_name, model, provider, day, "
        "invocation_count, success_count, failure_count, "
        "avg_duration_s, avg_cost_usd, avg_quality_score, last_invoked_at "
        "FROM skill_stats_daily "
        "WHERE last_invoked_at >= ? "
        "ORDER BY skill_name, model, day"
    )
    with self._lock:
        cur = self._conn.execute(sql, (cutoff_ts,))
        rows = [dict(r) for r in cur.fetchall()]

    def _pooled_avg(day_rows, column):
        """Invocation-weighted mean of *column* over one day's rows, or None."""
        total = 0.0
        calls = 0
        for r in day_rows:
            value = r[column]
            count = int(r["invocation_count"] or 0)
            if value is None or count <= 0:
                continue
            total += float(value) * count
            calls += count
        return total / calls if calls else None

    def _weighted_mean(values, weights):
        """Weighted mean over the non-None *values*; 0.0 when none exist."""
        numerator = 0.0
        denominator = 0.0
        for value, weight in zip(values, weights):
            if value is None:
                continue
            numerator += weight * value
            denominator += weight
        return numerator / denominator if denominator > 0 else 0.0

    # (skill_name, model) -> day -> provider-level rows for that day.
    by_group: Dict[
        Tuple[str, Optional[str]], Dict[str, List[Dict[str, Any]]]
    ] = {}
    for r in rows:
        key = (r["skill_name"], r["model"])
        by_group.setdefault(key, {}).setdefault(r["day"], []).append(r)

    out: List[Dict[str, Any]] = []
    for (skill_name, model), by_day in by_group.items():
        # ``day`` is an ISO date string, so lexical order is chronological.
        days = sorted(by_day, key=lambda d: d or "")
        n = len(days)
        # Oldest day first; the newest day always carries weight ``alpha``.
        weights = [alpha * (1 - alpha) ** (n - 1 - i) for i in range(n)]

        sample_count = 0
        success_count = 0
        failure_count = 0
        daily_rates: List[float] = []
        daily_durations: List[Optional[float]] = []
        daily_costs: List[Optional[float]] = []
        latest_row: Optional[Dict[str, Any]] = None
        last_invoked = 0.0

        for day in days:
            day_rows = by_day[day]
            calls = sum(int(r["invocation_count"] or 0) for r in day_rows)
            wins = sum(int(r["success_count"] or 0) for r in day_rows)
            losses = sum(int(r["failure_count"] or 0) for r in day_rows)
            sample_count += calls
            success_count += wins
            failure_count += losses

            daily_rates.append(wins / max(1, calls))
            daily_durations.append(_pooled_avg(day_rows, "avg_duration_s"))
            daily_costs.append(_pooled_avg(day_rows, "avg_cost_usd"))

            for r in day_rows:
                stamp = float(r["last_invoked_at"] or 0.0)
                if latest_row is None or stamp >= last_invoked:
                    latest_row = r
                    last_invoked = stamp

        out.append({
            "skill_name": skill_name,
            "model": model,
            "provider": latest_row.get("provider") if latest_row else None,
            "sample_count": sample_count,
            "success_count": success_count,
            "failure_count": failure_count,
            "ema_success_rate": _weighted_mean(daily_rates, weights),
            "ema_duration_s": _weighted_mean(daily_durations, weights),
            "ema_cost_per_call": _weighted_mean(daily_costs, weights),
            "days_with_data": n,
            "last_invoked_at": last_invoked,
        })

    out.sort(key=lambda x: x["last_invoked_at"], reverse=True)
    return out
877+
680878
def ensure_session(
681879
self,
682880
session_id: str,

0 commit comments

Comments
 (0)