
Commit c789fe4

feat: implement streaming methods for chat models
1 parent 31e2bd0 commit c789fe4

File tree

4 files changed (+1833, -1691 lines)

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [project]
 name = "uipath-langchain"
-version = "0.0.134"
+version = "0.0.135"
 description = "UiPath Langchain"
 readme = { file = "README.md", content-type = "text/markdown" }
 requires-python = ">=3.10"

src/uipath_langchain/_cli/_runtime/_conversation.py

Lines changed: 8 additions & 0 deletions
@@ -173,6 +173,14 @@ def map_message(
                         content_part_sequence=idx,
                     ),
                 )
+        elif isinstance(message.content, str) and message.content:
+            msg_event.content_part = UiPathConversationContentPartEvent(
+                content_part_id=f"content-{message.id}",
+                chunk=UiPathConversationContentPartChunkEvent(
+                    data=message.content,
+                    content_part_sequence=0,
+                ),
+            )

         stop_reason = message.response_metadata.get("stop_reason")
         if not message.content and stop_reason in ("tool_use", "end_turn"):
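
Before this change, map_message emitted content-part events only for list-style message content; a message whose content is a plain string produced no chunk at all. The new elif branch closes that gap by wrapping the whole string in a single content-part chunk with sequence 0. A minimal sketch of just the guard it adds, using only the public AIMessage type (the surrounding event plumbing is omitted):

from langchain_core.messages import AIMessage

def takes_string_branch(message: AIMessage) -> bool:
    # Mirrors the new elif condition: fire only for non-empty plain-string
    # content; list-style content is handled by the existing loop above it.
    return isinstance(message.content, str) and bool(message.content)

print(takes_string_branch(AIMessage(id="m1", content="Hello")))  # True
print(takes_string_branch(AIMessage(id="m2", content="")))       # False (empty string)
print(takes_string_branch(AIMessage(id="m3", content=[{"type": "text", "text": "hi"}])))  # False (list content)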

src/uipath_langchain/chat/models.py

Lines changed: 137 additions & 3 deletions
@@ -1,15 +1,15 @@
 import json
 import logging
-from typing import Any, Dict, List, Literal, Optional, Union
+from typing import Any, AsyncIterator, Dict, Iterator, List, Literal, Optional, Union

 from langchain_core.callbacks import (
     AsyncCallbackManagerForLLMRun,
     CallbackManagerForLLMRun,
 )
 from langchain_core.language_models import LanguageModelInput
-from langchain_core.messages import AIMessage, BaseMessage
+from langchain_core.messages import AIMessage, AIMessageChunk, BaseMessage
 from langchain_core.messages.ai import UsageMetadata
-from langchain_core.outputs import ChatGeneration, ChatResult
+from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
 from langchain_core.runnables import Runnable
 from langchain_openai.chat_models import AzureChatOpenAI
 from pydantic import BaseModel
@@ -49,6 +49,54 @@ async def _agenerate(
         response = await self._acall(self.url, payload, self.auth_headers)
         return self._create_chat_result(response)

+    def _stream(
+        self,
+        messages: List[BaseMessage],
+        stop: Optional[List[str]] = None,
+        run_manager: Optional[CallbackManagerForLLMRun] = None,
+        **kwargs: Any,
+    ) -> Iterator[ChatGenerationChunk]:
+        if "tools" in kwargs and not kwargs["tools"]:
+            del kwargs["tools"]
+        payload = self._get_request_payload(messages, stop=stop, **kwargs)
+        response = self._call(self.url, payload, self.auth_headers)
+
+        # The backend response is non-streaming; yield it as a single chunk.
+        chat_result = self._create_chat_result(response)
+        chunk = ChatGenerationChunk(
+            message=AIMessageChunk(
+                content=chat_result.generations[0].message.content,
+                additional_kwargs=chat_result.generations[0].message.additional_kwargs,
+                response_metadata=chat_result.generations[0].message.response_metadata,
+                usage_metadata=chat_result.generations[0].message.usage_metadata,  # type: ignore
+            )
+        )
+        yield chunk
+
+    async def _astream(
+        self,
+        messages: List[BaseMessage],
+        stop: Optional[List[str]] = None,
+        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
+        **kwargs: Any,
+    ) -> AsyncIterator[ChatGenerationChunk]:
+        if "tools" in kwargs and not kwargs["tools"]:
+            del kwargs["tools"]
+        payload = self._get_request_payload(messages, stop=stop, **kwargs)
+        response = await self._acall(self.url, payload, self.auth_headers)
+
+        # The backend response is non-streaming; yield it as a single chunk.
+        chat_result = self._create_chat_result(response)
+        chunk = ChatGenerationChunk(
+            message=AIMessageChunk(
+                content=chat_result.generations[0].message.content,
+                additional_kwargs=chat_result.generations[0].message.additional_kwargs,
+                response_metadata=chat_result.generations[0].message.response_metadata,
+                usage_metadata=chat_result.generations[0].message.usage_metadata,  # type: ignore
+            )
+        )
+        yield chunk
+
     def with_structured_output(
         self,
         schema: Optional[Any] = None,
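
With these overrides in place, the standard LangChain streaming entry points work against the non-streaming gateway: the iterator simply yields one chunk carrying the complete message. A hedged usage sketch follows; the class name and constructor arguments are assumptions, since the diff does not show how the model is instantiated:

# Hypothetical usage -- class name and constructor args are assumed,
# not taken from this diff.
from uipath_langchain.chat.models import UiPathAzureChatOpenAI  # name assumed

llm = UiPathAzureChatOpenAI(model="gpt-4o")  # auth/endpoint config omitted

# Yields exactly one chunk containing the full response, because the
# underlying HTTP call is not actually streamed.
for chunk in llm.stream("Summarize this commit in one line."):
    print(chunk.content, end="", flush=True)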
@@ -217,6 +265,92 @@ async def _agenerate(
         response = await self._acall(self.url, payload, self.auth_headers)
         return self._create_chat_result(response)

+    def _stream(
+        self,
+        messages: List[BaseMessage],
+        stop: Optional[List[str]] = None,
+        run_manager: Optional[CallbackManagerForLLMRun] = None,
+        **kwargs: Any,
+    ) -> Iterator[ChatGenerationChunk]:
+        """Stream the LLM on a given prompt.
+
+        Args:
+            messages: The prompt, composed of a list of messages.
+            stop: A list of strings on which the model should stop generating.
+            run_manager: A run manager with callbacks for the LLM.
+            **kwargs: Additional keyword arguments.
+
+        Returns:
+            An iterator of ChatGenerationChunk objects.
+        """
+        if kwargs.get("tools"):
+            kwargs["tools"] = [tool["function"] for tool in kwargs["tools"]]
+        if "tool_choice" in kwargs and kwargs["tool_choice"]["type"] == "function":
+            kwargs["tool_choice"] = {
+                "type": "tool",
+                "name": kwargs["tool_choice"]["function"]["name"],
+            }
+        payload = self._get_request_payload(messages, stop=stop, **kwargs)
+        response = self._call(self.url, payload, self.auth_headers)
+
+        # The backend response is non-streaming; yield it as a single chunk.
+        chat_result = self._create_chat_result(response)
+        chunk = ChatGenerationChunk(
+            message=AIMessageChunk(
+                content=chat_result.generations[0].message.content,
+                additional_kwargs=chat_result.generations[0].message.additional_kwargs,
+                response_metadata=chat_result.generations[0].message.response_metadata,
+                usage_metadata=chat_result.generations[0].message.usage_metadata,  # type: ignore
+                tool_calls=getattr(
+                    chat_result.generations[0].message, "tool_calls", None
+                ),
+            )
+        )
+        yield chunk
+
+    async def _astream(
+        self,
+        messages: List[BaseMessage],
+        stop: Optional[List[str]] = None,
+        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
+        **kwargs: Any,
+    ) -> AsyncIterator[ChatGenerationChunk]:
+        """Asynchronously stream the LLM on a given prompt.
+
+        Args:
+            messages: The prompt, composed of a list of messages.
+            stop: A list of strings on which the model should stop generating.
+            run_manager: A run manager with callbacks for the LLM.
+            **kwargs: Additional keyword arguments.
+
+        Returns:
+            An async iterator of ChatGenerationChunk objects.
+        """
+        if kwargs.get("tools"):
+            kwargs["tools"] = [tool["function"] for tool in kwargs["tools"]]
+        if "tool_choice" in kwargs and kwargs["tool_choice"]["type"] == "function":
+            kwargs["tool_choice"] = {
+                "type": "tool",
+                "name": kwargs["tool_choice"]["function"]["name"],
+            }
+        payload = self._get_request_payload(messages, stop=stop, **kwargs)
+        response = await self._acall(self.url, payload, self.auth_headers)
+
+        # The backend response is non-streaming; yield it as a single chunk.
+        chat_result = self._create_chat_result(response)
+        chunk = ChatGenerationChunk(
+            message=AIMessageChunk(
+                content=chat_result.generations[0].message.content,
+                additional_kwargs=chat_result.generations[0].message.additional_kwargs,
+                response_metadata=chat_result.generations[0].message.response_metadata,
+                usage_metadata=chat_result.generations[0].message.usage_metadata,  # type: ignore
+                tool_calls=getattr(
+                    chat_result.generations[0].message, "tool_calls", None
+                ),
+            )
+        )
+        yield chunk
+
     def with_structured_output(
         self,
         schema: Optional[Any] = None,
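
Unlike the first model class, this one also rewrites OpenAI-style tool parameters into the Anthropic-style shape its backend expects: each tool spec is reduced to its bare function schema, and a "function" tool_choice becomes a "tool" choice. A standalone illustration of exactly that kwargs rewrite (the get_weather tool is a made-up example):

# Standalone illustration of the rewrite at the top of _stream/_astream.
# The get_weather tool below is hypothetical example data.
kwargs = {
    "tools": [
        {
            "type": "function",
            "function": {
                "name": "get_weather",
                "description": "Look up the weather for a city.",
                "parameters": {
                    "type": "object",
                    "properties": {"city": {"type": "string"}},
                    "required": ["city"],
                },
            },
        }
    ],
    "tool_choice": {"type": "function", "function": {"name": "get_weather"}},
}

# OpenAI-style tool specs -> bare function schemas.
if kwargs.get("tools"):
    kwargs["tools"] = [tool["function"] for tool in kwargs["tools"]]
# OpenAI-style "function" tool_choice -> Anthropic-style "tool" choice.
if "tool_choice" in kwargs and kwargs["tool_choice"]["type"] == "function":
    kwargs["tool_choice"] = {
        "type": "tool",
        "name": kwargs["tool_choice"]["function"]["name"],
    }

print(kwargs["tools"])        # [{'name': 'get_weather', 'description': ..., 'parameters': ...}]
print(kwargs["tool_choice"])  # {'type': 'tool', 'name': 'get_weather'}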
