Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 77 additions & 19 deletions gateway/platforms/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1693,12 +1693,12 @@ async def _dispatch(it) -> None:
"output_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
}
full_history = list(conversation_history)
full_history.append({"role": "user", "content": user_message})
if isinstance(result, dict) and result.get("messages"):
full_history.extend(result["messages"])
else:
full_history.append({"role": "assistant", "content": final_response_text})
full_history = self._build_response_conversation_history(
conversation_history,
user_message,
result,
final_response_text,
)
_persist_response_snapshot(
completed_env,
conversation_history_snapshot=full_history,
Expand Down Expand Up @@ -1965,17 +1965,22 @@ async def _compute_response():

# Build the full conversation history for storage
# (includes tool calls from the agent run)
full_history = list(conversation_history)
full_history.append({"role": "user", "content": user_message})
# Add agent's internal messages if available
agent_messages = result.get("messages", [])
if agent_messages:
full_history.extend(agent_messages)
else:
full_history.append({"role": "assistant", "content": final_response})
full_history = self._build_response_conversation_history(
conversation_history,
user_message,
result,
final_response,
)

# Build output items (includes tool calls + final message)
output_items = self._extract_output_items(result)
# Build output items from the current turn only. AIAgent returns a
# full transcript in result["messages"], while older/mocked paths may
# return only the current turn suffix.
output_start_index = self._response_messages_turn_start_index(
conversation_history,
user_message,
result,
)
output_items = self._extract_output_items(result, start_index=output_start_index)

response_data = {
"id": response_id,
Expand Down Expand Up @@ -2264,17 +2269,70 @@ async def _handle_run_job(self, request: "web.Request") -> "web.Response":
# ------------------------------------------------------------------

@staticmethod
def _extract_output_items(result: Dict[str, Any]) -> List[Dict[str, Any]]:
def _build_response_conversation_history(
conversation_history: List[Dict[str, Any]],
user_message: Any,
result: Dict[str, Any],
final_response: Any,
) -> List[Dict[str, Any]]:
"""Build the stored Responses transcript without duplicating history."""
prior = list(conversation_history)
current_user = {"role": "user", "content": user_message}
agent_messages = result.get("messages") if isinstance(result, dict) else None

if isinstance(agent_messages, list) and agent_messages:
turn_start = APIServerAdapter._response_messages_turn_start_index(
conversation_history,
user_message,
result,
)
if turn_start:
return list(agent_messages)

full_history = prior
full_history.append(current_user)
full_history.extend(agent_messages)
return full_history

full_history = prior
full_history.append(current_user)
full_history.append({"role": "assistant", "content": final_response})
return full_history

@staticmethod
def _response_messages_turn_start_index(
conversation_history: List[Dict[str, Any]],
user_message: Any,
result: Dict[str, Any],
) -> int:
"""Detect transcript-shaped result["messages"] and return turn start."""
agent_messages = result.get("messages") if isinstance(result, dict) else None
if not isinstance(agent_messages, list) or not agent_messages:
return 0

prior = list(conversation_history)
current_user = {"role": "user", "content": user_message}
expected_prefix = prior + [current_user]
if agent_messages[:len(expected_prefix)] == expected_prefix:
return len(expected_prefix)
if prior and agent_messages[:len(prior)] == prior:
return len(prior)
return 0

@staticmethod
def _extract_output_items(result: Dict[str, Any], start_index: int = 0) -> List[Dict[str, Any]]:
"""
Build the full output item array from the agent's messages.
Build the output item array from the agent's messages.

Walks *result["messages"]* and emits:
Walks *result["messages"]* starting at *start_index* and emits:
- ``function_call`` items for each tool_call on assistant messages
- ``function_call_output`` items for each tool-role message
- a final ``message`` item with the assistant's text reply
"""
items: List[Dict[str, Any]] = []
messages = result.get("messages", [])
if start_index > 0:
messages = messages[start_index:]

for msg in messages:
role = msg.get("role")
Expand Down
205 changes: 205 additions & 0 deletions tests/gateway/test_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,146 @@ async def test_previous_response_id_chaining(self, adapter):
assert len(call_kwargs["conversation_history"]) > 0
assert call_kwargs["user_message"] == "Now add 1 more"

@pytest.mark.asyncio
async def test_previous_response_id_stores_full_agent_transcript_once(self, adapter):
    """A chained response stores the agent transcript exactly once.

    The mocked agent returns the whole conversation in ``messages`` on both
    turns; the stored history must equal that transcript with no repeats.
    """
    turn_one = [
        {"role": "user", "content": "What is 1+1?"},
        {"role": "assistant", "content": "2"},
    ]
    follow_up = {"role": "user", "content": "Now add 1 more"}
    turn_two = [*turn_one, follow_up, {"role": "assistant", "content": "3"}]

    def _agent_reply(text, transcript):
        # Same (result, usage) tuple shape the _run_agent mock hands back.
        return (
            {
                "final_response": text,
                "messages": list(transcript),
                "api_calls": 1,
            },
            {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0},
        )

    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as cli:
        # First turn: no previous_response_id.
        with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
            mock_run.return_value = _agent_reply("2", turn_one)
            resp1 = await cli.post(
                "/v1/responses",
                json={"model": "hermes-agent", "input": "What is 1+1?"},
            )

        assert resp1.status == 200
        resp1_data = await resp1.json()
        stored_first = adapter._response_store.get(resp1_data["id"])
        assert stored_first["conversation_history"] == turn_one

        # Second turn: chained onto the first response.
        with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
            mock_run.return_value = _agent_reply("3", turn_two)
            chained_request = {
                "model": "hermes-agent",
                "input": "Now add 1 more",
                "previous_response_id": resp1_data["id"],
            }
            resp2 = await cli.post("/v1/responses", json=chained_request)

        assert resp2.status == 200
        resp2_data = await resp2.json()
        stored_second = adapter._response_store.get(resp2_data["id"])
        stored_history = stored_second["conversation_history"]
        assert stored_history == turn_two
        assert stored_history.count(turn_one[0]) == 1
        assert stored_history.count(follow_up) == 1

@pytest.mark.asyncio
async def test_previous_response_id_outputs_only_current_turn_items(self, adapter):
    """Output of a chained response excludes tool items from earlier turns.

    A stored previous response holds an old ``read_file`` round; the mocked
    agent returns old + new rounds, and only the new one may appear in the
    serialized ``output``.
    """
    old_turn = [
        {"role": "user", "content": "Read old file"},
        {
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "call_old",
                    "function": {
                        "name": "read_file",
                        "arguments": '{"path":"old.txt"}',
                    },
                }
            ],
        },
        {
            "role": "tool",
            "tool_call_id": "call_old",
            "content": '{"content":"old"}',
        },
        {"role": "assistant", "content": "old"},
    ]
    new_turn = [
        {"role": "user", "content": "Read new file"},
        {
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "call_new",
                    "function": {
                        "name": "read_file",
                        "arguments": '{"path":"new.txt"}',
                    },
                }
            ],
        },
        {
            "role": "tool",
            "tool_call_id": "call_new",
            "content": '{"content":"new"}',
        },
        {"role": "assistant", "content": "new"},
    ]
    whole_transcript = old_turn + new_turn

    # Seed the store so the request can chain onto "resp_prev".
    seeded_entry = {
        "response": {"id": "resp_prev", "status": "completed"},
        "conversation_history": list(old_turn),
        "session_id": "api-test-session",
    }
    adapter._response_store.put("resp_prev", seeded_entry)

    agent_result = {
        "final_response": "new",
        "messages": list(whole_transcript),
        "api_calls": 1,
    }
    usage = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}

    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as cli:
        with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
            mock_run.return_value = (agent_result, usage)
            resp = await cli.post(
                "/v1/responses",
                json={
                    "model": "hermes-agent",
                    "input": "Read new file",
                    "previous_response_id": "resp_prev",
                },
            )
        assert resp.status == 200
        data = await resp.json()

    # Search the serialized output so nested fields are covered too.
    serialized_output = json.dumps(data["output"])
    assert "call_new" in serialized_output
    assert "call_old" not in serialized_output
    assert "old.txt" not in serialized_output

@pytest.mark.asyncio
async def test_previous_response_id_preserves_session(self, adapter):
"""Chained responses via previous_response_id reuse the same session_id."""
Expand Down Expand Up @@ -1580,6 +1720,71 @@ async def _mock_run_agent(**kwargs):
assert data["status"] == "completed"
assert data["output"][-1]["content"][0]["text"] == "Stored response"

@pytest.mark.asyncio
async def test_streamed_previous_response_id_stores_full_agent_transcript_once(self, adapter):
    """A streamed chained response also stores the transcript exactly once."""
    earlier_turn = [
        {"role": "user", "content": "What is 1+1?"},
        {"role": "assistant", "content": "2"},
    ]
    follow_up = {"role": "user", "content": "Now add 1 more"}
    wanted_history = [*earlier_turn, follow_up, {"role": "assistant", "content": "3"}]

    # Seed the store so the request can chain onto "resp_prev".
    adapter._response_store.put(
        "resp_prev",
        {
            "response": {"id": "resp_prev", "status": "completed"},
            "conversation_history": list(earlier_turn),
            "session_id": "api-test-session",
        },
    )

    async def _mock_run_agent(**kwargs):
        # Emit one delta through the streaming callback (when given), then
        # return a result whose messages hold the whole conversation.
        on_delta = kwargs.get("stream_delta_callback")
        if on_delta:
            on_delta("3")
        agent_result = {
            "final_response": "3",
            "messages": list(wanted_history),
            "api_calls": 1,
        }
        usage = {"input_tokens": 1, "output_tokens": 1, "total_tokens": 2}
        return (agent_result, usage)

    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as cli:
        with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent):
            resp = await cli.post(
                "/v1/responses",
                json={
                    "model": "hermes-agent",
                    "input": "Now add 1 more",
                    "previous_response_id": "resp_prev",
                    "stream": True,
                },
            )
            body = await resp.text()

        assert resp.status == 200

        # Find the response id carried by the "response.completed" event.
        data_prefix = "data: "
        response_id = None
        for raw_line in body.splitlines():
            if not raw_line.startswith(data_prefix):
                continue
            try:
                event = json.loads(raw_line[len(data_prefix):])
            except json.JSONDecodeError:
                continue
            if event.get("type") != "response.completed":
                continue
            response_id = event["response"]["id"]
            break

        assert response_id
        stored_history = adapter._response_store.get(response_id)["conversation_history"]
        assert stored_history == wanted_history
        assert stored_history.count(earlier_turn[0]) == 1
        assert stored_history.count(follow_up) == 1

@pytest.mark.asyncio
async def test_stream_cancelled_persists_incomplete_snapshot(self, adapter):
"""Server-side asyncio.CancelledError (shutdown, request timeout) must
Expand Down