Skip to content

Commit 220a9a8

Browse files
GabrielVasilescu04cristipufu
authored andcommitted
feat: add mapper for chat messages
1 parent fb053c9 commit 220a9a8

File tree

5 files changed

+2102
-1796
lines changed

5 files changed

+2102
-1796
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
[project]
22
name = "uipath-langchain"
3-
version = "0.1.9"
3+
version = "0.1.10"
44
description = "UiPath Langchain"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"
77
dependencies = [
8+
"uipath-core>=0.0.6, <0.1.0",
89
"uipath-runtime>=0.1.2, <0.2.0",
910
"uipath>=2.2.11, <2.3.0",
1011
"langgraph>=1.0.0, <2.0.0",
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1+
from .mapper import UiPathChatMessagesMapper
12
from .models import UiPathAzureChatOpenAI, UiPathChat
23

3-
__all__ = [
4-
"UiPathChat",
5-
"UiPathAzureChatOpenAI",
6-
]
4+
__all__ = ["UiPathChat", "UiPathAzureChatOpenAI", "UiPathChatMessagesMapper"]
Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
import json
2+
import logging
3+
import uuid
4+
from datetime import datetime, timezone
5+
from typing import Any, cast
6+
7+
from langchain_core.messages import (
8+
AIMessageChunk,
9+
BaseMessage,
10+
HumanMessage,
11+
TextContentBlock,
12+
ToolCallChunk,
13+
ToolMessage,
14+
)
15+
from uipath.core.chat import (
16+
UiPathConversationContentPartChunkEvent,
17+
UiPathConversationContentPartEndEvent,
18+
UiPathConversationContentPartEvent,
19+
UiPathConversationContentPartStartEvent,
20+
UiPathConversationEvent,
21+
UiPathConversationExchangeEvent,
22+
UiPathConversationMessage,
23+
UiPathConversationMessageEndEvent,
24+
UiPathConversationMessageEvent,
25+
UiPathConversationMessageStartEvent,
26+
UiPathConversationToolCallEndEvent,
27+
UiPathConversationToolCallEvent,
28+
UiPathConversationToolCallStartEvent,
29+
UiPathInlineValue,
30+
)
31+
32+
logger = logging.getLogger(__name__)
33+
34+
35+
class UiPathChatMessagesMapper:
36+
"""Stateful mapper that converts LangChain messages to UiPath conversation events.
37+
38+
Maintains state across multiple message conversions to properly track:
39+
- The AI message ID associated with each tool call for proper correlation with ToolMessage
40+
"""
41+
42+
def __init__(self):
43+
"""Initialize the mapper with empty state."""
44+
self.tool_call_to_ai_message: dict[str, str] = {}
45+
self.seen_message_ids: set[str] = set()
46+
47+
def _wrap_in_conversation_event(
48+
self,
49+
msg_event: UiPathConversationMessageEvent,
50+
exchange_id: str | None = None,
51+
conversation_id: str | None = None,
52+
) -> UiPathConversationEvent:
53+
"""Helper to wrap a message event into a conversation-level event."""
54+
return UiPathConversationEvent(
55+
conversation_id=conversation_id or str(uuid.uuid4()),
56+
exchange=UiPathConversationExchangeEvent(
57+
exchange_id=exchange_id or str(uuid.uuid4()),
58+
message=msg_event,
59+
),
60+
)
61+
62+
def _extract_text(self, content: Any) -> str:
63+
"""Normalize LangGraph message.content to plain text."""
64+
if isinstance(content, str):
65+
return content
66+
if isinstance(content, list):
67+
return "".join(
68+
part.get("text", "")
69+
for part in content
70+
if isinstance(part, dict) and part.get("type") == "text"
71+
)
72+
return str(content or "")
73+
74+
def map_messages(self, input_data: dict[str, Any]) -> list[HumanMessage]:
75+
"""
76+
Converts a UiPathConversationMessage into a list of HumanMessages for LangGraph.
77+
Supports multimodal content parts (text, external content) and preserves metadata.
78+
"""
79+
human_messages: list[HumanMessage] = []
80+
81+
# Extract messages from input_data
82+
messages: list[UiPathConversationMessage] = input_data.get("messages", [])
83+
84+
for uipath_msg in messages:
85+
# Loop over each content part
86+
if uipath_msg.content_parts:
87+
for part in uipath_msg.content_parts:
88+
data = part.data
89+
content = ""
90+
metadata: dict[str, Any] = {
91+
"message_id": uipath_msg.message_id,
92+
"content_part_id": part.content_part_id,
93+
"mime_type": part.mime_type,
94+
"created_at": uipath_msg.created_at,
95+
"updated_at": uipath_msg.updated_at,
96+
}
97+
98+
if isinstance(data, UiPathInlineValue):
99+
content = str(data.inline)
100+
101+
# Append a HumanMessage for this content part
102+
human_messages.append(
103+
HumanMessage(content=content, metadata=metadata)
104+
)
105+
106+
# Handle the case where there are no content parts
107+
else:
108+
metadata = {
109+
"message_id": uipath_msg.message_id,
110+
"role": uipath_msg.role,
111+
"created_at": uipath_msg.created_at,
112+
"updated_at": uipath_msg.updated_at,
113+
}
114+
human_messages.append(HumanMessage(content="", metadata=metadata))
115+
116+
return human_messages
117+
118+
def map_event(
119+
self,
120+
message: BaseMessage,
121+
exchange_id: str | None = None,
122+
conversation_id: str | None = None,
123+
) -> UiPathConversationEvent | None:
124+
"""Convert LangGraph BaseMessage (chunk or full) into a UiPathConversationEvent.
125+
126+
Args:
127+
message: The LangChain message to convert
128+
exchange_id: Optional exchange ID for the conversation
129+
conversation_id: Optional conversation ID
130+
131+
Returns:
132+
A UiPathConversationEvent if the message should be emitted, None otherwise
133+
"""
134+
# Format timestamp as ISO 8601 UTC with milliseconds: 2025-01-04T10:30:00.123Z
135+
timestamp = (
136+
datetime.now(timezone.utc)
137+
.isoformat(timespec="milliseconds")
138+
.replace("+00:00", "Z")
139+
)
140+
141+
# --- Streaming AIMessageChunk ---
142+
if isinstance(message, AIMessageChunk):
143+
if message.id is None:
144+
return None
145+
146+
msg_event = UiPathConversationMessageEvent(
147+
message_id=message.id,
148+
)
149+
150+
# Check if this is the last chunk by examining chunk_position
151+
if message.chunk_position == "last":
152+
msg_event.end = UiPathConversationMessageEndEvent(timestamp=timestamp)
153+
msg_event.content_part = UiPathConversationContentPartEvent(
154+
content_part_id=f"chunk-{message.id}-0",
155+
end=UiPathConversationContentPartEndEvent(),
156+
)
157+
return self._wrap_in_conversation_event(
158+
msg_event, exchange_id, conversation_id
159+
)
160+
161+
# For every new message_id, start a new message
162+
if message.id not in self.seen_message_ids:
163+
self.seen_message_ids.add(message.id)
164+
msg_event.start = UiPathConversationMessageStartEvent(
165+
role="assistant", timestamp=timestamp
166+
)
167+
msg_event.content_part = UiPathConversationContentPartEvent(
168+
content_part_id=f"chunk-{message.id}-0",
169+
start=UiPathConversationContentPartStartEvent(
170+
mime_type="text/plain"
171+
),
172+
)
173+
174+
elif message.content_blocks:
175+
for block in message.content_blocks:
176+
block_type = block.get("type")
177+
178+
if block_type == "text":
179+
text_block = cast(TextContentBlock, block)
180+
text = text_block["text"]
181+
182+
msg_event.content_part = UiPathConversationContentPartEvent(
183+
content_part_id=f"chunk-{message.id}-0",
184+
chunk=UiPathConversationContentPartChunkEvent(
185+
data=text,
186+
content_part_sequence=0,
187+
),
188+
)
189+
190+
elif block_type == "tool_call_chunk":
191+
tool_chunk_block = cast(ToolCallChunk, block)
192+
193+
tool_call_id = tool_chunk_block.get("id")
194+
if tool_call_id:
195+
# Track tool_call_id -> ai_message_id mapping
196+
self.tool_call_to_ai_message[str(tool_call_id)] = message.id
197+
198+
args = tool_chunk_block.get("args") or ""
199+
200+
msg_event.content_part = UiPathConversationContentPartEvent(
201+
content_part_id=f"chunk-{message.id}-0",
202+
chunk=UiPathConversationContentPartChunkEvent(
203+
data=args,
204+
content_part_sequence=0,
205+
),
206+
)
207+
# Continue so that multiple tool_call_chunks in the same block list
208+
# are handled correctly
209+
continue
210+
211+
# Fallback: raw string content on the chunk (rare when using content_blocks)
212+
elif isinstance(message.content, str) and message.content:
213+
msg_event.content_part = UiPathConversationContentPartEvent(
214+
content_part_id=f"content-{message.id}",
215+
chunk=UiPathConversationContentPartChunkEvent(
216+
data=message.content,
217+
content_part_sequence=0,
218+
),
219+
)
220+
221+
if (
222+
msg_event.start
223+
or msg_event.content_part
224+
or msg_event.tool_call
225+
or msg_event.end
226+
):
227+
return self._wrap_in_conversation_event(
228+
msg_event, exchange_id, conversation_id
229+
)
230+
231+
return None
232+
233+
# --- ToolMessage ---
234+
if isinstance(message, ToolMessage):
235+
# Look up the AI message ID using the tool_call_id
236+
result_message_id = (
237+
self.tool_call_to_ai_message.get(message.tool_call_id)
238+
if message.tool_call_id
239+
else None
240+
)
241+
242+
# If no AI message ID was found, we cannot properly associate this tool result
243+
if not result_message_id:
244+
logger.warning(
245+
f"Tool message {message.tool_call_id} has no associated AI message ID. Skipping."
246+
)
247+
return None
248+
249+
# Clean up the mapping after use
250+
if message.tool_call_id:
251+
del self.tool_call_to_ai_message[message.tool_call_id]
252+
253+
content_value: Any = message.content
254+
if isinstance(content_value, str):
255+
try:
256+
content_value = json.loads(content_value)
257+
except (json.JSONDecodeError, TypeError):
258+
# Keep as string if not valid JSON
259+
pass
260+
261+
return self._wrap_in_conversation_event(
262+
UiPathConversationMessageEvent(
263+
message_id=result_message_id,
264+
tool_call=UiPathConversationToolCallEvent(
265+
tool_call_id=message.tool_call_id,
266+
start=UiPathConversationToolCallStartEvent(
267+
tool_name=message.name,
268+
arguments=None,
269+
timestamp=timestamp,
270+
),
271+
end=UiPathConversationToolCallEndEvent(
272+
timestamp=timestamp,
273+
result=content_value,
274+
),
275+
),
276+
),
277+
exchange_id,
278+
conversation_id,
279+
)
280+
281+
# --- Fallback for other BaseMessage types ---
282+
text_content = self._extract_text(message.content)
283+
return self._wrap_in_conversation_event(
284+
UiPathConversationMessageEvent(
285+
message_id=message.id,
286+
start=UiPathConversationMessageStartEvent(
287+
role="assistant", timestamp=timestamp
288+
),
289+
content_part=UiPathConversationContentPartEvent(
290+
content_part_id=f"cp-{message.id}",
291+
chunk=UiPathConversationContentPartChunkEvent(data=text_content),
292+
),
293+
end=UiPathConversationMessageEndEvent(),
294+
),
295+
exchange_id,
296+
conversation_id,
297+
)
298+
299+
300+
__all__ = ["UiPathChatMessagesMapper"]

src/uipath_langchain/runtime/runtime.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
)
2222
from uipath.runtime.schema import UiPathRuntimeSchema
2323

24+
from uipath_langchain.chat import UiPathChatMessagesMapper
2425
from uipath_langchain.runtime.errors import LangGraphErrorCode, LangGraphRuntimeError
2526
from uipath_langchain.runtime.schema import get_entrypoints_schema, get_graph_schema
2627

@@ -49,6 +50,7 @@ def __init__(
4950
self.graph: CompiledStateGraph[Any, Any, Any, Any] = graph
5051
self.runtime_id: str = runtime_id or "default"
5152
self.entrypoint: str | None = entrypoint
53+
self.chat = UiPathChatMessagesMapper()
5254

5355
async def execute(
5456
self,
@@ -130,7 +132,7 @@ async def stream(
130132
if isinstance(data, tuple):
131133
message, _ = data
132134
event = UiPathRuntimeMessageEvent(
133-
payload=message,
135+
payload=self.chat.map_event(message),
134136
)
135137
yield event
136138

@@ -197,9 +199,14 @@ async def _get_graph_input(
197199
options: UiPathExecuteOptions | None,
198200
) -> Any:
199201
"""Process and return graph input."""
202+
graph_input = input or {}
203+
if isinstance(graph_input, dict):
204+
messages = graph_input.get("messages", None)
205+
if messages:
206+
graph_input["messages"] = self.chat.map_messages(messages)
200207
if options and options.resume:
201-
return Command(resume=input or {})
202-
return input or {}
208+
return Command(resume=graph_input)
209+
return graph_input
203210

204211
async def _get_graph_state(
205212
self,
@@ -299,9 +306,7 @@ async def _create_runtime_result(
299306
Get final graph state and create the execution result.
300307
301308
Args:
302-
compiled_graph: The compiled graph instance
303309
graph_config: The graph execution configuration
304-
memory: The SQLite memory instance
305310
graph_output: The graph execution output
306311
"""
307312
# Get the final state

0 commit comments

Comments
 (0)