Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
bf05808
docs(spec): stakeholder interview subagents design
ChristianMoellmann May 23, 2026
815e475
docs(plan): stakeholder interview subagents implementation plan
ChristianMoellmann May 23, 2026
f63bc55
chore(interviews): add deps and pytest scaffold for interview subsystem
ChristianMoellmann May 23, 2026
071f8b5
feat(interviews): add interview config keys (token budget, workers, l…
ChristianMoellmann May 23, 2026
f1898b4
feat(interviews): add pydantic models for instruments and responses
ChristianMoellmann May 23, 2026
29be754
feat(interviews): YAML instrument loader with pydantic validation and…
ChristianMoellmann May 23, 2026
eb3c362
feat(interviews): LLM stub mode for deterministic CI tests
ChristianMoellmann May 23, 2026
289a0cf
feat(interviews): StakeholderInterviewer base with in-character promp…
ChristianMoellmann May 23, 2026
0fcb815
feat(interviews): longitudinal subagent + 12-item Likert instrument
ChristianMoellmann May 23, 2026
75762cc
feat(interviews): diversity subagent with Q-sort + 6 Likert axes + PC…
ChristianMoellmann May 23, 2026
5d7111b
feat(interviews): Delphi subagent (3 rounds: open, rate, revise) + co…
ChristianMoellmann May 23, 2026
ae4941d
feat(interviews): scenario subagent with 4 futures × 4 dimensions + p…
ChristianMoellmann May 23, 2026
998cf1a
feat(interviews): JSONL/JSON storage layout with run_id directories a…
ChristianMoellmann May 23, 2026
cca6736
feat(interviews): Zep writer adapts add_activity/add_text_episode for…
ChristianMoellmann May 23, 2026
b3e2039
feat(interviews): orchestrator with two-phase lifecycle, parallel fan…
ChristianMoellmann May 23, 2026
3322bcb
feat(interviews): on_ready / on_completed hook registry on Simulation…
ChristianMoellmann May 23, 2026
d79c81d
feat(interviews): synthesiser emits cross-method report + tidy CSV + …
ChristianMoellmann May 23, 2026
bc07170
feat(interviews): persona + Zep memory adapters bridging existing ser…
ChristianMoellmann May 23, 2026
52bae0a
feat(interviews): Flask blueprint /api/interview with task-based asyn…
ChristianMoellmann May 23, 2026
61f13a8
test(interviews): end-to-end pipeline test + content-aware LLM stubs …
ChristianMoellmann May 23, 2026
fede66c
feat(interviews): Step4b Vue scaffold with five-tab navigation, API c…
ChristianMoellmann May 23, 2026
acaa061
feat(interviews): d3 visualisations for longitudinal Δ, diversity PCA…
ChristianMoellmann May 23, 2026
6b04ea5
feat(interviews): auto-trigger lifecycle hooks + bridge SimulationRun…
ChristianMoellmann May 23, 2026
6e1489f
fix(interviews): wire Zep updater/memory/hooks correctly for producti…
ChristianMoellmann May 23, 2026
6a53c11
feat(interviews): capture raw LLM output on schema-validation failures
ChristianMoellmann May 23, 2026
895a5fb
fix(interviews): accept stringified ints in all 4 subagent validators
ChristianMoellmann May 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions backend/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ def create_app(config_class=Config):
SimulationRunner.register_cleanup()
if should_log_startup:
logger.info("已注册模拟进程清理函数")

# Install interview lifecycle hooks on the SimulationManager class.
# Hooks are stored on the class itself (not on a particular instance), so
# any fresh `SimulationManager()` constructed later (e.g. per request in
# the Flask API) will see them. We still bridge `_notify_on_completed`
# into SimulationRunner via a transient instance so the runner's monitor
# thread fires the completed hooks when a simulation process exits.
from .services.simulation_manager import SimulationManager
from .services.interviews.lifecycle import install_hooks

install_hooks(SimulationManager)
SimulationRunner.register_on_completed(SimulationManager()._notify_on_completed)
if should_log_startup:
logger.info("已安装面试生命周期钩子")

# 请求日志中间件
@app.before_request
Expand All @@ -63,10 +77,8 @@ def log_response(response):
return response

# 注册蓝图
from .api import graph_bp, simulation_bp, report_bp
app.register_blueprint(graph_bp, url_prefix='/api/graph')
app.register_blueprint(simulation_bp, url_prefix='/api/simulation')
app.register_blueprint(report_bp, url_prefix='/api/report')
from .api import register_blueprints
register_blueprints(app)

# 健康检查
@app.route('/health')
Expand Down
11 changes: 10 additions & 1 deletion backend/app/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@
API路由模块
"""

from flask import Blueprint
from flask import Blueprint, Flask

# One Blueprint object per API area. They are created here, before the route
# modules are imported further down in this file; the interview route module,
# for instance, pulls ``interview_bp`` back in with ``from . import interview_bp``.
graph_bp = Blueprint('graph', __name__)
simulation_bp = Blueprint('simulation', __name__)
report_bp = Blueprint('report', __name__)
interview_bp = Blueprint('interview', __name__)

from . import graph # noqa: E402, F401
from . import simulation # noqa: E402, F401
from . import report # noqa: E402, F401
from . import interview # noqa: E402, F401


def register_blueprints(app: Flask) -> None:
    """Attach every API blueprint to *app* under its ``/api/...`` URL prefix.

    Blueprints are registered in the order graph, simulation, report,
    interview.
    """
    prefixed_blueprints = (
        (graph_bp, '/api/graph'),
        (simulation_bp, '/api/simulation'),
        (report_bp, '/api/report'),
        (interview_bp, '/api/interview'),
    )
    for blueprint, prefix in prefixed_blueprints:
        app.register_blueprint(blueprint, url_prefix=prefix)
225 changes: 225 additions & 0 deletions backend/app/api/interview.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
from __future__ import annotations
import threading
import traceback
import uuid
from pathlib import Path
from flask import Blueprint, jsonify, request, send_file
from app.config import Config
from app.models.interview import SubagentKind, InterviewPhase
from app.services.interviews.adapters import FileSystemPersonaProvider, ZepMemoryProvider
from app.services.interviews.zep_writer import InterviewZepWriter
from app.services.interview_orchestrator import InterviewOrchestrator
from app.services.interview_synthesizer import InterviewSynthesizer
from app.services.interviews.storage import InterviewStore
from app.utils.llm_client import LLMClient
from app.utils.logger import get_logger

from . import interview_bp

logger = get_logger(__name__)


class _NullUpdater:
"""No-op stand-in for ``ZepGraphMemoryUpdater`` used when Zep is unavailable.

Exposes ``add_text_episode`` so ``InterviewZepWriter._emit`` succeeds silently —
the interview pipeline still produces local artefacts; Zep just isn't updated.
"""

def add_text_episode(self, graph_id, text): # noqa: ARG002 - matches real API
return None


class _NullMemory:
    """Stand-in memory provider whose digests are always flagged unavailable."""

    def get_digest(self, agent_id, max_chars=2000):  # noqa: ARG002 - matches Protocol
        """Return a placeholder ``MemoryDigest``; both arguments are ignored."""
        # The import stays local to the method, as in the original code.
        from app.services.interviews.base import MemoryDigest

        placeholder = MemoryDigest(text="[memory unavailable]", available=False)
        return placeholder

# In-memory registry of background tasks, keyed by task id. Each entry is a
# dict with "status", "progress", "result" and "error" keys; a "traceback" key
# is added when a task fails.
_TASKS: dict[str, dict] = {}
# Held while entries of _TASKS are created, replaced or updated.
_LOCK = threading.Lock()

# ``scripts/instruments`` under the third parent directory of this file.
INSTRUMENT_DIR = Path(__file__).resolve().parents[2] / "scripts" / "instruments"


def _uploads_root() -> Path:
    """Return the uploads root directory as a ``Path``.

    ``Config.UPLOADS_DIR`` is used when that attribute exists; otherwise the
    relative path ``"uploads"`` is used.
    """
    configured = getattr(Config, "UPLOADS_DIR", "uploads")
    return Path(configured)


def _load_graph_id(sim_id: str) -> str:
    """Look up the Zep ``graph_id`` recorded for simulation *sim_id*.

    The value is obtained from ``SimulationManager().get_simulation(sim_id)``.
    An empty string is returned when no state is found, when the state has no
    graph_id, or when the lookup raises; callers treat ``""`` as "Zep
    unavailable" and switch to the null memory/writer path.
    """
    try:
        from app.services.simulation_manager import SimulationManager

        sim_state = SimulationManager().get_simulation(sim_id)
        found = sim_state.graph_id if sim_state else ""
    except Exception as e:  # pragma: no cover - defensive
        logger.warning(f"_load_graph_id({sim_id}) failed: {e!r}")
        return ""
    # A missing/empty graph_id collapses to the empty-string sentinel.
    return found or ""


def _build_orchestrator(sim_id: str) -> InterviewOrchestrator:
    """Assemble an ``InterviewOrchestrator`` for the simulation *sim_id*.

    Persona data is read from ``reddit_profiles.json`` / ``twitter_profiles.csv``
    in the simulation's upload directory (each only if the file exists). Zep
    memory and the Zep writer are wired up when a graph_id can be resolved and
    Zep initialisation succeeds; otherwise the null memory/updater pair is
    used and a warning is logged.
    """

    def _null_backends():
        # Fallback pair shared by the "no graph_id" and "Zep init failed" paths.
        return _NullMemory(), InterviewZepWriter(memory_updater=_NullUpdater(), graph_id="")

    profile_dir = _uploads_root() / "simulations" / sim_id
    reddit_file = profile_dir / "reddit_profiles.json"
    twitter_file = profile_dir / "twitter_profiles.csv"
    personas = FileSystemPersonaProvider(
        reddit_path=reddit_file if reddit_file.exists() else None,
        twitter_path=twitter_file if twitter_file.exists() else None,
    )
    # Mapping agent_id -> Zep entity uuid, derived from the profile files.
    agent_to_entity = personas.agent_to_entity()

    # The graph_id comes from the simulation's persisted state, not from a
    # ``graph_id.txt`` file (nothing in the codebase writes one).
    graph_id = _load_graph_id(sim_id)

    memory: object
    zep_writer: InterviewZepWriter
    if graph_id:
        try:
            from app.services.zep_entity_reader import ZepEntityReader
            from app.services.zep_graph_memory_updater import ZepGraphMemoryUpdater

            reader = ZepEntityReader()
            updater = ZepGraphMemoryUpdater(graph_id=graph_id)
            memory = ZepMemoryProvider(
                reader, graph_id=graph_id, agent_to_entity=agent_to_entity
            )
            zep_writer = InterviewZepWriter(memory_updater=updater, graph_id=graph_id)
            if not agent_to_entity:
                logger.warning(
                    f"interview: empty agent_to_entity map for sim {sim_id} — "
                    "memory digests will be unavailable. Check that profile files "
                    "include `source_entity_uuid`."
                )
        except Exception as e:
            logger.warning(
                f"interview: Zep init failed for sim {sim_id} ({e!r}); "
                "falling back to null memory/writer"
            )
            memory, zep_writer = _null_backends()
    else:
        logger.warning(
            f"interview: no graph_id for sim {sim_id} — Zep memory/writer disabled "
            "(simulation state missing or graph_id empty)"
        )
        memory, zep_writer = _null_backends()

    llm = LLMClient(
        api_key=Config.LLM_API_KEY,
        base_url=Config.LLM_BASE_URL,
        model=Config.LLM_MODEL_NAME,
    )
    return InterviewOrchestrator(
        llm=llm,
        memory=memory,
        personas=personas,
        instrument_dir=INSTRUMENT_DIR,
        store_root=_uploads_root(),
        sim_id=sim_id,
        zep_writer=zep_writer,
        max_workers=Config.INTERVIEW_MAX_WORKERS,
        language=Config.INTERVIEW_DEFAULT_LANGUAGE,
    )


def _run_task(task_id: str, fn) -> None:
    """Execute *fn* for task *task_id* and record the outcome in ``_TASKS``.

    The entry is first replaced by a fresh "running" record. ``fn`` is called
    with the task id; its return value is stored under "result" with status
    "completed". If anything raises, the entry gets status "failed", the
    exception's ``repr`` under "error" and the formatted traceback under
    "traceback". Exceptions are not re-raised.
    """
    with _LOCK:
        _TASKS[task_id] = {"status": "running", "progress": {}, "result": None, "error": None}
    try:
        outcome = fn(task_id)
        with _LOCK:
            _TASKS[task_id].update(status="completed", result=outcome)
    except Exception as exc:
        failure = {
            "status": "failed",
            "error": repr(exc),
            "traceback": traceback.format_exc(),
        }
        with _LOCK:
            _TASKS[task_id].update(failure)


def _start_task(fn) -> str:
    """Register a new task for *fn*, launch it on a daemon thread, return its id.

    The id is the first 12 hex characters of a random UUID4. The task is
    recorded as "queued" before the thread (which runs ``_run_task``) starts.
    """
    task_id = uuid.uuid4().hex[:12]
    queued_record = {"status": "queued", "progress": {}, "result": None, "error": None}
    with _LOCK:
        _TASKS[task_id] = queued_record
    worker = threading.Thread(target=_run_task, args=(task_id, fn), daemon=True)
    worker.start()
    return task_id


def _envelope(data=None, error=None, status: int = 200):
    """Build the JSON response tuple ``(response, status)`` used by this API.

    The body has three keys: "success" (true exactly when *error* is ``None``),
    "data" (*data*, or ``{}`` when *data* is falsy) and "error" (*error* as given).
    """
    payload = data if data else {}
    succeeded = error is None
    return jsonify({"success": succeeded, "data": payload, "error": error}), status


@interview_bp.route("/<sim_id>/pre", methods=["POST"])
def post_pre(sim_id: str):
    """Start the orchestrator's ``run_pre`` in the background; respond with the task id."""
    orchestrator = _build_orchestrator(sim_id)

    def job(tid):
        # The task id handed in by the task runner is not needed here.
        return orchestrator.run_pre()

    return _envelope({"task_id": _start_task(job)})


@interview_bp.route("/<sim_id>/post", methods=["POST"])
def post_post(sim_id: str):
    """Start ``run_post`` followed by synthesis in the background; respond with the task id.

    The task result is whatever ``run_post`` returns, extended with a
    "synthesis" entry holding the first 1000 items of the synthesiser output
    (presumably characters of the report text — confirm against
    ``InterviewSynthesizer.run``).
    """
    orchestrator = _build_orchestrator(sim_id)

    def job(tid):
        outcome = orchestrator.run_post()
        synthesis_output = InterviewSynthesizer(store=orchestrator.store).run()
        outcome["synthesis"] = synthesis_output[:1000]  # short preview
        return outcome

    return _envelope({"task_id": _start_task(job)})


@interview_bp.route("/<sim_id>/rerun", methods=["POST"])
def post_rerun(sim_id: str):
    """Re-run one subagent in the background; respond with the task id.

    The request body is expected to be a JSON object with a "subagent" key
    naming a ``SubagentKind`` value. A missing, unparsable or non-object body,
    or an unknown subagent value, yields a 400 error envelope.
    """
    body = request.get_json(silent=True)
    # Valid JSON is not necessarily an object: a list, string or number has no
    # ``.get`` and previously caused an AttributeError (HTTP 500). Treat any
    # non-object body as "no subagent given".
    sub = body.get("subagent") if isinstance(body, dict) else None
    try:
        subagent = SubagentKind(sub)
    except ValueError:
        return _envelope(error=f"unknown subagent {sub!r}", status=400)
    # Only build the (comparatively expensive) orchestrator once input is valid.
    orch = _build_orchestrator(sim_id)
    task_id = _start_task(lambda tid: orch.rerun(subagent))
    return _envelope({"task_id": task_id})


@interview_bp.route("/<sim_id>/status", methods=["GET"])
def get_status(sim_id: str):
    """Report the state of the background task named by the ``task_id`` query parameter.

    Responds with "status", "progress", "result" and "error" for the task, or
    a 404 error envelope when the id is absent or unknown. *sim_id* is part of
    the URL but is not used for the lookup.
    """
    task_id = request.args.get("task_id")
    with _LOCK:
        task = _TASKS.get(task_id)
        # Copy while holding the lock: the worker thread mutates this dict in
        # place, so reading the fields after release could pair a new status
        # with a stale result.
        snapshot = dict(task) if task is not None else None
    if snapshot is None:
        return _envelope(error="unknown task_id", status=404)
    return _envelope({
        "status": snapshot["status"],
        "progress": snapshot.get("progress", {}),
        "result": snapshot.get("result"),
        "error": snapshot.get("error"),
    })


@interview_bp.route("/<sim_id>/results/<subagent>", methods=["GET"])
def get_results(sim_id: str, subagent: str):
    """Return the aggregate of the latest T1 run for one subagent.

    Responds with the parsed ``aggregate.json`` and the run directory path.
    Error envelopes: 400 for an unknown subagent name, 404 when no run exists
    yet or the run has no ``aggregate.json``.
    """
    import json

    try:
        sub = SubagentKind(subagent)
    except ValueError:
        return _envelope(error=f"unknown subagent {subagent!r}", status=400)
    store = InterviewStore(root=_uploads_root(), sim_id=sim_id)
    # NOTE(review): the previous code chose the phase with a conditional on
    # ``SubagentKind.LONGITUDINAL`` whose two branches were both
    # ``InterviewPhase.T1``. Behaviour is kept (always T1); confirm whether the
    # longitudinal subagent was meant to read a different phase.
    phase = InterviewPhase.T1
    run = store.latest_run(phase, sub)
    if run is None:
        return _envelope(error="no results yet", status=404)
    agg = run / "aggregate.json"
    if not agg.exists():
        return _envelope(error="aggregate missing", status=404)
    return _envelope({
        "aggregate": json.loads(agg.read_text(encoding="utf-8")),
        "run_dir": str(run),
    })


@interview_bp.route("/<sim_id>/results/synthesis", methods=["GET"])
def get_synthesis(sim_id: str):
    """Return the synthesis report markdown for *sim_id*.

    If ``synthesis/report.md`` does not exist yet, the synthesiser is run
    synchronously first. Should the report still be absent afterwards, a 404
    error envelope is returned instead of failing on the file read.
    """
    store = InterviewStore(root=_uploads_root(), sim_id=sim_id)
    report = store.base / "synthesis" / "report.md"
    if not report.exists():
        InterviewSynthesizer(store=store).run()
        # The run is not guaranteed (from what is visible here) to have
        # produced the file; answer in the API's envelope format rather than
        # letting read_text raise.
        if not report.exists():
            return _envelope(error="synthesis report missing", status=404)
    return _envelope({"report_markdown": report.read_text(encoding="utf-8")})


@interview_bp.route("/<sim_id>/export.csv", methods=["GET"])
def get_export_csv(sim_id: str):
    """Send the tidy CSV export for *sim_id* as a file download.

    If ``synthesis/exports/all_responses.csv`` does not exist yet, the
    synthesiser is run synchronously first. Should the file still be absent
    afterwards, a 404 error envelope is returned instead of failing inside
    ``send_file``.
    """
    store = InterviewStore(root=_uploads_root(), sim_id=sim_id)
    csv_path = store.base / "synthesis" / "exports" / "all_responses.csv"
    if not csv_path.exists():
        InterviewSynthesizer(store=store).run()
        if not csv_path.exists():
            return _envelope(error="export missing", status=404)
    return send_file(
        csv_path,
        mimetype="text/csv",
        as_attachment=True,
        download_name=f"{sim_id}_interviews.csv",
    )
8 changes: 8 additions & 0 deletions backend/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class Config:
MAX_CONTENT_LENGTH = 50 * 1024 * 1024 # 50MB
UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), '../uploads')
ALLOWED_EXTENSIONS = {'pdf', 'md', 'txt', 'markdown'}
# Root directory for simulation uploads (used by the interview subsystem)
UPLOADS_DIR = os.environ.get("UPLOADS_DIR", os.path.join(os.path.dirname(__file__), '../uploads'))

# 文本处理配置
DEFAULT_CHUNK_SIZE = 500 # 默认切块大小
Expand All @@ -62,6 +64,12 @@ class Config:
REPORT_AGENT_MAX_TOOL_CALLS = int(os.environ.get('REPORT_AGENT_MAX_TOOL_CALLS', '5'))
REPORT_AGENT_MAX_REFLECTION_ROUNDS = int(os.environ.get('REPORT_AGENT_MAX_REFLECTION_ROUNDS', '2'))
REPORT_AGENT_TEMPERATURE = float(os.environ.get('REPORT_AGENT_TEMPERATURE', '0.5'))

# Interview subsystem
INTERVIEW_MAX_TOKENS_PER_RUN = int(os.environ.get("INTERVIEW_MAX_TOKENS_PER_RUN", 15_000_000))
INTERVIEW_MAX_WORKERS = int(os.environ.get("INTERVIEW_MAX_WORKERS", 8))
INTERVIEW_DEFAULT_LANGUAGE = os.environ.get("INTERVIEW_DEFAULT_LANGUAGE", "de")
LLM_STUB_MODE = os.environ.get("LLM_STUB_MODE", "false").lower() == "true"

@classmethod
def validate(cls):
Expand Down
Loading