@@ -1,15 +1,15 @@
 import json
 import logging
-from typing import Any, Dict, List, Literal, Optional, Union
+from typing import Any, AsyncIterator, Dict, Iterator, List, Literal, Optional, Union

 from langchain_core.callbacks import (
     AsyncCallbackManagerForLLMRun,
     CallbackManagerForLLMRun,
 )
 from langchain_core.language_models import LanguageModelInput
-from langchain_core.messages import AIMessage, BaseMessage
+from langchain_core.messages import AIMessage, AIMessageChunk, BaseMessage
 from langchain_core.messages.ai import UsageMetadata
-from langchain_core.outputs import ChatGeneration, ChatResult
+from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
 from langchain_core.runnables import Runnable
 from langchain_openai.chat_models import AzureChatOpenAI
 from pydantic import BaseModel

@@ -49,6 +49,54 @@ async def _agenerate(
         response = await self._acall(self.url, payload, self.auth_headers)
         return self._create_chat_result(response)

+    def _stream(
+        self,
+        messages: List[BaseMessage],
+        stop: Optional[List[str]] = None,
+        run_manager: Optional[CallbackManagerForLLMRun] = None,
+        **kwargs: Any,
+    ) -> Iterator[ChatGenerationChunk]:
+        if "tools" in kwargs and not kwargs["tools"]:
+            del kwargs["tools"]
+        payload = self._get_request_payload(messages, stop=stop, **kwargs)
+        response = self._call(self.url, payload, self.auth_headers)
+
+        # For non-streaming response, yield single chunk
+        chat_result = self._create_chat_result(response)
+        chunk = ChatGenerationChunk(
+            message=AIMessageChunk(
+                content=chat_result.generations[0].message.content,
+                additional_kwargs=chat_result.generations[0].message.additional_kwargs,
+                response_metadata=chat_result.generations[0].message.response_metadata,
+                usage_metadata=chat_result.generations[0].message.usage_metadata,  # type: ignore
+            )
+        )
+        yield chunk
+
+    async def _astream(
+        self,
+        messages: List[BaseMessage],
+        stop: Optional[List[str]] = None,
+        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
+        **kwargs: Any,
+    ) -> AsyncIterator[ChatGenerationChunk]:
+        if "tools" in kwargs and not kwargs["tools"]:
+            del kwargs["tools"]
+        payload = self._get_request_payload(messages, stop=stop, **kwargs)
+        response = await self._acall(self.url, payload, self.auth_headers)
+
+        # For non-streaming response, yield single chunk
+        chat_result = self._create_chat_result(response)
+        chunk = ChatGenerationChunk(
+            message=AIMessageChunk(
+                content=chat_result.generations[0].message.content,
+                additional_kwargs=chat_result.generations[0].message.additional_kwargs,
+                response_metadata=chat_result.generations[0].message.response_metadata,
+                usage_metadata=chat_result.generations[0].message.usage_metadata,  # type: ignore
+            )
+        )
+        yield chunk
+
     def with_structured_output(
         self,
         schema: Optional[Any] = None,

@@ -217,6 +265,92 @@ async def _agenerate(
         response = await self._acall(self.url, payload, self.auth_headers)
         return self._create_chat_result(response)

+    def _stream(
+        self,
+        messages: List[BaseMessage],
+        stop: Optional[List[str]] = None,
+        run_manager: Optional[CallbackManagerForLLMRun] = None,
+        **kwargs: Any,
+    ) -> Iterator[ChatGenerationChunk]:
+        """Stream the LLM on a given prompt.
+
+        Args:
+            messages: the prompt composed of a list of messages.
+            stop: a list of strings on which the model should stop generating.
+            run_manager: A run manager with callbacks for the LLM.
+            **kwargs: Additional keyword arguments.
+
+        Returns:
+            An iterator of ChatGenerationChunk objects.
+        """
+        if kwargs.get("tools"):
+            kwargs["tools"] = [tool["function"] for tool in kwargs["tools"]]
+            if "tool_choice" in kwargs and kwargs["tool_choice"]["type"] == "function":
+                kwargs["tool_choice"] = {
+                    "type": "tool",
+                    "name": kwargs["tool_choice"]["function"]["name"],
+                }
+        payload = self._get_request_payload(messages, stop=stop, **kwargs)
+        response = self._call(self.url, payload, self.auth_headers)
+
+        # For non-streaming response, yield single chunk
+        chat_result = self._create_chat_result(response)
+        chunk = ChatGenerationChunk(
+            message=AIMessageChunk(
+                content=chat_result.generations[0].message.content,
+                additional_kwargs=chat_result.generations[0].message.additional_kwargs,
+                response_metadata=chat_result.generations[0].message.response_metadata,
+                usage_metadata=chat_result.generations[0].message.usage_metadata,  # type: ignore
+                tool_calls=getattr(
+                    chat_result.generations[0].message, "tool_calls", None
+                ),
+            )
+        )
+        yield chunk
+
+    async def _astream(
+        self,
+        messages: List[BaseMessage],
+        stop: Optional[List[str]] = None,
+        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
+        **kwargs: Any,
+    ) -> AsyncIterator[ChatGenerationChunk]:
+        """Async stream the LLM on a given prompt.
+
+        Args:
+            messages: the prompt composed of a list of messages.
+            stop: a list of strings on which the model should stop generating.
+            run_manager: A run manager with callbacks for the LLM.
+            **kwargs: Additional keyword arguments.
+
+        Returns:
+            An async iterator of ChatGenerationChunk objects.
+        """
+        if kwargs.get("tools"):
+            kwargs["tools"] = [tool["function"] for tool in kwargs["tools"]]
+            if "tool_choice" in kwargs and kwargs["tool_choice"]["type"] == "function":
+                kwargs["tool_choice"] = {
+                    "type": "tool",
+                    "name": kwargs["tool_choice"]["function"]["name"],
+                }
+        payload = self._get_request_payload(messages, stop=stop, **kwargs)
+        response = await self._acall(self.url, payload, self.auth_headers)
+
+        # For non-streaming response, yield single chunk
+        chat_result = self._create_chat_result(response)
+        chunk = ChatGenerationChunk(
+            message=AIMessageChunk(
+                content=chat_result.generations[0].message.content,
+                additional_kwargs=chat_result.generations[0].message.additional_kwargs,
+                response_metadata=chat_result.generations[0].message.response_metadata,
+                usage_metadata=chat_result.generations[0].message.usage_metadata,  # type: ignore
+                tool_calls=getattr(
+                    chat_result.generations[0].message, "tool_calls", None
+                ),
+            )
+        )
+        yield chunk
+
     def with_structured_output(
         self,
         schema: Optional[Any] = None,
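
Note on behavior: both classes implement `_stream`/`_astream` by making one non-streaming backend call and wrapping the complete result in a single `ChatGenerationChunk`, so LangChain's `stream()`/`astream()` yield exactly one chunk carrying the full content and usage metadata. A minimal caller-side sketch follows; the class name and constructor arguments are placeholders, not names taken from this file:

# Hypothetical usage sketch: `GatewayChatModel` and its `url`/`auth_headers`
# constructor arguments are placeholders for the concrete classes in this diff.
from langchain_core.messages import HumanMessage

llm = GatewayChatModel(url="https://example.invalid/chat", auth_headers={})

# Because _stream wraps a single non-streaming call, this loop runs exactly once;
# the one chunk already contains the complete content and usage_metadata.
for chunk in llm.stream([HumanMessage(content="Hello!")]):
    print(chunk.content)
    print(chunk.usage_metadata)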
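
The second class's `_stream`/`_astream` additionally rewrite OpenAI-style `tools` and `tool_choice` before building the payload. The sketch below spells out the shapes involved, under the assumption (not stated in the diff itself) that the downstream backend expects Anthropic-style tool definitions; the tool name and schema are invented examples:

# Illustrative input/output shapes for the tool-remapping branch; values are made up.
openai_tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "parameters": {"type": "object", "properties": {}},
        },
    }
]
openai_tool_choice = {"type": "function", "function": {"name": "get_weather"}}

# kwargs["tools"] keeps only the inner function specs ...
backend_tools = [tool["function"] for tool in openai_tools]
# ... and tool_choice becomes {"type": "tool", "name": ...}.
backend_tool_choice = {
    "type": "tool",
    "name": openai_tool_choice["function"]["name"],
}

print(backend_tools)        # [{'name': 'get_weather', 'parameters': ...}]
print(backend_tool_choice)  # {'type': 'tool', 'name': 'get_weather'}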