diff --git a/gateway/platforms/_http_client_limits.py b/gateway/platforms/_http_client_limits.py new file mode 100644 index 00000000000..4d8a7c86e93 --- /dev/null +++ b/gateway/platforms/_http_client_limits.py @@ -0,0 +1,84 @@ +"""Shared HTTP client factory for long-lived platform adapters. + +Gateway messaging platforms (QQ Bot, Feishu, WeCom, DingTalk, Signal, +BlueBubbles, WeCom-callback) keep a persistent ``httpx.AsyncClient`` +alive for the adapter's lifetime. That amortises TLS/connection setup +across many API calls, but it also means the process's file-descriptor +pressure is sensitive to how aggressively the pool recycles idle keep- +alive connections. + +httpx's default ``keepalive_expiry`` is 5 seconds. On macOS behind +Cloudflare Warp (and other transparent proxies), peer-initiated FIN can +sit in ``CLOSE_WAIT`` longer than that before the local socket actually +drains — which, multiplied across 7 long-lived adapters plus the LLM +client and MCP clients, walks straight into the default 256 fd limit. +See #18451. + +``platform_httpx_limits()`` returns a tighter ``httpx.Limits`` the +adapter factories use instead of the httpx default. The values chosen: + +* ``max_keepalive_connections=10`` — plenty for any single adapter; + platform APIs rarely parallelise beyond this. +* ``keepalive_expiry=2.0`` — close idle sockets aggressively so a + proxy's lingering CLOSE_WAIT window can't starve the process. + +Override via ``HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY`` / +``HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE`` env vars when tuning under load. +""" + +from __future__ import annotations + +import os + +try: + import httpx +except ImportError: # pragma: no cover — optional dep + httpx = None # type: ignore[assignment] + + +_DEFAULT_KEEPALIVE_EXPIRY_S = 2.0 +_DEFAULT_MAX_KEEPALIVE = 10 + + +def platform_httpx_limits() -> "httpx.Limits | None": + """Return ``httpx.Limits`` tuned for persistent platform-adapter clients. + + Returns ``None`` when httpx isn't importable, so callers can fall + back to httpx's built-in default without a hard dependency on this + helper being reachable. + """ + if httpx is None: + return None + + def _env_float(name: str, default: float) -> float: + raw = os.environ.get(name, "").strip() + if not raw: + return default + try: + val = float(raw) + except (TypeError, ValueError): + return default + return val if val > 0 else default + + def _env_int(name: str, default: int) -> int: + raw = os.environ.get(name, "").strip() + if not raw: + return default + try: + val = int(raw) + except (TypeError, ValueError): + return default + return val if val > 0 else default + + keepalive_expiry = _env_float( + "HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", _DEFAULT_KEEPALIVE_EXPIRY_S + ) + max_keepalive = _env_int( + "HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", _DEFAULT_MAX_KEEPALIVE + ) + + return httpx.Limits( + max_keepalive_connections=max_keepalive, + # Leave max_connections at httpx default (100) — plenty of headroom. + keepalive_expiry=keepalive_expiry, + ) diff --git a/gateway/platforms/bluebubbles.py b/gateway/platforms/bluebubbles.py index afcbf1a7e47..31120785c09 100644 --- a/gateway/platforms/bluebubbles.py +++ b/gateway/platforms/bluebubbles.py @@ -162,7 +162,9 @@ async def connect(self) -> bool: return False from aiohttp import web - self.client = httpx.AsyncClient(timeout=30.0) + # Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451). + from gateway.platforms._http_client_limits import platform_httpx_limits + self.client = httpx.AsyncClient(timeout=30.0, limits=platform_httpx_limits()) try: await self._api_get("/api/v1/ping") info = await self._api_get("/api/v1/server/info") diff --git a/gateway/platforms/dingtalk.py b/gateway/platforms/dingtalk.py index 3037e402b2c..f1520e22c65 100644 --- a/gateway/platforms/dingtalk.py +++ b/gateway/platforms/dingtalk.py @@ -228,7 +228,11 @@ async def connect(self) -> bool: return False try: - self._http_client = httpx.AsyncClient(timeout=30.0) + # Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451). + from gateway.platforms._http_client_limits import platform_httpx_limits + self._http_client = httpx.AsyncClient( + timeout=30.0, limits=platform_httpx_limits(), + ) credential = dingtalk_stream.Credential( self._client_id, self._client_secret diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py index 8bc2ae816ed..a6b522c4a24 100644 --- a/gateway/platforms/feishu.py +++ b/gateway/platforms/feishu.py @@ -2922,13 +2922,18 @@ async def _download_remote_document( }, ) response.raise_for_status() + # Snapshot Content-Type and body while the client context is + # still active so pooled connections fully release on exit. + # See #18451. + content_type_hdr = str(response.headers.get("Content-Type", "")) + body = response.content filename = self._derive_remote_filename( file_url, - content_type=str(response.headers.get("Content-Type", "")), + content_type=content_type_hdr, default_name=preferred_name, default_ext=default_ext, ) - cached_path = cache_document_from_bytes(response.content, filename) + cached_path = cache_document_from_bytes(body, filename) return cached_path, filename @staticmethod diff --git a/gateway/platforms/qqbot/adapter.py b/gateway/platforms/qqbot/adapter.py index 10e1f62e72c..c6e5d428c6e 100644 --- a/gateway/platforms/qqbot/adapter.py +++ b/gateway/platforms/qqbot/adapter.py @@ -243,10 +243,14 @@ async def connect(self) -> bool: return False try: + # Tighter keepalive pool so idle CLOSE_WAIT sockets drain + # faster behind proxies like Cloudflare Warp (#18451). + from gateway.platforms._http_client_limits import platform_httpx_limits self._http_client = httpx.AsyncClient( timeout=30.0, follow_redirects=True, event_hooks={"response": [_ssrf_redirect_guard]}, + limits=platform_httpx_limits(), ) # 1. Get access token diff --git a/gateway/platforms/signal.py b/gateway/platforms/signal.py index 225430600df..77d3c18cb61 100644 --- a/gateway/platforms/signal.py +++ b/gateway/platforms/signal.py @@ -248,7 +248,9 @@ async def connect(self) -> bool: except Exception as e: logger.warning("Signal: Could not acquire phone lock (non-fatal): %s", e) - self.client = httpx.AsyncClient(timeout=30.0) + # Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451). + from gateway.platforms._http_client_limits import platform_httpx_limits + self.client = httpx.AsyncClient(timeout=30.0, limits=platform_httpx_limits()) try: # Health check — verify signal-cli daemon is reachable try: diff --git a/gateway/platforms/wecom.py b/gateway/platforms/wecom.py index 7ba0fa21b90..453b95a7178 100644 --- a/gateway/platforms/wecom.py +++ b/gateway/platforms/wecom.py @@ -206,7 +206,11 @@ async def connect(self) -> bool: return False try: - self._http_client = httpx.AsyncClient(timeout=30.0, follow_redirects=True) + # Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451). + from gateway.platforms._http_client_limits import platform_httpx_limits + self._http_client = httpx.AsyncClient( + timeout=30.0, follow_redirects=True, limits=platform_httpx_limits(), + ) await self._open_connection() self._mark_connected() self._listen_task = asyncio.create_task(self._listen_loop()) diff --git a/gateway/platforms/wecom_callback.py b/gateway/platforms/wecom_callback.py index 5440792dea1..139c67fe7c1 100644 --- a/gateway/platforms/wecom_callback.py +++ b/gateway/platforms/wecom_callback.py @@ -119,7 +119,9 @@ async def connect(self) -> bool: pass try: - self._http_client = httpx.AsyncClient(timeout=20.0) + # Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451). + from gateway.platforms._http_client_limits import platform_httpx_limits + self._http_client = httpx.AsyncClient(timeout=20.0, limits=platform_httpx_limits()) self._app = web.Application() self._app.router.add_get("/health", self._handle_health) self._app.router.add_get(self._path, self._handle_verify) diff --git a/gateway/platforms/whatsapp.py b/gateway/platforms/whatsapp.py index b3e655a51b6..921dd70d722 100644 --- a/gateway/platforms/whatsapp.py +++ b/gateway/platforms/whatsapp.py @@ -902,11 +902,15 @@ async def send_typing(self, chat_id: str, metadata=None) -> None: try: import aiohttp - await self._http_session.post( + # Must wrap in `async with` — a bare `await session.post(...)` + # leaves the response object alive until GC, holding its TCP + # socket in CLOSE_WAIT. See #18451. + async with self._http_session.post( f"http://127.0.0.1:{self._bridge_port}/typing", json={"chatId": chat_id}, timeout=aiohttp.ClientTimeout(total=5) - ) + ): + pass except Exception: pass # Ignore typing indicator failures diff --git a/scripts/release.py b/scripts/release.py index 939a485d6b0..0c046ee46e3 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -373,6 +373,7 @@ "h3057183414@gmail.com": "CoreyNoDream", "franksong2702@gmail.com": "franksong2702", "673088860@qq.com": "ambition0802", + "beibei1988@proton.me": "beibi9966", # ── bulk addition: 75 emails resolved via API, PR salvage bodies, noreply # crossref, and GH contributor list matching (April 2026 audit) ── "1115117931@qq.com": "aaronagent", diff --git a/tests/gateway/test_feishu.py b/tests/gateway/test_feishu.py index ea5a8057293..8042d38e3f3 100644 --- a/tests/gateway/test_feishu.py +++ b/tests/gateway/test_feishu.py @@ -1771,6 +1771,69 @@ async def _run(): self.assertIn("GIF downgraded to file", caption) self.assertIn("look", caption) + def test_download_remote_document_reads_response_before_httpx_client_closes(self): + """#18451 — snapshot Content-Type + body while the httpx.AsyncClient + context is still active so pooled connections fully release on + exit. Otherwise the response is only readable because httpx + eagerly buffers it; a future refactor to .stream() would silently + read-after-close.""" + from gateway.config import PlatformConfig + from gateway.platforms.feishu import FeishuAdapter + + events: list[str] = [] + + class _FakeResponse: + headers = {"Content-Type": "application/octet-stream"} + + def raise_for_status(self) -> None: + events.append("raise_for_status") + + @property + def content(self) -> bytes: + events.append("content_read") + return b"doc-bytes" + + class _FakeAsyncClient: + def __init__(self, *_a: object, **_k: object) -> None: + pass + + async def __aenter__(self) -> "_FakeAsyncClient": + events.append("client_enter") + return self + + async def __aexit__(self, *exc: object) -> None: + events.append("client_exit") + + async def get(self, *_a: object, **_k: object) -> _FakeResponse: + events.append("get") + return _FakeResponse() + + with tempfile.TemporaryDirectory() as tmp: + with patch.dict(os.environ, {"HERMES_HOME": tmp}, clear=False): + adapter = FeishuAdapter(PlatformConfig()) + + async def _run() -> tuple[str, str]: + with patch("tools.url_safety.is_safe_url", return_value=True): + with patch("httpx.AsyncClient", _FakeAsyncClient): + with patch( + "gateway.platforms.feishu.cache_document_from_bytes", + return_value="/tmp/cached-doc.bin", + ): + return await adapter._download_remote_document( + "https://example.com/doc.bin", + default_ext=".bin", + preferred_name="doc", + ) + + path, filename = asyncio.run(_run()) + + self.assertEqual(path, "/tmp/cached-doc.bin") + self.assertTrue(filename) + # content_read MUST happen before client_exit — otherwise we're + # reading response body after the connection pool has been torn + # down, which only works by accident (httpx's eager buffering). + self.assertLess(events.index("content_read"), events.index("client_exit")) + def test_dedup_state_persists_across_adapter_restart(self): from gateway.config import PlatformConfig from gateway.platforms.feishu import FeishuAdapter diff --git a/tests/gateway/test_platform_http_client_limits.py b/tests/gateway/test_platform_http_client_limits.py new file mode 100644 index 00000000000..fe613fb1f08 --- /dev/null +++ b/tests/gateway/test_platform_http_client_limits.py @@ -0,0 +1,114 @@ +"""Tests for the shared httpx.Limits helper that all long-lived platform +adapters use to tighten their keep-alive pool. + +Context: #18451 — on macOS behind Cloudflare Warp, httpx's default +keepalive_expiry=5s let idle CLOSE_WAIT sockets accumulate across +multiple long-lived gateway adapters (QQ Bot, Feishu, WeCom, DingTalk, +Signal, BlueBubbles, WeCom-callback) until the process hit the default +256 fd limit. These tests just verify the helper returns sensibly +tuned limits and respects env-var overrides; the actual fd-pressure +behaviour is only observable at runtime under load. +""" + +from __future__ import annotations + +import os + +import pytest + + +@pytest.fixture(autouse=True) +def _clear_env(monkeypatch): + monkeypatch.delenv("HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", raising=False) + + +def test_returns_none_when_httpx_unavailable(monkeypatch): + """If httpx can't be imported, the helper returns None so callers + fall back to httpx's built-in Limits default without raising.""" + import gateway.platforms._http_client_limits as mod + monkeypatch.setattr(mod, "httpx", None) + assert mod.platform_httpx_limits() is None + + +def test_default_limits_tighten_keepalive_below_httpx_default(): + import httpx + from gateway.platforms._http_client_limits import platform_httpx_limits + limits = platform_httpx_limits() + assert isinstance(limits, httpx.Limits) + # httpx default keepalive_expiry is 5.0 — ours must be shorter so + # CLOSE_WAIT sockets drain promptly behind proxies like Warp. + assert limits.keepalive_expiry is not None + assert limits.keepalive_expiry < 5.0 + # max_keepalive_connections must be positive and reasonable for a + # single adapter (platform APIs rarely parallelise beyond ~10). + assert limits.max_keepalive_connections is not None + assert 1 <= limits.max_keepalive_connections <= 50 + + +def test_env_override_keepalive_expiry(monkeypatch): + monkeypatch.setenv("HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", "7.5") + from gateway.platforms._http_client_limits import platform_httpx_limits + limits = platform_httpx_limits() + assert limits.keepalive_expiry == 7.5 + + +def test_env_override_max_keepalive(monkeypatch): + monkeypatch.setenv("HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", "25") + from gateway.platforms._http_client_limits import platform_httpx_limits + limits = platform_httpx_limits() + assert limits.max_keepalive_connections == 25 + + +def test_env_override_rejects_garbage(monkeypatch): + """Malformed env values fall back to defaults rather than raising.""" + monkeypatch.setenv("HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", "not-a-number") + monkeypatch.setenv("HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", "-3") + from gateway.platforms._http_client_limits import platform_httpx_limits + limits = platform_httpx_limits() + # Non-positive / non-numeric → fell back to defaults (not the override values) + assert limits.keepalive_expiry is not None and limits.keepalive_expiry > 0 + assert limits.max_keepalive_connections is not None + assert limits.max_keepalive_connections > 0 + + +def test_helper_is_importable_from_every_platform_that_uses_it(): + """Every persistent-httpx-client platform adapter imports this helper. + If any of those modules fails to import, this test surfaces it before + the regression shows up as a runtime adapter-startup crash.""" + # Just importing exercises the helper's import path for each adapter. + import gateway.platforms.qqbot.adapter # noqa: F401 + import gateway.platforms.wecom # noqa: F401 + import gateway.platforms.dingtalk # noqa: F401 + import gateway.platforms.signal # noqa: F401 + import gateway.platforms.bluebubbles # noqa: F401 + import gateway.platforms.wecom_callback # noqa: F401 + + +class TestWhatsappTypingLeakFix: + """#18451 — whatsapp.send_typing previously used a bare + `await self._http_session.post(...)` which leaked the aiohttp + response object until GC, holding its TCP socket in CLOSE_WAIT. + Must now wrap the call in `async with` so the response is + released immediately when the call returns. + + We verify by inspecting the source text rather than exercising + the coroutine — the test suite would otherwise need a live + aiohttp server, and the contract we care about is structural. + """ + + def test_bare_await_removed(self): + import inspect + import gateway.platforms.whatsapp as mod + + src = inspect.getsource(mod.WhatsAppAdapter.send_typing) + # The fix must be structural: the post() call is inside an + # `async with`, not a bare `await`. + assert "async with self._http_session.post(" in src, ( + "send_typing must wrap self._http_session.post(...) in " + "`async with` to release the aiohttp response socket " + "(#18451). Otherwise the response sits in CLOSE_WAIT " + "until GC." + ) + # The old bare-await form must be gone. + assert "await self._http_session.post(" not in src