Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions gateway/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,7 @@ def __init__(self, config: Optional[GatewayConfig] = None):
# /new and /reset. /model and other mid-session operations
# preserve the queue.
self._queued_events: Dict[str, List[MessageEvent]] = {}
self._pending_native_image_paths_by_session: Dict[str, List[str]] = {}
self._busy_ack_ts: Dict[str, float] = {} # last busy-ack timestamp per session (debounce)
self._session_run_generation: Dict[str, int] = {}

Expand Down Expand Up @@ -5079,22 +5080,29 @@ async def _prepare_inbound_message_text(
preprocessing pipeline so sender attribution, image enrichment, STT,
document notes, reply context, and @ references all behave the same.

Side effect: writes ``self._pending_native_image_paths`` to a list of
local image paths when the active model supports native vision AND
the user has images attached. The caller consumes and clears this
attribute at the ``run_conversation`` site to build a multimodal user
turn. When the list is empty, the ``_enrich_message_with_vision``
text path has already run and images are represented in-text.
Side effect: buffers per-session native image paths when the active
model supports native vision AND the user has images attached. The
caller consumes and clears that session-scoped buffer at the
``run_conversation`` site to build a multimodal user turn. When the
list is empty, the ``_enrich_message_with_vision`` text path has
already run and images are represented in-text.
"""
history = history or []
message_text = event.text or ""
# Reset per-call buffer; set only when native routing is chosen.
self._pending_native_image_paths = []
_group_sessions_per_user = getattr(self.config, "group_sessions_per_user", True)
_thread_sessions_per_user = getattr(self.config, "thread_sessions_per_user", False)
# Use the same helper every other call site uses so the write key here
# matches the consume key at the run_conversation site — even if the
# session store overrides build_session_key's default behavior.
session_key = self._session_key_for_source(source)
# Reset only this session's per-call buffer; other sessions may be
# concurrently preparing multimodal turns on the same runner.
self._consume_pending_native_image_paths(session_key)

_is_shared_multi_user = is_shared_multi_user_session(
source,
group_sessions_per_user=getattr(self.config, "group_sessions_per_user", True),
thread_sessions_per_user=getattr(self.config, "thread_sessions_per_user", False),
group_sessions_per_user=_group_sessions_per_user,
thread_sessions_per_user=_thread_sessions_per_user,
)
if _is_shared_multi_user and source.user_name:
message_text = f"[{source.user_name}] {message_text}"
Expand All @@ -5115,7 +5123,11 @@ async def _prepare_inbound_message_text(
_img_mode = self._decide_image_input_mode()
if _img_mode == "native":
# Defer attachment to the run_conversation call site.
self._pending_native_image_paths = list(image_paths)
pending_native = getattr(self, "_pending_native_image_paths_by_session", None)
if pending_native is None:
pending_native = {}
self._pending_native_image_paths_by_session = pending_native
pending_native[session_key] = list(image_paths)
logger.info(
"Image routing: native (model supports vision). %d image(s) will be attached inline.",
len(image_paths),
Expand Down Expand Up @@ -5254,6 +5266,12 @@ async def _prepare_inbound_message_text(

return message_text

def _consume_pending_native_image_paths(self, session_key: str) -> List[str]:
    """Pop and return the native image paths buffered for ``session_key``.

    The entry for ``session_key`` is removed from
    ``self._pending_native_image_paths_by_session`` so a later call for the
    same session returns an empty list. A missing attribute, an empty
    mapping, an unknown key, or a falsy stored value all yield ``[]``.

    Args:
        session_key: Key identifying the session whose buffer is drained.

    Returns:
        A new list holding the buffered paths (possibly empty).
    """
    # ``getattr`` tolerates instances on which the mapping was never set.
    buffers = getattr(self, "_pending_native_image_paths_by_session", None)
    if buffers:
        drained = buffers.pop(session_key, [])
        if drained:
            # Copy so the caller never aliases the stored list.
            return list(drained)
    return []

async def _handle_message_with_agent(self, event, source, _quick_key: str, run_generation: int):
"""Inner handler that runs under the _running_agents sentinel guard."""
_msg_start_time = time.time()
Expand Down Expand Up @@ -12129,8 +12147,7 @@ def _approval_notify_sync(approval_data: dict) -> None:
# attachment, wrap the user turn as an OpenAI-style multimodal
# content list. Consume-and-clear so subsequent turns on the same
# runner instance don't re-attach stale images.
_native_imgs = list(getattr(self, "_pending_native_image_paths", []) or [])
self._pending_native_image_paths = []
_native_imgs = self._consume_pending_native_image_paths(session_key)
if _native_imgs:
try:
from agent.image_routing import build_native_content_parts
Expand Down
79 changes: 79 additions & 0 deletions tests/gateway/test_native_image_buffer_isolation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import pytest

from gateway.config import GatewayConfig, Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from gateway.run import GatewayRunner
from gateway.session import SessionSource, build_session_key


def _make_runner() -> GatewayRunner:
    """Create a minimal ``GatewayRunner`` for buffer-isolation tests.

    The instance is allocated with ``__new__`` so ``__init__`` never runs;
    only the attributes assigned below are set explicitly.
    """
    stub = GatewayRunner.__new__(GatewayRunner)

    telegram_only = {Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake")}
    stub.config = GatewayConfig(platforms=telegram_only)

    stub.adapters = {}
    stub._model = "openai/gpt-4.1-mini"
    stub._base_url = None
    # Pin image routing to the native branch so images get buffered.
    stub._decide_image_input_mode = lambda: "native"
    return stub


def _source(chat_id: str) -> SessionSource:
    """Return a private-chat Telegram ``SessionSource`` for ``chat_id``.

    The user name is derived from the chat id as ``user-<chat_id>``.
    """
    fields = {
        "platform": Platform.TELEGRAM,
        "chat_id": chat_id,
        "chat_type": "private",
        "user_name": f"user-{chat_id}",
    }
    return SessionSource(**fields)


def _image_event(source: SessionSource, path: str) -> MessageEvent:
    """Return a photo ``MessageEvent`` from ``source`` carrying one PNG at ``path``."""
    attachments = [path]
    mime_types = ["image/png"]
    return MessageEvent(
        text="see image",
        message_type=MessageType.PHOTO,
        source=source,
        media_urls=attachments,
        media_types=mime_types,
    )


@pytest.mark.asyncio
async def test_native_image_buffer_isolated_per_session():
    """Two sessions that each send an image keep separate native buffers.

    Preparing session B's message must not overwrite or clear the paths
    buffered for session A, and consuming a buffer must drain it.
    """
    runner = _make_runner()
    source_a = _source("chat-a")
    source_b = _source("chat-b")

    await runner._prepare_inbound_message_text(
        event=_image_event(source_a, "/tmp/a.png"),
        source=source_a,
        history=[],
    )
    await runner._prepare_inbound_message_text(
        event=_image_event(source_b, "/tmp/b.png"),
        source=source_b,
        history=[],
    )

    # Derive keys with the runner's own helper: it is what the write path
    # in ``_prepare_inbound_message_text`` uses, so the lookup cannot drift
    # from the stored key if key derivation is customised.
    key_a = runner._session_key_for_source(source_a)
    key_b = runner._session_key_for_source(source_b)

    assert runner._consume_pending_native_image_paths(key_a) == ["/tmp/a.png"]
    assert runner._consume_pending_native_image_paths(key_b) == ["/tmp/b.png"]

    # Consume-and-clear: a second read of the same session yields nothing.
    assert runner._consume_pending_native_image_paths(key_a) == []
    assert runner._consume_pending_native_image_paths(key_b) == []


@pytest.mark.asyncio
async def test_native_image_buffer_not_cleared_by_other_sessions_without_images():
    """A text-only message in one session leaves another session's buffer intact.

    Session A buffers an image; session B then sends plain text. The
    per-call reset performed for B must touch only B's entry.
    """
    runner = _make_runner()
    source_a = _source("chat-a")
    source_b = _source("chat-b")

    await runner._prepare_inbound_message_text(
        event=_image_event(source_a, "/tmp/a.png"),
        source=source_a,
        history=[],
    )
    await runner._prepare_inbound_message_text(
        event=MessageEvent(text="plain text", source=source_b),
        source=source_b,
        history=[],
    )

    # Use the same key helper as the write path so the assertions read the
    # exact entries that ``_prepare_inbound_message_text`` stored.
    key_a = runner._session_key_for_source(source_a)
    key_b = runner._session_key_for_source(source_b)

    assert runner._consume_pending_native_image_paths(key_a) == ["/tmp/a.png"]
    assert runner._consume_pending_native_image_paths(key_b) == []
Loading