1
1
import json
2
2
import logging
3
3
import os
4
- from typing import Any , Dict , List , Optional , Tuple , Union
4
+ from typing import Any , Awaitable , Callable , Dict , List , Optional , Tuple , Union
5
5
6
6
from langchain_core .callbacks .base import BaseCallbackHandler
7
7
from langchain_core .messages import BaseMessage
8
8
from langchain_core .runnables .config import RunnableConfig
9
9
from langgraph .checkpoint .sqlite .aio import AsyncSqliteSaver
10
10
from langgraph .errors import EmptyInputError , GraphRecursionError , InvalidUpdateError
11
- from langgraph .graph .state import CompiledStateGraph
11
+ from langgraph .graph .state import CompiledStateGraph , StateGraph
12
12
from uipath ._cli ._runtime ._contracts import (
13
13
UiPathBaseRuntime ,
14
14
UiPathErrorCategory ,
15
15
UiPathRuntimeResult ,
16
16
)
17
17
18
- from .._utils ._graph import LangGraphConfig
19
18
from ._context import LangGraphRuntimeContext
20
19
from ._conversation import map_message
21
20
from ._exception import LangGraphRuntimeError
21
+ from ._graph_resolver import GraphResolver
22
22
from ._input import LangGraphInputProcessor
23
23
from ._output import LangGraphOutputProcessor
24
24
25
25
logger = logging .getLogger (__name__ )
26
26
27
+ AsyncResolver = Callable [[], Awaitable [StateGraph [Any , Any , Any ]]]
28
+
27
29
28
30
class LangGraphRuntime (UiPathBaseRuntime ):
29
31
"""
30
32
A runtime class implementing the async context manager protocol.
31
33
This allows using the class with 'async with' statements.
32
34
"""
33
35
34
- def __init__ (self , context : LangGraphRuntimeContext ):
36
+ def __init__ (self , context : LangGraphRuntimeContext , graph_resolver : AsyncResolver ):
35
37
super ().__init__ (context )
36
38
self .context : LangGraphRuntimeContext = context
39
+ self .graph_resolver : AsyncResolver = graph_resolver
37
40
38
41
async def execute (self ) -> Optional [UiPathRuntimeResult ]:
39
42
"""
@@ -46,7 +49,8 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
46
49
LangGraphRuntimeError: If execution fails
47
50
"""
48
51
49
- if self .context .state_graph is None :
52
+ graph = await self .graph_resolver ()
53
+ if not graph :
50
54
return None
51
55
52
56
try :
@@ -56,9 +60,7 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
56
60
self .context .memory = memory
57
61
58
62
# Compile the graph with the checkpointer
59
- graph = self .context .state_graph .compile (
60
- checkpointer = self .context .memory
61
- )
63
+ compiled_graph = graph .compile (checkpointer = self .context .memory )
62
64
63
65
# Process input, handling resume if needed
64
66
input_processor = LangGraphInputProcessor (context = self .context )
@@ -87,7 +89,7 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
87
89
graph_config ["max_concurrency" ] = int (max_concurrency )
88
90
89
91
if self .context .chat_handler :
90
- async for stream_chunk in graph .astream (
92
+ async for stream_chunk in compiled_graph .astream (
91
93
processed_input ,
92
94
graph_config ,
93
95
stream_mode = "messages" ,
@@ -109,7 +111,7 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
109
111
elif self .is_debug_run ():
110
112
# Get final chunk while streaming
111
113
final_chunk = None
112
- async for stream_chunk in graph .astream (
114
+ async for stream_chunk in compiled_graph .astream (
113
115
processed_input ,
114
116
graph_config ,
115
117
stream_mode = "updates" ,
@@ -118,16 +120,18 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
118
120
self ._pretty_print (stream_chunk )
119
121
final_chunk = stream_chunk
120
122
121
- self .context .output = self ._extract_graph_result (final_chunk , graph )
123
+ self .context .output = self ._extract_graph_result (
124
+ final_chunk , compiled_graph
125
+ )
122
126
else :
123
127
# Execute the graph normally at runtime or eval
124
- self .context .output = await graph .ainvoke (
128
+ self .context .output = await compiled_graph .ainvoke (
125
129
processed_input , graph_config
126
130
)
127
131
128
132
# Get the state if available
129
133
try :
130
- self .context .state = await graph .aget_state (graph_config )
134
+ self .context .state = await compiled_graph .aget_state (graph_config )
131
135
except Exception :
132
136
pass
133
137
@@ -177,91 +181,10 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
177
181
pass
178
182
179
183
async def validate (self ) -> None :
180
- """Validate runtime inputs."""
181
- """Load and validate the graph configuration ."""
182
- if self .context .langgraph_config is None :
183
- self .context .langgraph_config = LangGraphConfig ()
184
- if not self .context .langgraph_config .exists :
185
- raise LangGraphRuntimeError (
186
- "CONFIG_MISSING" ,
187
- "Invalid configuration" ,
188
- "Failed to load configuration" ,
189
- UiPathErrorCategory .DEPLOYMENT ,
190
- )
191
-
192
- try :
193
- self .context .langgraph_config .load_config ()
194
- except Exception as e :
195
- raise LangGraphRuntimeError (
196
- "CONFIG_INVALID" ,
197
- "Invalid configuration" ,
198
- f"Failed to load configuration: { str (e )} " ,
199
- UiPathErrorCategory .DEPLOYMENT ,
200
- ) from e
201
-
202
- # Determine entrypoint if not provided
203
- graphs = self .context .langgraph_config .graphs
204
- if not self .context .entrypoint and len (graphs ) == 1 :
205
- self .context .entrypoint = graphs [0 ].name
206
- elif not self .context .entrypoint :
207
- graph_names = ", " .join (g .name for g in graphs )
208
- raise LangGraphRuntimeError (
209
- "ENTRYPOINT_MISSING" ,
210
- "Entrypoint required" ,
211
- f"Multiple graphs available. Please specify one of: { graph_names } ." ,
212
- UiPathErrorCategory .DEPLOYMENT ,
213
- )
214
-
215
- # Get the specified graph
216
- self .graph_config = self .context .langgraph_config .get_graph (
217
- self .context .entrypoint
218
- )
219
- if not self .graph_config :
220
- raise LangGraphRuntimeError (
221
- "GRAPH_NOT_FOUND" ,
222
- "Graph not found" ,
223
- f"Graph '{ self .context .entrypoint } ' not found." ,
224
- UiPathErrorCategory .DEPLOYMENT ,
225
- )
226
- try :
227
- loaded_graph = await self .graph_config .load_graph ()
228
- self .context .state_graph = (
229
- loaded_graph .builder
230
- if isinstance (loaded_graph , CompiledStateGraph )
231
- else loaded_graph
232
- )
233
- except ImportError as e :
234
- raise LangGraphRuntimeError (
235
- "GRAPH_IMPORT_ERROR" ,
236
- "Graph import failed" ,
237
- f"Failed to import graph '{ self .context .entrypoint } ': { str (e )} " ,
238
- UiPathErrorCategory .USER ,
239
- ) from e
240
- except TypeError as e :
241
- raise LangGraphRuntimeError (
242
- "GRAPH_TYPE_ERROR" ,
243
- "Invalid graph type" ,
244
- f"Graph '{ self .context .entrypoint } ' is not a valid StateGraph or CompiledStateGraph: { str (e )} " ,
245
- UiPathErrorCategory .USER ,
246
- ) from e
247
- except ValueError as e :
248
- raise LangGraphRuntimeError (
249
- "GRAPH_VALUE_ERROR" ,
250
- "Invalid graph value" ,
251
- f"Invalid value in graph '{ self .context .entrypoint } ': { str (e )} " ,
252
- UiPathErrorCategory .USER ,
253
- ) from e
254
- except Exception as e :
255
- raise LangGraphRuntimeError (
256
- "GRAPH_LOAD_ERROR" ,
257
- "Failed to load graph" ,
258
- f"Unexpected error loading graph '{ self .context .entrypoint } ': { str (e )} " ,
259
- UiPathErrorCategory .USER ,
260
- ) from e
184
+ pass
261
185
262
186
async def cleanup (self ):
263
- if hasattr (self , "graph_config" ) and self .graph_config :
264
- await self .graph_config .cleanup ()
187
+ pass
265
188
266
189
def _extract_graph_result (
267
190
self , final_chunk , graph : CompiledStateGraph [Any , Any , Any ]
@@ -377,3 +300,19 @@ def _pretty_print(self, stream_chunk: Union[Tuple[Any, Any], Dict[str, Any], Any
377
300
logger .info ("%s" , formatted_metadata )
378
301
except (TypeError , ValueError ):
379
302
pass
303
+
304
+
305
+ class LangGraphScriptRuntime (LangGraphRuntime ):
306
+ """
307
+ Resolves the graph from langgraph.json config file and passes it to the base runtime.
308
+ """
309
+
310
+ def __init__ (
311
+ self , context : LangGraphRuntimeContext , entrypoint : Optional [str ] = None
312
+ ):
313
+ self .resolver = GraphResolver (entrypoint = entrypoint )
314
+ super ().__init__ (context , self .resolver )
315
+
316
+ async def cleanup (self ):
317
+ await super ().cleanup ()
318
+ await self .resolver .cleanup ()
0 commit comments