Skip to content

Commit da8d754

Browse files
committed
feat(trace): add experimental otel trace
1 parent 6eaa07b commit da8d754

File tree

7 files changed

+1061
-5
lines changed

7 files changed

+1061
-5
lines changed

src/uipath_langchain/_cli/_runtime/_runtime.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
import logging
33
import os
4+
from contextlib import suppress
45
from typing import Any, Dict, List, Optional, Tuple, Union
56

67
from langchain_core.callbacks.base import BaseCallbackHandler
@@ -10,15 +11,24 @@
1011
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
1112
from langgraph.errors import EmptyInputError, GraphRecursionError, InvalidUpdateError
1213
from langgraph.graph.state import CompiledStateGraph
14+
from openinference.instrumentation.langchain import LangChainInstrumentor
15+
from opentelemetry import trace
16+
from opentelemetry.sdk.trace import TracerProvider
17+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
1318
from uipath._cli._runtime._contracts import (
1419
UiPathBaseRuntime,
1520
UiPathErrorCategory,
1621
UiPathRuntimeResult,
1722
)
1823

1924
from ..._utils import _instrument_traceable_attributes
20-
from ...tracers import AsyncUiPathTracer
21-
from .._utils._graph import LangGraphConfig
25+
from ...tracers import (
26+
AsyncUiPathTracer,
27+
JsonFileExporter,
28+
LangchainExporter,
29+
SqliteExporter,
30+
)
31+
from ...tracers.LangchainSpanProcessor import LangchainSpanProcessor
2232
from ._context import LangGraphRuntimeContext
2333
from ._conversation import map_message
2434
from ._exception import LangGraphRuntimeError
@@ -50,6 +60,27 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
5060
"""
5161
_instrument_traceable_attributes()
5262

63+
with suppress(Exception):
64+
provider = TracerProvider()
65+
trace.set_tracer_provider(provider)
66+
provider.add_span_processor(BatchSpanProcessor(LangchainExporter())) # type: ignore
67+
68+
provider.add_span_processor(
69+
BatchSpanProcessor(
70+
JsonFileExporter(".uipath/traces.jsonl", LangchainSpanProcessor())
71+
)
72+
)
73+
74+
provider.add_span_processor(
75+
BatchSpanProcessor(
76+
SqliteExporter(".uipath/traces.db", LangchainSpanProcessor())
77+
)
78+
)
79+
80+
LangChainInstrumentor().instrument(
81+
tracer_provider=trace.get_tracer_provider()
82+
)
83+
5384
if self.context.state_graph is None:
5485
return None
5586

@@ -74,9 +105,9 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
74105
# Set up tracing if available
75106
callbacks: List[BaseCallbackHandler] = []
76107

77-
if self.context.job_id and self.context.tracing_enabled:
78-
tracer = AsyncUiPathTracer(context=self.context.trace_context)
79-
callbacks = [tracer]
108+
# if self.context.job_id and self.context.tracing_enabled:
109+
# tracer = AsyncUiPathTracer(context=self.context.trace_context)
110+
# callbacks = [tracer]
80111

81112
graph_config: RunnableConfig = {
82113
"configurable": {
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import json
2+
import logging
3+
import os
4+
from typing import Sequence
5+
6+
from opentelemetry.sdk.trace import ReadableSpan
7+
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
8+
from uipath.tracing._utils import _SpanUtils
9+
10+
from .LangchainSpanProcessor import BaseSpanProcessor
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class JsonFileExporter(SpanExporter):
16+
"""
17+
An exporter that writes spans to a file in JSON Lines format.
18+
19+
This exporter is useful for debugging and local development. It serializes
20+
each span to a JSON object and appends it as a new line in the specified
21+
file.
22+
"""
23+
24+
def __init__(self, file_path: str, processor: BaseSpanProcessor):
25+
"""
26+
Initializes the JsonFileExporter.
27+
28+
Args:
29+
file_path: The path to the JSON file where spans will be written.
30+
"""
31+
self.file_path = file_path
32+
self._processor = processor
33+
# Ensure the directory exists
34+
os.makedirs(os.path.dirname(self.file_path), exist_ok=True)
35+
36+
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
37+
"""
38+
Exports a batch of spans.
39+
40+
Args:
41+
spans: A sequence of ReadableSpan objects.
42+
43+
Returns:
44+
The result of the export operation.
45+
"""
46+
try:
47+
uipath_spans = [
48+
_SpanUtils.otel_span_to_uipath_span(span).to_dict() for span in spans
49+
]
50+
processed_spans = [
51+
self._processor.process_span(span) for span in uipath_spans
52+
]
53+
with open(self.file_path, "a") as f:
54+
for span in processed_spans:
55+
f.write(json.dumps(span) + "\n")
56+
return SpanExportResult.SUCCESS
57+
except Exception as e:
58+
logger.error(f"Failed to export spans to {self.file_path}: {e}")
59+
return SpanExportResult.FAILURE
60+
61+
def shutdown(self) -> None:
62+
"""Shuts down the exporter."""
63+
pass
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import logging
2+
from typing import Any, Dict
3+
4+
from opentelemetry.sdk.trace.export import (
5+
SpanExportResult,
6+
)
7+
from uipath.tracing import LlmOpsHttpExporter
8+
9+
from .LangchainSpanProcessor import LangchainSpanProcessor
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class LangchainExporter(LlmOpsHttpExporter):
15+
def __init__(self, **kwargs):
16+
super().__init__(**kwargs)
17+
self._processor = LangchainSpanProcessor()
18+
19+
def _send_with_retries(
20+
self, url: str, payload: list[Dict[str, Any]], max_retries: int = 4
21+
) -> SpanExportResult:
22+
processed_payload = [self._processor.process_span(span) for span in payload]
23+
return super()._send_with_retries(
24+
url=url,
25+
payload=processed_payload,
26+
max_retries=max_retries,
27+
)

0 commit comments

Comments
 (0)