Commit c24bddb

feat: add fixes to anthropic and bump version (#2427)
2 parents: e278384 + bed8572

22 files changed: +824 −404 lines

letta/__init__.py

Lines changed: 1 addition & 2 deletions
```diff
@@ -1,5 +1,4 @@
-__version__ = "0.6.23"
-
+__version__ = "0.6.24"
 
 # import clients
 from letta.client.client import LocalClient, RESTClient, create_client
```

letta/agent.py

Lines changed: 6 additions & 4 deletions
```diff
@@ -447,8 +447,6 @@ def _handle_ai_response(
         function_call = (
             response_message.function_call if response_message.function_call is not None else response_message.tool_calls[0].function
         )
-
-        # Get the name of the function
         function_name = function_call.name
         self.logger.info(f"Request to call function {function_name} with tool_call_id: {tool_call_id}")
 
@@ -461,7 +459,9 @@ def _handle_ai_response(
         if not target_letta_tool:
             error_msg = f"No function named {function_name}"
             function_response = "None"  # more like "never ran?"
-            messages = self._handle_function_error_response(error_msg, tool_call_id, function_name, function_args, function_response, messages)
+            messages = self._handle_function_error_response(
+                error_msg, tool_call_id, function_name, function_args, function_response, messages
+            )
             return messages, False, True  # force a heartbeat to allow agent to handle error
 
         # Failure case 2: function name is OK, but function args are bad JSON
@@ -471,7 +471,9 @@ def _handle_ai_response(
         except Exception:
             error_msg = f"Error parsing JSON for function '{function_name}' arguments: {function_call.arguments}"
             function_response = "None"  # more like "never ran?"
-            messages = self._handle_function_error_response(error_msg, tool_call_id, function_name, function_args, function_response, messages)
+            messages = self._handle_function_error_response(
+                error_msg, tool_call_id, function_name, function_args, function_response, messages
+            )
             return messages, False, True  # force a heartbeat to allow agent to handle error
 
         # Check if inner thoughts is in the function call arguments (possible apparently if you are using Azure)
```
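Both reformatted call sites keep the same error-handling contract: the failure is recorded as the tool's "response" and the step returns with a forced heartbeat so the agent gets another turn to recover. A minimal sketch of that pattern, using a hypothetical helper rather than letta's actual `_handle_function_error_response`:

```python
from typing import List, Tuple


def handle_bad_tool_call(messages: List[dict], tool_call_id: str, error_msg: str) -> Tuple[List[dict], bool, bool]:
    """Package a failed tool call as a tool response and request a heartbeat."""
    messages.append(
        {
            "role": "tool",
            "tool_call_id": tool_call_id,
            "content": f"Error: {error_msg}",
        }
    )
    # Return shape mirrors the diff: (messages, function_ran, request_heartbeat)
    return messages, False, True


# Example: the model asked for a tool that does not exist
history: List[dict] = []
history, ran, heartbeat = handle_bad_tool_call(history, "call_123", "No function named send_emailz")
assert not ran and heartbeat
```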

letta/client/streaming.py

Lines changed: 39 additions & 44 deletions
```diff
@@ -17,48 +17,45 @@
 
 
 def _sse_post(url: str, data: dict, headers: dict) -> Generator[Union[LettaStreamingResponse, ChatCompletionChunk], None, None]:
-
-    with httpx.Client() as client:
+    """
+    Sends an SSE POST request and yields parsed response chunks.
+    """
+    # TODO: Please note this is a very generous timeout for e2b reasons
+    with httpx.Client(timeout=httpx.Timeout(5 * 60.0, read=5 * 60.0)) as client:
         with connect_sse(client, method="POST", url=url, json=data, headers=headers) as event_source:
 
-            # Inspect for errors before iterating (see https://github.com/florimondmanca/httpx-sse/pull/12)
+            # Check for immediate HTTP errors before processing the SSE stream
             if not event_source.response.is_success:
-                # handle errors
-                pass
-
-                logger.warning("Caught error before iterating SSE request:", vars(event_source.response))
-                logger.warning(event_source.response.read().decode("utf-8"))
+                response_bytes = event_source.response.read()
+                logger.warning(f"SSE request error: {vars(event_source.response)}")
+                logger.warning(response_bytes.decode("utf-8"))
 
                 try:
-                    response_bytes = event_source.response.read()
                     response_dict = json.loads(response_bytes.decode("utf-8"))
-                    # e.g.: This model's maximum context length is 8192 tokens. However, your messages resulted in 8198 tokens (7450 in the messages, 748 in the functions). Please reduce the length of the messages or functions.
-                    if (
-                        "error" in response_dict
-                        and "message" in response_dict["error"]
-                        and OPENAI_CONTEXT_WINDOW_ERROR_SUBSTRING in response_dict["error"]["message"]
-                    ):
-                        logger.error(response_dict["error"]["message"])
-                        raise LLMError(response_dict["error"]["message"])
+                    error_message = response_dict.get("error", {}).get("message", "")
+
+                    if OPENAI_CONTEXT_WINDOW_ERROR_SUBSTRING in error_message:
+                        logger.error(error_message)
+                        raise LLMError(error_message)
                 except LLMError:
                     raise
-                except:
-                    logger.error(f"Failed to parse SSE message, throwing SSE HTTP error up the stack")
+                except Exception:
+                    logger.error("Failed to parse SSE message, raising HTTP error")
                     event_source.response.raise_for_status()
 
             try:
                 for sse in event_source.iter_sse():
-                    # if sse.data == OPENAI_SSE_DONE:
-                    # print("finished")
-                    # break
-                    if sse.data in [status.value for status in MessageStreamStatus]:
-                        # break
+                    if sse.data in {status.value for status in MessageStreamStatus}:
                         yield MessageStreamStatus(sse.data)
+                        if sse.data == MessageStreamStatus.done.value:
+                            # We received the [DONE], so stop reading the stream.
+                            break
                     else:
                         chunk_data = json.loads(sse.data)
+
                         if "reasoning" in chunk_data:
                             yield ReasoningMessage(**chunk_data)
-                        elif "message_type" in chunk_data and chunk_data["message_type"] == "assistant_message":
+                        elif chunk_data.get("message_type") == "assistant_message":
                             yield AssistantMessage(**chunk_data)
                         elif "tool_call" in chunk_data:
                             yield ToolCallMessage(**chunk_data)
@@ -67,33 +64,31 @@ def _sse_post(url: str, data: dict, headers: dict) -> Generator[Union[LettaStreamingResponse, ChatCompletionChunk], None, None]:
                         elif "step_count" in chunk_data:
                             yield LettaUsageStatistics(**chunk_data)
                         elif chunk_data.get("object") == get_args(ChatCompletionChunk.__annotations__["object"])[0]:
-                            yield ChatCompletionChunk(**chunk_data)  # Add your processing logic for chat chunks here
+                            yield ChatCompletionChunk(**chunk_data)
                         else:
                             raise ValueError(f"Unknown message type in chunk_data: {chunk_data}")
 
             except SSEError as e:
-                logger.error("Caught an error while iterating the SSE stream:", str(e))
-                if "application/json" in str(e):  # Check if the error is because of JSON response
-                    # TODO figure out a better way to catch the error other than re-trying with a POST
-                    response = client.post(url=url, json=data, headers=headers)  # Make the request again to get the JSON response
-                    if response.headers["Content-Type"].startswith("application/json"):
-                        error_details = response.json()  # Parse the JSON to get the error message
-                        logger.error("Request:", vars(response.request))
-                        logger.error("POST Error:", error_details)
-                        logger.error("Original SSE Error:", str(e))
+                logger.error(f"SSE stream error: {e}")
+
+                if "application/json" in str(e):
+                    response = client.post(url=url, json=data, headers=headers)
+
+                    if response.headers.get("Content-Type", "").startswith("application/json"):
+                        error_details = response.json()
+                        logger.error(f"POST Error: {error_details}")
                     else:
                         logger.error("Failed to retrieve JSON error message via retry.")
-                else:
-                    logger.error("SSEError not related to 'application/json' content type.")
 
-                # Optionally re-raise the exception if you need to propagate it
                 raise e
 
             except Exception as e:
-                if event_source.response.request is not None:
-                    logger.error("HTTP Request:", vars(event_source.response.request))
-                if event_source.response is not None:
-                    logger.error("HTTP Status:", event_source.response.status_code)
-                    logger.error("HTTP Headers:", event_source.response.headers)
-                logger.error("Exception message:", str(e))
+                logger.error(f"Unexpected exception: {e}")
+
+                if event_source.response.request:
+                    logger.error(f"HTTP Request: {vars(event_source.response.request)}")
+                if event_source.response:
+                    logger.error(f"HTTP Status: {event_source.response.status_code}")
+                    logger.error(f"HTTP Headers: {event_source.response.headers}")
+
                 raise e
```
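For context, this is the consumption pattern the rewritten `_sse_post` implements: POST with a long read timeout, check the response status, then iterate server-sent events until a terminal sentinel. A stripped-down sketch using `httpx` and `httpx-sse`, where the endpoint, payload, and `[DONE]` sentinel are illustrative rather than letta's exact protocol:

```python
import json

import httpx
from httpx_sse import connect_sse


def stream_chunks(url: str, payload: dict):
    # Mirror the generous 5-minute timeout added in this commit
    with httpx.Client(timeout=httpx.Timeout(5 * 60.0, read=5 * 60.0)) as client:
        with connect_sse(client, method="POST", url=url, json=payload) as event_source:
            event_source.response.raise_for_status()
            for sse in event_source.iter_sse():
                if sse.data == "[DONE]":  # illustrative end-of-stream sentinel
                    break
                yield json.loads(sse.data)


# Usage (hypothetical endpoint):
# for chunk in stream_chunks("http://localhost:8283/v1/agents/agent-1/messages/stream", {"text": "hi"}):
#     print(chunk)
```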

letta/constants.py

Lines changed: 0 additions & 3 deletions
```diff
@@ -51,9 +51,6 @@
 BASE_MEMORY_TOOLS = ["core_memory_append", "core_memory_replace"]
 # Multi agent tools
 MULTI_AGENT_TOOLS = ["send_message_to_agent_and_wait_for_reply", "send_message_to_agents_matching_all_tags", "send_message_to_agent_async"]
-MULTI_AGENT_SEND_MESSAGE_MAX_RETRIES = 3
-MULTI_AGENT_SEND_MESSAGE_TIMEOUT = 20 * 60
-MULTI_AGENT_CONCURRENT_SENDS = 15
 
 # The name of the tool used to send message to the user
 # May not be relevant in cases where the agent has multiple ways to message to user (send_imessage, send_discord_message, ...)
```

letta/llm_api/anthropic.py

Lines changed: 73 additions & 8 deletions
```diff
@@ -19,6 +19,8 @@
 
 from letta.errors import BedrockError, BedrockPermissionError
 from letta.llm_api.aws_bedrock import get_bedrock_client
+from letta.llm_api.helpers import add_inner_thoughts_to_functions
+from letta.local_llm.constants import INNER_THOUGHTS_KWARG, INNER_THOUGHTS_KWARG_DESCRIPTION
 from letta.local_llm.utils import num_tokens_from_functions, num_tokens_from_messages
 from letta.schemas.message import Message as _Message
 from letta.schemas.message import MessageRole as _MessageRole
```
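The two new imports support the `put_inner_thoughts_in_kwargs` mode added below: instead of emitting chain-of-thought as `<thinking>` text in the content block, every tool schema gains an extra required parameter that carries the reasoning. Roughly what that transformation does, as a sketch under assumed names (letta's `add_inner_thoughts_to_functions` lives in `letta/llm_api/helpers.py` and may differ in detail):

```python
import copy

INNER_THOUGHTS_KWARG = "inner_thoughts"  # key name assumed for illustration


def add_inner_thoughts(functions: list, description: str) -> list:
    """Add a required inner-thoughts parameter to each tool's JSON schema."""
    out = []
    for fn in functions:
        fn = copy.deepcopy(fn)
        params = fn.setdefault("parameters", {"type": "object", "properties": {}, "required": []})
        params.setdefault("properties", {})[INNER_THOUGHTS_KWARG] = {
            "type": "string",
            "description": description,
        }
        # List the kwarg first so the model emits its reasoning before the real arguments
        required = [r for r in params.get("required", []) if r != INNER_THOUGHTS_KWARG]
        params["required"] = [INNER_THOUGHTS_KWARG] + required
        out.append(fn)
    return out


send_message = {
    "name": "send_message",
    "parameters": {
        "type": "object",
        "properties": {"message": {"type": "string"}},
        "required": ["message"],
    },
}
print(add_inner_thoughts([send_message], "Private reasoning; not shown to the user."))
```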
```diff
@@ -513,9 +515,23 @@ def convert_anthropic_stream_event_to_chatcompletion(
 def _prepare_anthropic_request(
     data: ChatCompletionRequest,
     inner_thoughts_xml_tag: Optional[str] = "thinking",
+    # if true, prefix fill the generation with the thinking tag
+    prefix_fill: bool = True,
+    # if true, put COT inside the tool calls instead of inside the content
+    put_inner_thoughts_in_kwargs: bool = False,
 ) -> dict:
     """Prepare the request data for Anthropic API format."""
-    # convert the tools
+
+    # if needed, put inner thoughts as a kwarg for all tools
+    if data.tools and put_inner_thoughts_in_kwargs:
+        functions = add_inner_thoughts_to_functions(
+            functions=[t.function.model_dump() for t in data.tools],
+            inner_thoughts_key=INNER_THOUGHTS_KWARG,
+            inner_thoughts_description=INNER_THOUGHTS_KWARG_DESCRIPTION,
+        )
+        data.tools = [Tool(function=f) for f in functions]
+
+    # convert the tools to Anthropic's payload format
     anthropic_tools = None if data.tools is None else convert_tools_to_anthropic_format(data.tools)
 
     # pydantic -> dict
@@ -529,11 +545,25 @@
         data.pop("tools")
         data.pop("tool_choice", None)
     elif anthropic_tools is not None:
+        # TODO eventually enable parallel tool use
         data["tools"] = anthropic_tools
-        if len(anthropic_tools) == 1:
+
+        # tool_choice_type other than "auto" only plays nice if thinking goes inside the tool calls
+        if put_inner_thoughts_in_kwargs:
+            if len(anthropic_tools) == 1:
+                data["tool_choice"] = {
+                    "type": "tool",
+                    "name": anthropic_tools[0]["name"],
+                    "disable_parallel_tool_use": True,
+                }
+            else:
+                data["tool_choice"] = {
+                    "type": "any",
+                    "disable_parallel_tool_use": True,
+                }
+        else:
             data["tool_choice"] = {
-                "type": "tool",
-                "name": anthropic_tools[0]["name"],
+                "type": "auto",
                 "disable_parallel_tool_use": True,
             }
 
```
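The branching above matches Anthropic's documented `tool_choice` modes: `"tool"` forces one named tool, `"any"` forces some tool call, and `"auto"` lets the model reply with plain text before (or instead of) a tool call. Forcing a tool only works when the reasoning rides inside the call's kwargs; with `<thinking>` text in the content, `"auto"` is needed so the text block can be emitted. The decision, restated as a standalone sketch:

```python
def pick_tool_choice(anthropic_tools: list, put_inner_thoughts_in_kwargs: bool) -> dict:
    """Restates the tool_choice branching from _prepare_anthropic_request."""
    if put_inner_thoughts_in_kwargs:
        if len(anthropic_tools) == 1:
            # Exactly one tool: force that specific tool by name
            return {"type": "tool", "name": anthropic_tools[0]["name"], "disable_parallel_tool_use": True}
        # Several tools: force *a* tool call, the model picks which one
        return {"type": "any", "disable_parallel_tool_use": True}
    # Thinking lives in the content block, so the model must be allowed plain text
    return {"type": "auto", "disable_parallel_tool_use": True}
```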

```diff
@@ -548,8 +578,21 @@
             message["content"] = None
 
     # Convert to Anthropic format
-    msg_objs = [_Message.dict_to_message(user_id=None, agent_id=None, openai_message_dict=m) for m in data["messages"]]
-    data["messages"] = [m.to_anthropic_dict(inner_thoughts_xml_tag=inner_thoughts_xml_tag) for m in msg_objs]
+    msg_objs = [
+        _Message.dict_to_message(
+            user_id=None,
+            agent_id=None,
+            openai_message_dict=m,
+        )
+        for m in data["messages"]
+    ]
+    data["messages"] = [
+        m.to_anthropic_dict(
+            inner_thoughts_xml_tag=inner_thoughts_xml_tag,
+            put_inner_thoughts_in_kwargs=put_inner_thoughts_in_kwargs,
+        )
+        for m in msg_objs
+    ]
 
     # Ensure first message is user
     if data["messages"][0]["role"] != "user":
@@ -558,6 +601,16 @@
     # Handle alternating messages
     data["messages"] = merge_tool_results_into_user_messages(data["messages"])
 
+    # Handle prefix fill (not compatible with inner-thoughts-in-kwargs)
+    # https://docs.anthropic.com/en/api/messages#body-messages
+    # NOTE: cannot prefill with tools for opus:
+    # Your API request included an `assistant` message in the final position, which would pre-fill the `assistant` response. When using tools with "claude-3-opus-20240229"
+    if prefix_fill and not put_inner_thoughts_in_kwargs and "opus" not in data["model"]:
+        data["messages"].append(
+            # Start the thinking process for the assistant
+            {"role": "assistant", "content": f"<{inner_thoughts_xml_tag}>"},
+        )
+
     # Validate max_tokens
     assert "max_tokens" in data, data
 
```
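Prefilling works because Anthropic's Messages API treats a trailing `assistant` message as the start of Claude's reply: the model continues from that text, so the completion is guaranteed to begin inside the thinking tag. What the resulting request body looks like (model name and user text are placeholders):

```python
request_body = {
    "model": "claude-3-5-sonnet-20241022",  # placeholder model
    "max_tokens": 1024,
    "messages": [
        {"role": "user", "content": "What should I do next?"},
        # Trailing assistant turn = prefill; Claude continues after this opening tag
        {"role": "assistant", "content": "<thinking>"},
    ],
}
```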

```diff
@@ -571,6 +624,7 @@
 def anthropic_chat_completions_request(
     data: ChatCompletionRequest,
     inner_thoughts_xml_tag: Optional[str] = "thinking",
+    put_inner_thoughts_in_kwargs: bool = False,
     betas: List[str] = ["tools-2024-04-04"],
 ) -> ChatCompletionResponse:
     """https://docs.anthropic.com/claude/docs/tool-use"""
@@ -580,7 +634,11 @@
         anthropic_client = anthropic.Anthropic(api_key=anthropic_override_key)
     elif model_settings.anthropic_api_key:
         anthropic_client = anthropic.Anthropic()
-    data = _prepare_anthropic_request(data, inner_thoughts_xml_tag)
+    data = _prepare_anthropic_request(
+        data=data,
+        inner_thoughts_xml_tag=inner_thoughts_xml_tag,
+        put_inner_thoughts_in_kwargs=put_inner_thoughts_in_kwargs,
+    )
     response = anthropic_client.beta.messages.create(
         **data,
         betas=betas,
@@ -611,14 +669,19 @@
 def anthropic_chat_completions_request_stream(
     data: ChatCompletionRequest,
     inner_thoughts_xml_tag: Optional[str] = "thinking",
+    put_inner_thoughts_in_kwargs: bool = False,
     betas: List[str] = ["tools-2024-04-04"],
 ) -> Generator[ChatCompletionChunkResponse, None, None]:
     """Stream chat completions from Anthropic API.
 
     Similar to OpenAI's streaming, but using Anthropic's native streaming support.
     See: https://docs.anthropic.com/claude/reference/messages-streaming
     """
-    data = _prepare_anthropic_request(data, inner_thoughts_xml_tag)
+    data = _prepare_anthropic_request(
+        data=data,
+        inner_thoughts_xml_tag=inner_thoughts_xml_tag,
+        put_inner_thoughts_in_kwargs=put_inner_thoughts_in_kwargs,
+    )
 
     anthropic_override_key = ProviderManager().get_anthropic_override_key()
     if anthropic_override_key:
@@ -666,6 +729,7 @@ def anthropic_chat_completions_process_stream(
     chat_completion_request: ChatCompletionRequest,
     stream_interface: Optional[Union[AgentChunkStreamingInterface, AgentRefreshStreamingInterface]] = None,
     inner_thoughts_xml_tag: Optional[str] = "thinking",
+    put_inner_thoughts_in_kwargs: bool = False,
     create_message_id: bool = True,
     create_message_datetime: bool = True,
     betas: List[str] = ["tools-2024-04-04"],
@@ -743,6 +807,7 @@ def anthropic_chat_completions_process_stream(
         anthropic_chat_completions_request_stream(
             data=chat_completion_request,
             inner_thoughts_xml_tag=inner_thoughts_xml_tag,
+            put_inner_thoughts_in_kwargs=put_inner_thoughts_in_kwargs,
             betas=betas,
         )
     ):
```
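Putting it together, a caller opts into the new mode via the added keyword. The sketch below is based only on the signatures visible in this diff; the `ChatCompletionRequest` import path and field values are assumptions, and actually sending the request requires an Anthropic key to be configured:

```python
from letta.llm_api.anthropic import anthropic_chat_completions_request
from letta.schemas.openai.chat_completion_request import ChatCompletionRequest

request = ChatCompletionRequest(
    model="claude-3-5-sonnet-20241022",  # placeholder model
    messages=[{"role": "user", "content": "hello"}],
    max_tokens=1024,
)

response = anthropic_chat_completions_request(
    data=request,
    inner_thoughts_xml_tag="thinking",
    put_inner_thoughts_in_kwargs=True,  # new flag from this commit
)
```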
