Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions gateway/platforms/_http_client_limits.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Shared HTTP client factory for long-lived platform adapters.

Gateway messaging platforms (QQ Bot, Feishu, WeCom, DingTalk, Signal,
BlueBubbles, WeCom-callback) keep a persistent ``httpx.AsyncClient``
alive for the adapter's lifetime. That amortises TLS/connection setup
across many API calls, but it also means the process's file-descriptor
pressure is sensitive to how aggressively the pool recycles idle keep-
alive connections.

httpx's default ``keepalive_expiry`` is 5 seconds. On macOS behind
Cloudflare Warp (and other transparent proxies), peer-initiated FIN can
sit in ``CLOSE_WAIT`` longer than that before the local socket actually
drains — which, multiplied across 7 long-lived adapters plus the LLM
client and MCP clients, walks straight into the default 256 fd limit.
See #18451.

``platform_httpx_limits()`` returns a tighter ``httpx.Limits`` the
adapter factories use instead of the httpx default. The values chosen:

* ``max_keepalive_connections=10`` — plenty for any single adapter;
platform APIs rarely parallelise beyond this.
* ``keepalive_expiry=2.0`` — close idle sockets aggressively so a
proxy's lingering CLOSE_WAIT window can't starve the process.

Override via ``HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY`` /
``HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE`` env vars when tuning under load.
"""

from __future__ import annotations

import os

try:
    import httpx
except ImportError:  # pragma: no cover — optional dep
    httpx = None  # type: ignore[assignment]


# Defaults for long-lived platform-adapter clients (see module docstring):
# a small pool and an aggressively short keep-alive so idle sockets — and
# any proxy-side CLOSE_WAIT lingering — drain promptly (#18451).
_DEFAULT_KEEPALIVE_EXPIRY_S = 2.0
_DEFAULT_MAX_KEEPALIVE = 10


def _env_positive(name: str, default, parse):
    """Read env var *name* and parse it with *parse* (``float`` or ``int``).

    Returns *default* when the variable is unset/blank, fails to parse, or
    parses to a non-positive value — a typo in the shell must never disable
    the pool or crash adapter startup.

    (Shared by both knobs below; previously duplicated as two per-call
    nested closures.)
    """
    raw = os.environ.get(name, "").strip()
    if not raw:
        return default
    try:
        value = parse(raw)
    except ValueError:
        # Only ValueError is reachable: `raw` is always a str here.
        return default
    return value if value > 0 else default


def platform_httpx_limits() -> "httpx.Limits | None":
    """Return ``httpx.Limits`` tuned for persistent platform-adapter clients.

    Keep-alive expiry and pool size come from
    ``HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY`` /
    ``HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE`` when set to positive values,
    otherwise the module defaults (2.0 s / 10 connections).

    Returns ``None`` when httpx isn't importable, so callers can fall
    back to httpx's built-in default without a hard dependency on this
    helper being reachable.
    """
    if httpx is None:
        return None

    keepalive_expiry = _env_positive(
        "HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY",
        _DEFAULT_KEEPALIVE_EXPIRY_S,
        float,
    )
    max_keepalive = _env_positive(
        "HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE",
        _DEFAULT_MAX_KEEPALIVE,
        int,
    )

    return httpx.Limits(
        max_keepalive_connections=max_keepalive,
        # Leave max_connections at httpx default (100) — plenty of headroom.
        keepalive_expiry=keepalive_expiry,
    )
4 changes: 3 additions & 1 deletion gateway/platforms/bluebubbles.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ async def connect(self) -> bool:
return False
from aiohttp import web

self.client = httpx.AsyncClient(timeout=30.0)
# Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451).
from gateway.platforms._http_client_limits import platform_httpx_limits
self.client = httpx.AsyncClient(timeout=30.0, limits=platform_httpx_limits())
try:
await self._api_get("/api/v1/ping")
info = await self._api_get("/api/v1/server/info")
Expand Down
6 changes: 5 additions & 1 deletion gateway/platforms/dingtalk.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,11 @@ async def connect(self) -> bool:
return False

try:
self._http_client = httpx.AsyncClient(timeout=30.0)
# Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451).
from gateway.platforms._http_client_limits import platform_httpx_limits
self._http_client = httpx.AsyncClient(
timeout=30.0, limits=platform_httpx_limits(),
)

credential = dingtalk_stream.Credential(
self._client_id, self._client_secret
Expand Down
9 changes: 7 additions & 2 deletions gateway/platforms/feishu.py
Original file line number Diff line number Diff line change
Expand Up @@ -2922,13 +2922,18 @@ async def _download_remote_document(
},
)
response.raise_for_status()
# Snapshot Content-Type and body while the client context is
# still active so pooled connections fully release on exit.
# See #18451.
content_type_hdr = str(response.headers.get("Content-Type", ""))
body = response.content
filename = self._derive_remote_filename(
file_url,
content_type=str(response.headers.get("Content-Type", "")),
content_type=content_type_hdr,
default_name=preferred_name,
default_ext=default_ext,
)
cached_path = cache_document_from_bytes(response.content, filename)
cached_path = cache_document_from_bytes(body, filename)
return cached_path, filename

@staticmethod
Expand Down
4 changes: 4 additions & 0 deletions gateway/platforms/qqbot/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,14 @@ async def connect(self) -> bool:
return False

try:
# Tighter keepalive pool so idle CLOSE_WAIT sockets drain
# faster behind proxies like Cloudflare Warp (#18451).
from gateway.platforms._http_client_limits import platform_httpx_limits
self._http_client = httpx.AsyncClient(
timeout=30.0,
follow_redirects=True,
event_hooks={"response": [_ssrf_redirect_guard]},
limits=platform_httpx_limits(),
)

# 1. Get access token
Expand Down
4 changes: 3 additions & 1 deletion gateway/platforms/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ async def connect(self) -> bool:
except Exception as e:
logger.warning("Signal: Could not acquire phone lock (non-fatal): %s", e)

self.client = httpx.AsyncClient(timeout=30.0)
# Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451).
from gateway.platforms._http_client_limits import platform_httpx_limits
self.client = httpx.AsyncClient(timeout=30.0, limits=platform_httpx_limits())
try:
# Health check — verify signal-cli daemon is reachable
try:
Expand Down
6 changes: 5 additions & 1 deletion gateway/platforms/wecom.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,11 @@ async def connect(self) -> bool:
return False

try:
self._http_client = httpx.AsyncClient(timeout=30.0, follow_redirects=True)
# Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451).
from gateway.platforms._http_client_limits import platform_httpx_limits
self._http_client = httpx.AsyncClient(
timeout=30.0, follow_redirects=True, limits=platform_httpx_limits(),
)
await self._open_connection()
self._mark_connected()
self._listen_task = asyncio.create_task(self._listen_loop())
Expand Down
4 changes: 3 additions & 1 deletion gateway/platforms/wecom_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ async def connect(self) -> bool:
pass

try:
self._http_client = httpx.AsyncClient(timeout=20.0)
# Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451).
from gateway.platforms._http_client_limits import platform_httpx_limits
self._http_client = httpx.AsyncClient(timeout=20.0, limits=platform_httpx_limits())
self._app = web.Application()
self._app.router.add_get("/health", self._handle_health)
self._app.router.add_get(self._path, self._handle_verify)
Expand Down
8 changes: 6 additions & 2 deletions gateway/platforms/whatsapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,11 +902,15 @@ async def send_typing(self, chat_id: str, metadata=None) -> None:
try:
import aiohttp

await self._http_session.post(
# Must wrap in `async with` — a bare `await session.post(...)`
# leaves the response object alive until GC, holding its TCP
# socket in CLOSE_WAIT. See #18451.
async with self._http_session.post(
f"http://127.0.0.1:{self._bridge_port}/typing",
json={"chatId": chat_id},
timeout=aiohttp.ClientTimeout(total=5)
)
):
pass
except Exception:
pass # Ignore typing indicator failures

Expand Down
1 change: 1 addition & 0 deletions scripts/release.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@
"h3057183414@gmail.com": "CoreyNoDream",
"franksong2702@gmail.com": "franksong2702",
"673088860@qq.com": "ambition0802",
"beibei1988@proton.me": "beibi9966",
# ── bulk addition: 75 emails resolved via API, PR salvage bodies, noreply
# crossref, and GH contributor list matching (April 2026 audit) ──
"1115117931@qq.com": "aaronagent",
Expand Down
63 changes: 63 additions & 0 deletions tests/gateway/test_feishu.py
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,69 @@ async def _run():
self.assertIn("GIF downgraded to file", caption)
self.assertIn("look", caption)

def test_download_remote_document_reads_response_before_httpx_client_closes(self):
    """#18451 — snapshot Content-Type + body while the httpx.AsyncClient
    context is still active so pooled connections fully release on
    exit. Otherwise the response is only readable because httpx
    eagerly buffers it; a future refactor to .stream() would silently
    read-after-close."""
    from gateway.config import PlatformConfig
    from gateway.platforms.feishu import FeishuAdapter

    # Ordered trace of fake-client lifecycle events.
    timeline: list[str] = []

    class _StubResponse:
        headers = {"Content-Type": "application/octet-stream"}

        def raise_for_status(self) -> None:
            timeline.append("raise_for_status")

        @property
        def content(self) -> bytes:
            timeline.append("content_read")
            return b"doc-bytes"

    class _StubClient:
        def __init__(self, *_args: object, **_kwargs: object) -> None:
            pass

        async def __aenter__(self) -> "_StubClient":
            timeline.append("client_enter")
            return self

        async def __aexit__(self, *_exc: object) -> None:
            timeline.append("client_exit")

        async def get(self, *_args: object, **_kwargs: object) -> _StubResponse:
            timeline.append("get")
            return _StubResponse()

    with tempfile.TemporaryDirectory() as tmp, patch.dict(
        os.environ, {"HERMES_HOME": tmp}, clear=False
    ):
        adapter = FeishuAdapter(PlatformConfig())

        async def _fetch() -> tuple[str, str]:
            with patch("tools.url_safety.is_safe_url", return_value=True), patch(
                "httpx.AsyncClient", _StubClient
            ), patch(
                "gateway.platforms.feishu.cache_document_from_bytes",
                return_value="/tmp/cached-doc.bin",
            ):
                return await adapter._download_remote_document(
                    "https://example.com/doc.bin",
                    default_ext=".bin",
                    preferred_name="doc",
                )

        path, filename = asyncio.run(_fetch())

        self.assertEqual(path, "/tmp/cached-doc.bin")
        self.assertTrue(filename)
        # content_read MUST happen before client_exit — otherwise we're
        # reading response body after the connection pool has been torn
        # down, which only works by accident (httpx's eager buffering).
        self.assertLess(
            timeline.index("content_read"), timeline.index("client_exit")
        )

def test_dedup_state_persists_across_adapter_restart(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
Expand Down
114 changes: 114 additions & 0 deletions tests/gateway/test_platform_http_client_limits.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""Tests for the shared httpx.Limits helper that all long-lived platform
adapters use to tighten their keep-alive pool.

Context: #18451 — on macOS behind Cloudflare Warp, httpx's default
keepalive_expiry=5s let idle CLOSE_WAIT sockets accumulate across
multiple long-lived gateway adapters (QQ Bot, Feishu, WeCom, DingTalk,
Signal, BlueBubbles, WeCom-callback) until the process hit the default
256 fd limit. These tests just verify the helper returns sensibly
tuned limits and respects env-var overrides; the actual fd-pressure
behaviour is only observable at runtime under load.
"""

from __future__ import annotations

import os

import pytest


@pytest.fixture(autouse=True)
def _clear_env(monkeypatch):
    """Run every test without the gateway httpx tuning overrides set."""
    for var in (
        "HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY",
        "HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE",
    ):
        monkeypatch.delenv(var, raising=False)


def test_returns_none_when_httpx_unavailable(monkeypatch):
    """If httpx can't be imported, the helper returns None so callers
    fall back to httpx's built-in Limits default without raising."""
    from gateway.platforms import _http_client_limits as mod

    monkeypatch.setattr(mod, "httpx", None)
    assert mod.platform_httpx_limits() is None


def test_default_limits_tighten_keepalive_below_httpx_default():
    """Without overrides, the helper returns real httpx.Limits with a
    keep-alive window tighter than httpx's default and a small but
    positive pool size."""
    import httpx
    from gateway.platforms._http_client_limits import platform_httpx_limits

    limits = platform_httpx_limits()
    assert isinstance(limits, httpx.Limits)
    expiry = limits.keepalive_expiry
    pool = limits.max_keepalive_connections
    # httpx default keepalive_expiry is 5.0 — ours must be shorter so
    # CLOSE_WAIT sockets drain promptly behind proxies like Warp.
    assert expiry is not None
    assert expiry < 5.0
    # max_keepalive_connections must be positive and reasonable for a
    # single adapter (platform APIs rarely parallelise beyond ~10).
    assert pool is not None
    assert 1 <= pool <= 50


def test_env_override_keepalive_expiry(monkeypatch):
    """A positive float in the env var overrides the expiry default."""
    monkeypatch.setenv("HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", "7.5")
    from gateway.platforms._http_client_limits import platform_httpx_limits

    assert platform_httpx_limits().keepalive_expiry == 7.5


def test_env_override_max_keepalive(monkeypatch):
    """A positive int in the env var overrides the pool-size default."""
    monkeypatch.setenv("HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", "25")
    from gateway.platforms._http_client_limits import platform_httpx_limits

    assert platform_httpx_limits().max_keepalive_connections == 25


def test_env_override_rejects_garbage(monkeypatch):
    """Malformed env values fall back to defaults rather than raising."""
    monkeypatch.setenv("HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", "not-a-number")
    monkeypatch.setenv("HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", "-3")
    from gateway.platforms._http_client_limits import platform_httpx_limits

    limits = platform_httpx_limits()
    expiry = limits.keepalive_expiry
    pool = limits.max_keepalive_connections
    # Non-positive / non-numeric → fell back to defaults (not the override values)
    assert expiry is not None
    assert expiry > 0
    assert pool is not None
    assert pool > 0


def test_helper_is_importable_from_every_platform_that_uses_it():
    """Every persistent-httpx-client platform adapter imports this helper.
    If any of those modules fails to import, this test surfaces it before
    the regression shows up as a runtime adapter-startup crash.

    NOTE: the adapters import the helper lazily inside ``connect()``, so
    importing the adapter modules alone never executes the helper's
    import — import the helper module directly as well, otherwise a
    broken ``_http_client_limits`` would only surface at adapter startup.
    """
    # The helper itself — its import is deferred inside each adapter's
    # connect(), so it must be exercised explicitly here.
    import gateway.platforms._http_client_limits  # noqa: F401
    # Each adapter module that defers an import of the helper.
    import gateway.platforms.qqbot.adapter  # noqa: F401
    import gateway.platforms.wecom  # noqa: F401
    import gateway.platforms.dingtalk  # noqa: F401
    import gateway.platforms.signal  # noqa: F401
    import gateway.platforms.bluebubbles  # noqa: F401
    import gateway.platforms.wecom_callback  # noqa: F401


class TestWhatsappTypingLeakFix:
    """#18451 — whatsapp.send_typing previously used a bare
    `await self._http_session.post(...)` which leaked the aiohttp
    response object until GC, holding its TCP socket in CLOSE_WAIT.
    Must now wrap the call in `async with` so the response is
    released immediately when the call returns.

    We verify by inspecting the source text rather than exercising
    the coroutine — the test suite would otherwise need a live
    aiohttp server, and the contract we care about is structural.
    """

    def test_bare_await_removed(self):
        import inspect

        import gateway.platforms.whatsapp as mod

        source = inspect.getsource(mod.WhatsAppAdapter.send_typing)
        wanted = "async with self._http_session.post("
        banned = "await self._http_session.post("
        # The fix must be structural: the post() call is inside an
        # `async with`, not a bare `await`.
        assert wanted in source, (
            "send_typing must wrap self._http_session.post(...) in "
            "`async with` to release the aiohttp response socket "
            "(#18451). Otherwise the response sits in CLOSE_WAIT "
            "until GC."
        )
        # The old bare-await form must be gone.
        assert banned not in source
Loading