Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = "UiPath Langchain"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.10"
dependencies = [
"uipath>=2.1.49, <2.2.0",
"uipath>=2.1.28.dev1005170000,<2.1.28.dev1005180000",
"langgraph>=0.5.0, <0.7.0",
"langchain-core>=0.3.34",
"langgraph-checkpoint-sqlite>=2.0.3",
Expand Down Expand Up @@ -111,3 +111,7 @@ name = "testpypi"
url = "https://test.pypi.org/simple/"
publish-url = "https://test.pypi.org/legacy/"
explicit = true


[tool.uv.sources]
uipath = { index = "testpypi" }
62 changes: 56 additions & 6 deletions src/uipath_langchain/_cli/_runtime/_runtime.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import os
from contextlib import suppress
from typing import Any, Dict, List, Optional, Tuple, Union

from langchain_core.callbacks.base import BaseCallbackHandler
Expand All @@ -10,15 +11,29 @@
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph.errors import EmptyInputError, GraphRecursionError, InvalidUpdateError
from langgraph.graph.state import CompiledStateGraph
from openinference.instrumentation.langchain import (
LangChainInstrumentor,
get_current_span,
)
from openinference.instrumentation.langchain import LangChainInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from uipath._cli._runtime._contracts import (
UiPathBaseRuntime,
UiPathErrorCategory,
UiPathRuntimeResult,
)
from uipath.tracing import TracingManager

from ..._utils import _instrument_traceable_attributes
from ...tracers import AsyncUiPathTracer
from .._utils._graph import LangGraphConfig
from ...tracers import (
AsyncUiPathTracer,
JsonFileExporter,
LangchainExporter,
SqliteExporter,
)
from ...tracers.LangchainSpanProcessor import LangchainSpanProcessor
from ._context import LangGraphRuntimeContext
from ._conversation import map_message
from ._exception import LangGraphRuntimeError
Expand Down Expand Up @@ -48,7 +63,41 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
Raises:
LangGraphRuntimeError: If execution fails
"""
_instrument_traceable_attributes()

use_otel_ff = os.getenv("UIPATH_USE_OTEL_TRACING", "false").lower() == "true"

# Instrument LangSmith @traceable to use appropriate adapter
# When OTEL is enabled: uses UiPath @traced()
# When OTEL is disabled: uses existing dispatch_event system
_instrument_traceable_attributes(useOtel=use_otel_ff)

if use_otel_ff:
try:
provider = TracerProvider()
trace.set_tracer_provider(provider)
provider.add_span_processor(BatchSpanProcessor(LangchainExporter())) # type: ignore

provider.add_span_processor(
BatchSpanProcessor(
JsonFileExporter(
".uipath/traces.jsonl", LangchainSpanProcessor()
)
)
)

provider.add_span_processor(
BatchSpanProcessor(
SqliteExporter(".uipath/traces.db", LangchainSpanProcessor())
)
)

LangChainInstrumentor().instrument(
tracer_provider=trace.get_tracer_provider()
)

TracingManager.register_current_span_provider(get_current_span)
except Exception as e:
logger.error(f"Failed to initialize OpenTelemetry tracing: {e}")

if self.context.state_graph is None:
return None
Expand All @@ -74,9 +123,10 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
# Set up tracing if available
callbacks: List[BaseCallbackHandler] = []

if self.context.job_id and self.context.tracing_enabled:
tracer = AsyncUiPathTracer(context=self.context.trace_context)
callbacks = [tracer]
if not use_otel_ff:
if self.context.job_id and self.context.tracing_enabled:
tracer = AsyncUiPathTracer(context=self.context.trace_context)
callbacks = [tracer]

graph_config: RunnableConfig = {
"configurable": {
Expand Down
63 changes: 63 additions & 0 deletions src/uipath_langchain/tracers/JsonFileExporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import json
import logging
import os
from typing import Sequence

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from uipath.tracing._utils import _SpanUtils

from .LangchainSpanProcessor import BaseSpanProcessor

logger = logging.getLogger(__name__)


class JsonFileExporter(SpanExporter):
"""
An exporter that writes spans to a file in JSON Lines format.

This exporter is useful for debugging and local development. It serializes
each span to a JSON object and appends it as a new line in the specified
file.
"""

def __init__(self, file_path: str, processor: BaseSpanProcessor):
"""
Initializes the JsonFileExporter.

Args:
file_path: The path to the JSON file where spans will be written.
"""
self.file_path = file_path
self._processor = processor
# Ensure the directory exists
os.makedirs(os.path.dirname(self.file_path), exist_ok=True)

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
"""
Exports a batch of spans.

Args:
spans: A sequence of ReadableSpan objects.

Returns:
The result of the export operation.
"""
try:
uipath_spans = [
_SpanUtils.otel_span_to_uipath_span(span).to_dict() for span in spans
]
processed_spans = [
self._processor.process_span(span) for span in uipath_spans
]
with open(self.file_path, "a") as f:
for span in processed_spans:
f.write(json.dumps(span) + "\n")
return SpanExportResult.SUCCESS
except Exception as e:
logger.error(f"Failed to export spans to {self.file_path}: {e}")
return SpanExportResult.FAILURE

def shutdown(self) -> None:
"""Shuts down the exporter."""
pass
27 changes: 27 additions & 0 deletions src/uipath_langchain/tracers/LangchainExporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import logging
from typing import Any, Dict

from opentelemetry.sdk.trace.export import (
SpanExportResult,
)
from uipath.tracing import LlmOpsHttpExporter

from .LangchainSpanProcessor import LangchainSpanProcessor

logger = logging.getLogger(__name__)


class LangchainExporter(LlmOpsHttpExporter):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._processor = LangchainSpanProcessor()

def _send_with_retries(
self, url: str, payload: list[Dict[str, Any]], max_retries: int = 4
) -> SpanExportResult:
processed_payload = [self._processor.process_span(span) for span in payload]
return super()._send_with_retries(
url=url,
payload=processed_payload,
max_retries=max_retries,
)
Loading
Loading