Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion deploy/docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,11 @@ MCP is an open protocol that standardizes how applications provide context to LL

### Connecting via MCP

The Crawl4AI server exposes two MCP endpoints:
The Crawl4AI server exposes three MCP endpoints:

- **Server-Sent Events (SSE)**: `http://localhost:11235/mcp/sse`
- **WebSocket**: `ws://localhost:11235/mcp/ws`
- **Streamable_http**: `http://localhost:11235/mcp/http`

### Using with Claude Code

Expand Down Expand Up @@ -297,6 +298,13 @@ You can test the MCP WebSocket connection using the test file included in the re
python tests/mcp/test_mcp_socket.py
```

You can test the MCP streamable_http connection using the test file included in the repository:

```bash
# From the repository root
python tests/mcp/test_mcp_http.py
```

### MCP Schemas

Access the MCP tool schemas at `http://localhost:11235/mcp/schema` for detailed information on each tool's parameters and capabilities.
Expand Down
107 changes: 82 additions & 25 deletions deploy/docker/mcp_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
from __future__ import annotations
import inspect, json, re, anyio
from contextlib import suppress
from typing import Any, Callable, Dict, List, Tuple
from typing import Any, AsyncContextManager, Callable, Dict, List, Tuple
import httpx

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.responses import JSONResponse
from fastapi import Request
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
from mcp.server.sse import SseServerTransport
Expand All @@ -17,28 +18,42 @@
from mcp.server.lowlevel.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions

from starlette.applications import Starlette
from starlette.routing import Mount
from starlette.types import Receive, Scope, Send
import contextlib
from collections.abc import AsyncIterator


# ── opt‑in decorators ───────────────────────────────────────────
def mcp_resource(name: str | None = None):
def deco(fn):
fn.__mcp_kind__, fn.__mcp_name__ = "resource", name
return fn

return deco


def mcp_template(name: str | None = None):
def deco(fn):
fn.__mcp_kind__, fn.__mcp_name__ = "template", name
return fn

return deco


def mcp_tool(name: str | None = None):
def deco(fn):
fn.__mcp_kind__, fn.__mcp_name__ = "tool", name
return fn

return deco


# ── HTTP‑proxy helper for FastAPI endpoints ─────────────────────
def _make_http_proxy(base_url: str, route):
method = list(route.methods - {"HEAD", "OPTIONS"})[0]

async def proxy(**kwargs):
# replace `/items/{id}` style params first
path = route.path
Expand All @@ -61,16 +76,18 @@ async def proxy(**kwargs):
except httpx.HTTPStatusError as e:
# surface FastAPI error details instead of plain 500
raise HTTPException(e.response.status_code, e.response.text)

return proxy


# ── main entry point ────────────────────────────────────────────
def attach_mcp(
app: FastAPI,
*, # keyword‑only
*, # keyword‑only
base: str = "/mcp",
name: str | None = None,
base_url: str, # eg. "http://127.0.0.1:8020"
) -> None:
base_url: str, # eg. "http://127.0.0.1:8020"
) -> Callable[[Starlette], AsyncContextManager[None]]:
"""Call once after all routes are declared to expose WS+SSE MCP endpoints."""
server_name = name or app.title or "FastAPI-MCP"
mcp = Server(server_name)
Expand Down Expand Up @@ -116,32 +133,39 @@ def _body_model(fn: Callable) -> type[BaseModel] | None:
async def _list_tools() -> List[t.Tool]:
out = []
for k, (proxy, orig_fn) in tools.items():
desc = getattr(orig_fn, "__mcp_description__", None) or inspect.getdoc(orig_fn) or ""
schema = getattr(orig_fn, "__mcp_schema__", None) or _schema(_body_model(orig_fn))
out.append(
t.Tool(name=k, description=desc, inputSchema=schema)
desc = (
getattr(orig_fn, "__mcp_description__", None)
or inspect.getdoc(orig_fn)
or ""
)
schema = getattr(orig_fn, "__mcp_schema__", None) or _schema(
_body_model(orig_fn)
)
out.append(t.Tool(name=k, description=desc, inputSchema=schema))
return out


@mcp.call_tool()
async def _call_tool(name: str, arguments: Dict | None) -> List[t.TextContent]:
if name not in tools:
raise HTTPException(404, "tool not found")

proxy, _ = tools[name]
try:
res = await proxy(**(arguments or {}))
except HTTPException as exc:
# map server‑side errors into MCP "text/error" payloads
err = {"error": exc.status_code, "detail": exc.detail}
return [t.TextContent(type = "text", text=json.dumps(err))]
return [t.TextContent(type = "text", text=json.dumps(res, default=str))]
return [t.TextContent(type="text", text=json.dumps(err))]
return [t.TextContent(type="text", text=json.dumps(res, default=str))]

@mcp.list_resources()
async def _list_resources() -> List[t.Resource]:
return [
t.Resource(name=k, description=inspect.getdoc(f) or "", mime_type="application/json")
t.Resource(
name=k,
description=inspect.getdoc(f) or "",
mime_type="application/json",
)
for k, f in resources.items()
]

Expand All @@ -150,17 +174,15 @@ async def _read_resource(name: str) -> List[t.TextContent]:
if name not in resources:
raise HTTPException(404, "resource not found")
res = resources[name]()
return [t.TextContent(type = "text", text=json.dumps(res, default=str))]
return [t.TextContent(type="text", text=json.dumps(res, default=str))]

@mcp.list_resource_templates()
async def _list_templates() -> List[t.ResourceTemplate]:
return [
t.ResourceTemplate(
name=k,
description=inspect.getdoc(f) or "",
parameters={
p: {"type": "string"} for p in _path_params(app, f)
},
parameters={p: {"type": "string"} for p in _path_params(app, f)},
)
for k, f in templates.items()
]
Expand All @@ -183,12 +205,13 @@ async def _ws(ws: WebSocket):

from pydantic import TypeAdapter
from mcp.types import JSONRPCMessage

adapter = TypeAdapter(JSONRPCMessage)

init_done = anyio.Event()

async def srv_to_ws():
first = True
first = True
try:
async for msg in s2c_recv:
await ws.send_json(msg.model_dump())
Expand All @@ -198,15 +221,15 @@ async def srv_to_ws():
finally:
# make sure cleanup survives TaskGroup cancellation
with anyio.CancelScope(shield=True):
with suppress(RuntimeError): # idempotent close
with suppress(RuntimeError): # idempotent close
await ws.close()

async def ws_to_srv():
try:
# 1st frame is always "initialize"
first = adapter.validate_python(await ws.receive_json())
await c2s_send.send(first)
await init_done.wait() # block until server ready
await init_done.wait() # block until server ready
while True:
data = await ws.receive_json()
await c2s_send.send(adapter.validate_python(data))
Expand All @@ -231,20 +254,54 @@ async def _mcp_sse(request: Request):
# client → server frames are POSTed here
app.mount(f"{base}/messages", app=sse.handle_post_message)

# HTTP transport ───────────────────────────────────────

session_manager = StreamableHTTPSessionManager(
app=mcp,
event_store=None,
json_response=True,
stateless=True,
)

async def handle_streamable_http(
scope: Scope, receive: Receive, send: Send
) -> None:
await session_manager.handle_request(scope, receive, send)

@contextlib.asynccontextmanager
async def mcp_lifespan(_: Starlette) -> AsyncIterator[None]:
"""Context manager for session manager."""
async with session_manager.run():
print("Application started with StreamableHTTP session manager!")
try:
yield
finally:
print("Application shutting down...")

app.router.mount(
f"{base}/http",
app=handle_streamable_http,
)

# ── schema endpoint ───────────────────────────────────────
@app.get(f"{base}/schema")
async def _schema_endpoint():
return JSONResponse({
"tools": [x.model_dump() for x in await _list_tools()],
"resources": [x.model_dump() for x in await _list_resources()],
"resource_templates": [x.model_dump() for x in await _list_templates()],
})
return JSONResponse(
{
"tools": [x.model_dump() for x in await _list_tools()],
"resources": [x.model_dump() for x in await _list_resources()],
"resource_templates": [x.model_dump() for x in await _list_templates()],
}
)

return mcp_lifespan


# ── helpers ────────────────────────────────────────────────────
def _route_name(path: str) -> str:
return re.sub(r"[/{}}]", "_", path).strip("_")


def _path_params(app: FastAPI, fn: Callable) -> List[str]:
for r in app.routes:
if r.endpoint is fn:
Expand Down
39 changes: 23 additions & 16 deletions deploy/docker/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@
)

from utils import (
FilterType, load_config, setup_logging, verify_email_domain
FilterType, combine_lifespans, load_config, setup_logging, verify_email_domain
)
import os
import sys
import time
import asyncio
from typing import List
from contextlib import asynccontextmanager
import pathlib

Expand Down Expand Up @@ -104,8 +103,8 @@ async def capped_arun(self, *a, **kw):
@asynccontextmanager
async def lifespan(_: FastAPI):
await get_crawler(BrowserConfig(
extra_args=config["crawler"]["browser"].get("extra_args", []),
**config["crawler"]["browser"].get("kwargs", {}),
extra_args=config["crawler"]["browser"].get("extra_args", []),
**config["crawler"]["browser"].get("kwargs", {}),
)) # warm‑up
app.state.janitor = asyncio.create_task(janitor()) # idle GC
yield
Expand All @@ -116,8 +115,7 @@ async def lifespan(_: FastAPI):
app = FastAPI(
title=config["app"]["title"],
version=config["app"]["version"],
lifespan=lifespan,
)
) #all lifespans will be added just before passing app to asgi server

# ── static playground ──────────────────────────────────────
STATIC_DIR = pathlib.Path(__file__).parent / "static" / "playground"
Expand Down Expand Up @@ -244,11 +242,11 @@ async def get_markdown(
body.url, body.f, body.q, body.c, config
)
return JSONResponse({
"url": body.url,
"filter": body.f,
"query": body.q,
"cache": body.c,
"markdown": markdown,
"url": body.url,
"filter": body.f,
"query": body.q,
"cache": body.c,
"markdown": markdown,
"success": True
})

Expand Down Expand Up @@ -527,7 +525,7 @@ async def get_context(
20, ge=1, description="absolute cap on returned chunks"),
):
"""
This end point is design for any questions about Crawl4ai library. It returns a plain text markdown with extensive information about Crawl4ai.
This end point is design for any questions about Crawl4ai library. It returns a plain text markdown with extensive information about Crawl4ai.
You can use this as a context for any AI assistant. Use this endpoint for AI assistants to retrieve library context for decision making or code generation tasks.
Alway is BEST practice you provide a query to filter the context. Otherwise the lenght of the response will be very long.

Expand Down Expand Up @@ -562,8 +560,8 @@ async def get_context(
if context_type == "doc":
return JSONResponse({"doc_context": doc_content})
return JSONResponse({
"code_context": code_content,
"doc_context": doc_content,
"code_context": code_content,
"doc_context": doc_content,
})

tokens = query.split()
Expand Down Expand Up @@ -598,13 +596,22 @@ async def get_context(
return JSONResponse(results)


# attach MCP layer (adds /mcp/ws, /mcp/sse, /mcp/schema)
# attach MCP layer (adds /mcp/ws, /mcp/sse, /mcp/schema, /mcp/http)
print(f"MCP server running on {config['app']['host']}:{config['app']['port']}")
attach_mcp(
mcp_lifespan = attach_mcp(
app,
base_url=f"http://{config['app']['host']}:{config['app']['port']}"
)






# ───────────────────── Combined FastAPI lifespan ──────────────────────

app.router.lifespan_context = combine_lifespans(lifespan, mcp_lifespan)

# ────────────────────────── cli ──────────────────────────────
if __name__ == "__main__":
import uvicorn
Expand Down
21 changes: 20 additions & 1 deletion deploy/docker/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import dns.resolver
import logging
import yaml
Expand Down Expand Up @@ -63,4 +64,22 @@ def verify_email_domain(email: str) -> bool:
records = dns.resolver.resolve(domain, 'MX')
return True if records else False
except Exception as e:
return False
return False


def combine_lifespans(*lifespans):
@contextlib.asynccontextmanager
async def combined(app):
# Nest the lifespans like contextlib.ExitStack does for async
stack = contextlib.AsyncExitStack()
await stack.__aenter__()

try:
for lifespan in lifespans:

await stack.enter_async_context(lifespan(app))
yield
finally:
await stack.__aexit__(None, None, None)

return combined
Loading