Rework PydanticAI adapter: upstream parity + granular checkpoints#156

Draft
safoinme wants to merge 4 commits into develop from feature/pydanticai-integration

Conversation


@safoinme safoinme commented Apr 15, 2026

Summary

Reworks the PydanticAI adapter to mirror the upstream pydantic_ai.durable_exec.kitaru structure and adds Kitaru-specific durability controls on top.

  • Upstream parity. Split monolithic files into the upstream layout — _agent.py, _model.py, _toolset.py, _function_toolset.py, _mcp_server.py,
    _hitl.py, _policy.py, _events.py, _tracking.py, _otel.py, _kitaru_internal.py, _utils.py, _run_context.py. A narrow ruff ignore set
    (UP006/007/008/035/045, E501, I001, SIM105) keeps vendored style stable against upstream.
  • Granular checkpoint mode. New KitaruAgent(granular_checkpoints=True, model_checkpoint_config=..., tool_checkpoint_config=..., tool_checkpoint_config_by_name=..., mcp_checkpoint_config=...) opens a checkpoint per model / tool / MCP call instead of one per turn. Per-tool overrides accept
    False to opt a tool out entirely. Turn mode (default) keeps the aggregating run artifact.
  • turn_checkpoint_config kwarg so users can tune retries / type on the auto-opened turn checkpoint.
  • persist_message_history=True. Opt-in conversation memory — the adapter remembers result.all_messages() on the instance and auto-injects it as
    message_history on the next call when the caller doesn't pass one. One instance = one conversation.
  • Streaming fallback. When an event_stream_handler is supplied, granular mode transparently falls back to a turn checkpoint — per-call checkpointing can't drain
    an async stream inside a sync ZenML step.
  • MCP cache_tools honored to skip redundant tools/list round-trips on checkpoint re-entry; MCP calls are tagged toolset_kind='mcp' and default to
    type='mcp_call' in the dashboard.
  • HITL bridging. PydanticAI's native requires_approval=True, ApprovalRequired, and CallDeferred all route through kitaru.wait() with no decorator;
    hitl_tool(...) remains the explicit marker.
  • OTel correlation. Kitaru event IDs attached to the current span when opentelemetry is installed; no-ops otherwise.
  • Analytics. PYDANTIC_AI_WRAPPED reports granular_checkpoints and persist_message_history; run-completion events carry method + error class.
  • Import-time feature guard. Missing pydantic-ai-slim surfaces as KitaruFeatureNotAvailableError instead of an ImportError deep in the module graph.
  • Adapter README. src/kitaru/adapters/pydantic_ai/README.md documents concepts, install, usage patterns, checkpoint modes, streaming, capture policy,
    conversation memory, and troubleshooting — shaped for later upstreaming into PydanticAI docs.
  • Example + docs refreshed to show the auto-flow / auto-checkpoint path and the granular config knobs.
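The per-call checkpoint decision described above can be sketched in plain Python. This is an illustrative stand-in for the adapter's internal dispatch, not its real code — names like `tool_checkpoint_config_by_name` mirror this PR description, and the class itself is hypothetical: granular mode opens one checkpoint per call, an explicit `False` override opts a tool out, and a supplied `event_stream_handler` forces the turn-checkpoint fallback.

```python
from dataclasses import dataclass, field
from typing import Any, Optional, Union


@dataclass
class CheckpointPlan:
    """Hypothetical sketch of how granular mode resolves checkpoints per call."""
    granular_checkpoints: bool = False
    tool_checkpoint_config: Optional[dict] = None
    tool_checkpoint_config_by_name: dict = field(default_factory=dict)
    event_stream_handler: Any = None

    def use_granular(self) -> bool:
        # Streaming fallback: per-call checkpointing can't drain an async
        # stream inside a sync step, so fall back to one turn checkpoint.
        return self.granular_checkpoints and self.event_stream_handler is None

    def config_for_tool(self, name: str) -> Union[dict, None, bool]:
        # A per-tool override wins; an explicit False opts the tool out entirely.
        if name in self.tool_checkpoint_config_by_name:
            return self.tool_checkpoint_config_by_name[name]
        return self.tool_checkpoint_config


plan = CheckpointPlan(
    granular_checkpoints=True,
    tool_checkpoint_config={"retries": 3},
    tool_checkpoint_config_by_name={"noisy_tool": False},
)
print(plan.use_granular())                 # True: no stream handler supplied
print(plan.config_for_tool("noisy_tool"))  # False: tool opted out
print(plan.config_for_tool("other"))       # {'retries': 3}: default applies
```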
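The `cache_tools` behavior amounts to memoizing the first `tools/list` response so checkpoint re-entry doesn't repeat the round-trip. A minimal sketch, assuming a callable that performs the RPC (the class and its names are illustrative, not the adapter's real API):

```python
class MCPToolCache:
    """Illustrative sketch of honoring cache_tools on checkpoint re-entry."""

    def __init__(self, list_tools, cache_tools: bool = True):
        self._list_tools = list_tools  # callable performing the tools/list RPC
        self._cache_tools = cache_tools
        self._cached = None
        self.round_trips = 0

    def tools(self):
        if self._cache_tools and self._cached is not None:
            return self._cached  # re-entry: reuse the cached tool list
        self.round_trips += 1
        result = self._list_tools()
        if self._cache_tools:
            self._cached = result
        return result


cache = MCPToolCache(lambda: ["search", "fetch"], cache_tools=True)
cache.tools()
cache.tools()
print(cache.round_trips)  # 1: the second call is served from the cache
```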

Known limits and follow-ups

Kitaru-core (cannot be worked around in this adapter):

  • No nested checkpoints — MVP rule. Granular mode runs at flow scope to avoid nesting.

Adapter follow-up (tracked, not in this PR):

  • runtime='isolated' — currently rejected with KitaruUsageError. Lifting the limit requires making every adapter wrapper (KitaruModel, KitaruToolset,
    KitaruMCPServer) reconstructible from serializable construction args on the far side of the process boundary and wiring KitaruRunContext through the granular
    dispatcher. The pydantic payloads already serialize via TypeAdapter, so most of the work is on the wrapper-identity side.

By design:

  • Auto-flow is local-only. Remote stacks need an explicit @kitaru.flow — serializing an arbitrary agent closure isn't worth the machinery when a one-line decorator
    does the job.

Test plan

  • just check passes (format, lint, typecheck, typos, yaml, actionlint, links)
  • just test passes — including tests/test_pydantic_ai_adapter.py covering reject_isolated_runtime, per-tool False opt-out, with_default_type
    immutability, the _use_granular streaming fallback, and persist_message_history load/store semantics
  • examples/pydantic_ai_agent/pydantic_ai_adapter.py runs end-to-end with and without granular_checkpoints=True
  • Two successive run_sync() calls on a persist_message_history=True instance see each other's turns without the caller threading message_history
  • Passing runtime='isolated' in any checkpoint config raises KitaruUsageError with the guidance message
  • MCP example with cache_tools=True skips tools/list on checkpoint re-entry
  • HITL via requires_approval=True, ApprovalRequired, CallDeferred, and hitl_tool all suspend and resume through kitaru.wait()
  • Streaming: event_stream_handler with granular_checkpoints=True records a turn checkpoint (fallback); run_stream() / iter() require explicit
    @kitaru.checkpoint in both modes
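The persist_message_history load/store semantics exercised above (store `result.all_messages()` after each turn, inject it as `message_history` on the next call only when the caller passes none) can be sketched in isolation. The helper class and the `run_turn` callable are hypothetical illustrations, not the adapter's real internals:

```python
class ConversationMemory:
    """Sketch of persist_message_history: one instance = one conversation."""

    def __init__(self, run_turn):
        self._run_turn = run_turn  # callable: (prompt, history) -> all messages
        self._history = None

    def run_sync(self, prompt, message_history=None):
        # A caller-supplied history wins; otherwise inject the stored one.
        history = message_history if message_history is not None else self._history
        messages = self._run_turn(prompt, history or [])
        self._history = messages  # remember result.all_messages() for next call
        return messages


def fake_turn(prompt, history):
    # Stand-in for a model turn: append a user/assistant exchange.
    return history + [("user", prompt), ("assistant", f"echo:{prompt}")]


memory = ConversationMemory(fake_turn)
memory.run_sync("hi")
second = memory.run_sync("again")
print(len(second))  # 4: the second turn saw the first turn's two messages
```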


github-actions bot commented Apr 15, 2026

Site Preview

Preview URL https://kitaru-site-preview-156.zenml-migration.workers.dev
Branch feature/pydanticai-integration
Commit 583a42bbdad9ef0e7da975b11e3ca14cde671f73

