diff --git a/pyproject.toml b/pyproject.toml
index acdd953e..b5468723 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ description = "UiPath Langchain"
 readme = { file = "README.md", content-type = "text/markdown" }
 requires-python = ">=3.10"
 dependencies = [
-    "uipath>=2.1.49, <2.2.0",
+    "uipath>=2.1.28.dev1005170000,<2.1.28.dev1005180000",
     "langgraph>=0.5.0, <0.7.0",
     "langchain-core>=0.3.34",
     "langgraph-checkpoint-sqlite>=2.0.3",
@@ -111,3 +111,7 @@ name = "testpypi"
 url = "https://test.pypi.org/simple/"
 publish-url = "https://test.pypi.org/legacy/"
 explicit = true
+
+
+[tool.uv.sources]
+uipath = { index = "testpypi" }
\ No newline at end of file
diff --git a/src/uipath_langchain/_cli/_runtime/_runtime.py b/src/uipath_langchain/_cli/_runtime/_runtime.py
index 6d15f87a..2a119b9e 100644
--- a/src/uipath_langchain/_cli/_runtime/_runtime.py
+++ b/src/uipath_langchain/_cli/_runtime/_runtime.py
@@ -1,6 +1,7 @@
 import json
 import logging
 import os
+from contextlib import suppress
 from typing import Any, Dict, List, Optional, Tuple, Union
 
 from langchain_core.callbacks.base import BaseCallbackHandler
@@ -10,15 +11,28 @@
 from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
 from langgraph.errors import EmptyInputError, GraphRecursionError, InvalidUpdateError
 from langgraph.graph.state import CompiledStateGraph
+from openinference.instrumentation.langchain import (
+    LangChainInstrumentor,
+    get_current_span,
+)
+from opentelemetry import trace
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
 from uipath._cli._runtime._contracts import (
     UiPathBaseRuntime,
     UiPathErrorCategory,
     UiPathRuntimeResult,
 )
+from uipath.tracing import TracingManager
 
 from ..._utils import _instrument_traceable_attributes
-from ...tracers import AsyncUiPathTracer
-from .._utils._graph import LangGraphConfig
+from ...tracers import (
+    AsyncUiPathTracer,
+    JsonFileExporter,
+    LangchainExporter,
+    SqliteExporter,
+)
+from ...tracers.LangchainSpanProcessor import LangchainSpanProcessor
 from ._context import LangGraphRuntimeContext
 from ._conversation import map_message
 from ._exception import LangGraphRuntimeError
@@ -48,7 +63,41 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
         Raises:
             LangGraphRuntimeError: If execution fails
         """
-        _instrument_traceable_attributes()
+
+        use_otel_ff = os.getenv("UIPATH_USE_OTEL_TRACING", "false").lower() == "true"
+
+        # Instrument LangSmith @traceable to use the appropriate adapter.
+        # When OTEL is enabled: uses UiPath @traced()
+        # When OTEL is disabled: uses the existing dispatch_event system
+        _instrument_traceable_attributes(useOtel=use_otel_ff)
+
+        if use_otel_ff:
+            try:
+                provider = TracerProvider()
+                trace.set_tracer_provider(provider)
+                provider.add_span_processor(BatchSpanProcessor(LangchainExporter()))  # type: ignore
+
+                provider.add_span_processor(
+                    BatchSpanProcessor(
+                        JsonFileExporter(
+                            ".uipath/traces.jsonl", LangchainSpanProcessor()
+                        )
+                    )
+                )
+
+                provider.add_span_processor(
+                    BatchSpanProcessor(
+                        SqliteExporter(".uipath/traces.db", LangchainSpanProcessor())
+                    )
+                )
+
+                LangChainInstrumentor().instrument(
+                    tracer_provider=trace.get_tracer_provider()
+                )
+
+                TracingManager.register_current_span_provider(get_current_span)
+            except Exception as e:
+                logger.error(f"Failed to initialize OpenTelemetry tracing: {e}")
 
         if self.context.state_graph is None:
             return None
@@ -74,9 +123,10 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
         # Set up tracing if available
         callbacks: List[BaseCallbackHandler] = []
 
-        if self.context.job_id and self.context.tracing_enabled:
-            tracer = AsyncUiPathTracer(context=self.context.trace_context)
-            callbacks = [tracer]
+        if not use_otel_ff:
+            if self.context.job_id and self.context.tracing_enabled:
+                tracer = AsyncUiPathTracer(context=self.context.trace_context)
+                callbacks = [tracer]
 
         graph_config: RunnableConfig = {
             "configurable": {
diff --git a/src/uipath_langchain/tracers/JsonFileExporter.py b/src/uipath_langchain/tracers/JsonFileExporter.py
new file mode 100644
index 00000000..03df2d77
--- /dev/null
+++ b/src/uipath_langchain/tracers/JsonFileExporter.py
@@ -0,0 +1,64 @@
+import json
+import logging
+import os
+from typing import Sequence
+
+from opentelemetry.sdk.trace import ReadableSpan
+from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
+from uipath.tracing._utils import _SpanUtils
+
+from .LangchainSpanProcessor import BaseSpanProcessor
+
+logger = logging.getLogger(__name__)
+
+
+class JsonFileExporter(SpanExporter):
+    """
+    An exporter that writes spans to a file in JSON Lines format.
+
+    This exporter is useful for debugging and local development. It serializes
+    each span to a JSON object and appends it as a new line in the specified
+    file.
+    """
+
+    def __init__(self, file_path: str, processor: BaseSpanProcessor):
+        """
+        Initializes the JsonFileExporter.
+
+        Args:
+            file_path: The path to the JSON file where spans will be written.
+            processor: The span processor applied to each span before it is written.
+        """
+        self.file_path = file_path
+        self._processor = processor
+        # Ensure the directory exists
+        os.makedirs(os.path.dirname(self.file_path), exist_ok=True)
+
+    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
+        """
+        Exports a batch of spans.
+
+        Args:
+            spans: A sequence of ReadableSpan objects.
+
+        Returns:
+            The result of the export operation.
+ """ + try: + uipath_spans = [ + _SpanUtils.otel_span_to_uipath_span(span).to_dict() for span in spans + ] + processed_spans = [ + self._processor.process_span(span) for span in uipath_spans + ] + with open(self.file_path, "a") as f: + for span in processed_spans: + f.write(json.dumps(span) + "\n") + return SpanExportResult.SUCCESS + except Exception as e: + logger.error(f"Failed to export spans to {self.file_path}: {e}") + return SpanExportResult.FAILURE + + def shutdown(self) -> None: + """Shuts down the exporter.""" + pass diff --git a/src/uipath_langchain/tracers/LangchainExporter.py b/src/uipath_langchain/tracers/LangchainExporter.py new file mode 100644 index 00000000..d375337b --- /dev/null +++ b/src/uipath_langchain/tracers/LangchainExporter.py @@ -0,0 +1,27 @@ +import logging +from typing import Any, Dict + +from opentelemetry.sdk.trace.export import ( + SpanExportResult, +) +from uipath.tracing import LlmOpsHttpExporter + +from .LangchainSpanProcessor import LangchainSpanProcessor + +logger = logging.getLogger(__name__) + + +class LangchainExporter(LlmOpsHttpExporter): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._processor = LangchainSpanProcessor() + + def _send_with_retries( + self, url: str, payload: list[Dict[str, Any]], max_retries: int = 4 + ) -> SpanExportResult: + processed_payload = [self._processor.process_span(span) for span in payload] + return super()._send_with_retries( + url=url, + payload=processed_payload, + max_retries=max_retries, + ) diff --git a/src/uipath_langchain/tracers/LangchainSpanProcessor.py b/src/uipath_langchain/tracers/LangchainSpanProcessor.py new file mode 100644 index 00000000..53830359 --- /dev/null +++ b/src/uipath_langchain/tracers/LangchainSpanProcessor.py @@ -0,0 +1,385 @@ +import json +import logging +from abc import ABC, abstractmethod +from typing import Any, Dict, MutableMapping + +from typing_extensions import override + +logger = logging.getLogger(__name__) + + +def try_convert_json(flat_dict: Dict[str, Any]) -> Dict[str, Any]: + """ + Tries to convert stringified JSON values in a flattened dictionary back to their original types. + + Args: + flat_dict: A dictionary with potentially stringified JSON values. + + Returns: + A new dictionary with JSON strings converted to their original types. + """ + result = {} + for key, value in flat_dict.items(): + if isinstance(value, str): + try: + result[key] = json.loads(value) + except json.JSONDecodeError: + result[key] = value + else: + result[key] = value + return result + + +def unflatten_dict(flat_dict: Dict[str, Any]) -> Dict[str, Any]: + """ + Converts a flattened dictionary with dot-separated keys into a nested dictionary. 
+
+    Args:
+        flat_dict: Dictionary with dot-separated keys (e.g., 'llm.output_messages.0.message.content')
+
+    Returns:
+        Nested dictionary structure
+
+    Example:
+        Input:  {'llm.output_messages.0.message.content': 'hello', 'llm.model': 'gpt-4'}
+        Output: {'llm': {'output_messages': [{'message': {'content': 'hello'}}], 'model': 'gpt-4'}}
+    """
+    result = {}
+
+    for key, value in flat_dict.items():
+        # Split the key by dots
+        parts = key.split(".")
+        current = result
+
+        # Navigate/create the nested structure
+        for i, part in enumerate(parts[:-1]):
+            # Check if this part represents an array index
+            if part.isdigit():
+                # Convert to integer index
+                index = int(part)
+                # Ensure the parent is a list
+                if not isinstance(current, list):
+                    raise ValueError(
+                        f"Expected list but found {type(current)} for key: {key}"
+                    )
+                # Extend the list if necessary
+                while len(current) <= index:
+                    current.append(None)
+
+                # If the current element is None, we need to create a structure for it
+                if current[index] is None:
+                    # Look ahead to see if the next part is a digit (array index)
+                    next_part = parts[i + 1] if i + 1 < len(parts) else None
+                    if next_part and next_part.isdigit():
+                        current[index] = []
+                    else:
+                        current[index] = {}
+
+                current = current[index]
+            else:
+                # Regular dictionary key
+                if part not in current:
+                    # Look ahead to see if the next part is a digit (array index)
+                    next_part = parts[i + 1] if i + 1 < len(parts) else None
+                    if next_part and next_part.isdigit():
+                        current[part] = []
+                    else:
+                        current[part] = {}
+                current = current[part]
+
+        # Set the final value
+        final_key = parts[-1]
+        if final_key.isdigit():
+            # If the final key is a digit, we're setting an array element
+            index = int(final_key)
+            if not isinstance(current, list):
+                raise ValueError(
+                    f"Expected list but found {type(current)} for key: {key}"
+                )
+            while len(current) <= index:
+                current.append(None)
+            current[index] = value
+        else:
+            # Regular key assignment
+            current[final_key] = value
+
+    return result
+
+
+def safe_get(data: Dict[str, Any], path: str, default=None):
+    """Safely get nested value using dot notation."""
+    keys = path.split(".")
+    current = data
+    for key in keys:
+        if isinstance(current, dict) and key in current:
+            current = current[key]
+        else:
+            return default
+    return current
+
+
+def safe_parse_json(value):
+    """Best-effort JSON parse: single quotes are swapped for double quotes first."""
+    if isinstance(value, str):
+        try:
+            return json.loads(value.replace("'", '"'))
+        except json.JSONDecodeError:
+            return value
+    return value
+
+
+class BaseSpanProcessor(ABC):
+    """
+    Abstract base class for span processors.
+
+    Defines the interface for processing spans with a single abstract method.
+    """
+
+    @abstractmethod
+    def process_span(self, span_data: MutableMapping[str, Any]) -> Dict[str, Any]:
+        """
+        Process a span and return the transformed data.
+
+        Args:
+            span_data: The span data to process
+
+        Returns:
+            Processed span data
+        """
+        pass
+
+
+class LangchainSpanProcessor(BaseSpanProcessor):
+    """
+    A class to process spans, applying custom attribute and type mappings.
+
+    This processor can transform flattened attribute keys (e.g., 'llm.output_messages.0.message.role')
+    into nested dictionary structures for easier access and processing.
+
+    Example usage:
+        # With unflattening enabled
+        processor = LangchainSpanProcessor(unflatten_attributes=True, dump_attributes_as_string=False)
+        processed_span = processor.process_span(span_data)
+
+        # Access nested attributes naturally:
+        role = processed_span['attributes']['llm']['output_messages'][0]['message']['role']
+
+        # Without unflattening (original behavior)
+        processor = LangchainSpanProcessor(unflatten_attributes=False)
+        processed_span = processor.process_span(span_data)
+
+        # Access with flattened keys:
+        role = processed_span['attributes']['llm.output_messages.0.message.role']
+    """
+
+    # Mapping of old attribute names to new attribute names or (new name, function)
+    ATTRIBUTE_MAPPING = {
+        "input.value": ("input", lambda s: json.loads(s)),
+        "output.value": ("output", lambda s: json.loads(s)),
+        "llm.model_name": "model",
+    }
+
+    # Mapping of span types
+    SPAN_TYPE_MAPPING = {
+        "LLM": "completion",
+        "TOOL": "toolCall",
+        # Add more mappings as needed
+    }
+
+    def __init__(
+        self,
+        dump_attributes_as_string: bool = True,
+        unflatten_attributes: bool = True,
+        map_json_fields: bool = True,
+    ):
+        """
+        Initializes the LangchainSpanProcessor.
+
+        Args:
+            dump_attributes_as_string: If True, dumps attributes as a JSON string.
+                Otherwise, attributes are set as a dictionary.
+            unflatten_attributes: If True, converts flattened dot-separated keys
+                into nested dictionary structures.
+            map_json_fields: If True, applies JSON field mapping transformations
+                for tool calls and LLM calls.
+        """
+        self._dump_attributes_as_string = dump_attributes_as_string
+        self._unflatten_attributes = unflatten_attributes
+        self._map_json_fields = map_json_fields
+
+    def extract_attributes(self, span_data: MutableMapping[str, Any]) -> Dict[str, Any]:
+        """Extract and parse attributes from span_data, checking both 'Attributes' and 'attributes' keys."""
+        for key in ["Attributes", "attributes"]:
+            if key in span_data:
+                value = span_data.pop(key)
+                if isinstance(value, str):
+                    try:
+                        parsed_value = json.loads(value)
+                        return parsed_value if isinstance(parsed_value, dict) else {}
+                    except json.JSONDecodeError:
+                        logger.warning(f"Failed to parse attributes JSON: {value}")
+                        return {}
+                elif isinstance(value, dict):
+                    return value
+                else:
+                    return {}
+        return {}
+
+    @override
+    def process_span(self, span_data: MutableMapping[str, Any]) -> Dict[str, Any]:
+        logger.debug(f"Processing span: {span_data}")
+        attributes = self.extract_attributes(span_data)
+
+        if attributes and isinstance(attributes, dict):
+            if "openinference.span.kind" in attributes:
+                span_type = attributes["openinference.span.kind"]
+                # Map span type using SPAN_TYPE_MAPPING
+                span_data["SpanType"] = self.SPAN_TYPE_MAPPING.get(span_type, span_type)
+                # Remove the span kind attribute
+                del attributes["openinference.span.kind"]
+
+            # Apply the transformation logic
+            for old_key, mapping in self.ATTRIBUTE_MAPPING.items():
+                if old_key in attributes:
+                    if isinstance(mapping, tuple):
+                        new_key, func = mapping
+                        try:
+                            attributes[new_key] = func(attributes[old_key])
+                        except Exception:
+                            attributes[new_key] = attributes[old_key]
+                    else:
+                        new_key = mapping
+                        attributes[new_key] = attributes[old_key]
+                    del attributes[old_key]
+
+        if attributes:
+            # Apply unflattening if requested (before JSON field mapping)
+            if self._unflatten_attributes:
+                try:
+                    attributes = try_convert_json(attributes)
+                    attributes = unflatten_dict(attributes)
+                except Exception as e:
+                    logger.warning(f"Failed to unflatten attributes: {e}")
+
+            # Set attributes in span_data as dictionary for JSON field mapping
+            span_data["attributes"] = attributes
+
+            # Apply JSON field mapping before final serialization
+            if self._map_json_fields:
+                span_data = self.map_json_fields_from_attributes(span_data)
+
+            # Convert back to JSON string if requested (after all transformations)
+            if self._dump_attributes_as_string:
+                span_data["attributes"] = json.dumps(span_data["attributes"])
+
+        return span_data
+
+    def map_tool_call_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]:
+        """Simple tool call mapping - just add new fields."""
+        result = attributes.copy()  # Keep originals
+
+        # Add new fields
+        result["type"] = "toolCall"
+        result["callId"] = attributes.get("call_id") or attributes.get("id")
+        result["toolName"] = safe_get(attributes, "tool.name")
+        result["arguments"] = safe_parse_json(attributes.get("input", "{}"))
+        result["toolType"] = "Integration"
+        result["result"] = safe_parse_json(attributes.get("output"))
+        result["error"] = None
+
+        return result
+
+    def map_llm_call_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]:
+        """Simple LLM call mapping - just add new fields."""
+        result = attributes.copy()  # Keep originals
+
+        # Transform token usage data if present (after unflattening)
+        # Use safe_get to extract token count values from nested structure
+        prompt_tokens = safe_get(attributes, "llm.token_count.prompt")
+        completion_tokens = safe_get(attributes, "llm.token_count.completion")
+        total_tokens = safe_get(attributes, "llm.token_count.total")
+
+        usage = {
+            "promptTokens": prompt_tokens,
+            "completionTokens": completion_tokens,
+            "totalTokens": total_tokens,
+            "isByoExecution": False,
+            "executionDeploymentType": "PAYGO",
+            "isPiiMasked": False,
+        }
+
+        # remove None values
+        usage = {k: v for k, v in usage.items() if v is not None}
+
+        result["usage"] = usage
+
+        # Add new fields
+        result["input"] = safe_get(attributes, "llm.input_messages")
+        result["output"] = safe_get(attributes, "llm.output_messages")
+
+        result["type"] = "completion"
+        result["model"] = safe_get(attributes, "llm.invocation_parameters.model")
+
+        # Settings
+        settings = {}
+        max_tokens = safe_get(attributes, "llm.invocation_parameters.max_tokens")
+        temperature = safe_get(attributes, "llm.invocation_parameters.temperature")
+        if max_tokens:
+            settings["maxTokens"] = max_tokens
+        if temperature is not None:
+            settings["temperature"] = temperature
+        if settings:
+            result["settings"] = settings
+
+        # Tool calls (simplified)
+        tool_calls = []
+        output_msgs = safe_get(attributes, "llm.output_messages", [])
+        for msg in output_msgs:
+            msg_tool_calls = safe_get(msg, "message.tool_calls", [])
+            for tc in msg_tool_calls:
+                tool_call_data = tc.get("tool_call", {})
+                tool_calls.append(
+                    {
+                        "id": tool_call_data.get("id"),
+                        "name": safe_get(tool_call_data, "function.name"),
+                        "arguments": safe_get(tool_call_data, "function.arguments", {}),
+                    }
+                )
+        if tool_calls:
+            result["toolCalls"] = tool_calls
+
+        # Ensure usage defaults are present on the dict built above
+        if "usage" in result:
+            usage = result["usage"]
+            if isinstance(usage, dict):
+                usage.setdefault("isByoExecution", False)
+                usage.setdefault("executionDeploymentType", "PAYGO")
+                usage.setdefault("isPiiMasked", False)
+
+        return result
+
+    def map_json_fields_from_attributes(
+        self, span_data: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        """Simple mapping dispatcher."""
+        if "attributes" not in span_data:
+            return span_data
+
+        attributes = span_data["attributes"]
+
+        # Parse if string
+        if isinstance(attributes, str):
+            try:
+                attributes = json.loads(attributes)
+            except json.JSONDecodeError:
+                return span_data
+
+        if not isinstance(attributes, dict):
+            return span_data
+
+        # Simple detection and mapping
+        if "tool" in attributes or span_data.get("SpanType") == "toolCall":
+            span_data["attributes"] = self.map_tool_call_attributes(attributes)
+        elif "llm" in attributes or span_data.get("SpanType") == "completion":
+            span_data["attributes"] = self.map_llm_call_attributes(attributes)
+
+        return span_data
diff --git a/src/uipath_langchain/tracers/SqliteExporter.py b/src/uipath_langchain/tracers/SqliteExporter.py
new file mode 100644
index 00000000..8e14ea4d
--- /dev/null
+++ b/src/uipath_langchain/tracers/SqliteExporter.py
@@ -0,0 +1,117 @@
+import json
+import logging
+import os
+import sqlite3
+from typing import Optional, Sequence
+
+from opentelemetry.sdk.trace import ReadableSpan
+from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
+from uipath.tracing._utils import _SpanUtils
+
+from .LangchainSpanProcessor import BaseSpanProcessor
+
+logger = logging.getLogger(__name__)
+
+
+class SqliteExporter(SpanExporter):
+    """
+    An exporter that writes spans to a SQLite database file.
+
+    This exporter is useful for debugging and local development. It serializes
+    the spans and inserts them into a 'spans' table in the specified database.
+    """
+
+    def __init__(self, db_path: str, processor: Optional[BaseSpanProcessor] = None):
+        """
+        Initializes the SqliteExporter.
+
+        Args:
+            db_path: The path to the SQLite database file.
+            processor: An optional span processor applied to each span before insertion.
+        """
+        self.db_path = db_path
+        self._processor = processor
+        # Ensure the directory exists
+        os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
+        self._create_table()
+
+    def _create_table(self):
+        """Creates the 'spans' table if it doesn't already exist."""
+        with sqlite3.connect(self.db_path) as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                CREATE TABLE IF NOT EXISTS spans (
+                    id INTEGER PRIMARY KEY AUTOINCREMENT,
+                    trace_id TEXT,
+                    span_id TEXT,
+                    parent_span_id TEXT,
+                    name TEXT,
+                    start_time TEXT,
+                    end_time TEXT,
+                    span_type TEXT,
+                    attributes TEXT
+                )
+                """
+            )
+            conn.commit()
+
+    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
+        """
+        Exports a batch of spans to the SQLite database.
+
+        Args:
+            spans: A sequence of ReadableSpan objects.
+
+        Returns:
+            The result of the export operation.
+        """
+        try:
+            uipath_spans = [
+                _SpanUtils.otel_span_to_uipath_span(span).to_dict() for span in spans
+            ]
+
+            if self._processor:
+                processed_spans = [
+                    self._processor.process_span(span) for span in uipath_spans
+                ]
+            else:
+                processed_spans = uipath_spans
+
+            with sqlite3.connect(self.db_path) as conn:
+                cursor = conn.cursor()
+                for span in processed_spans:
+                    # The 'attributes' field is a JSON string, so we store it as TEXT.
+                    attributes_json = span.get("attributes", "{}")
+                    if not isinstance(attributes_json, str):
+                        attributes_json = json.dumps(attributes_json)
+
+                    cursor.execute(
+                        """
+                        INSERT INTO spans (
+                            trace_id, span_id, parent_span_id, name,
+                            start_time, end_time, span_type, attributes
+                        )
+                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+ """, + ( + span.get("TraceId"), + span.get("SpanId"), + span.get("ParentSpanId"), + span.get("Name"), + span.get("StartTime"), + span.get("EndTime"), + span.get("SpanType"), + attributes_json, + ), + ) + conn.commit() + + return SpanExportResult.SUCCESS + except Exception as e: + logger.error(f"Failed to export spans to {self.db_path}: {e}") + return SpanExportResult.FAILURE + + def shutdown(self) -> None: + """Shuts down the exporter.""" + pass diff --git a/src/uipath_langchain/tracers/__init__.py b/src/uipath_langchain/tracers/__init__.py index 891379a3..638e66f3 100644 --- a/src/uipath_langchain/tracers/__init__.py +++ b/src/uipath_langchain/tracers/__init__.py @@ -1,7 +1,13 @@ from ._instrument_traceable import _instrument_traceable_attributes from .AsyncUiPathTracer import AsyncUiPathTracer +from .JsonFileExporter import JsonFileExporter +from .LangchainExporter import LangchainExporter +from .SqliteExporter import SqliteExporter __all__ = [ "AsyncUiPathTracer", "_instrument_traceable_attributes", + "LangchainExporter", + "JsonFileExporter", + "SqliteExporter", ] diff --git a/src/uipath_langchain/tracers/_instrument_traceable.py b/src/uipath_langchain/tracers/_instrument_traceable.py index 7cc20cfd..939ad171 100644 --- a/src/uipath_langchain/tracers/_instrument_traceable.py +++ b/src/uipath_langchain/tracers/_instrument_traceable.py @@ -7,7 +7,7 @@ from typing import Any, Callable, Dict, List, Literal, Optional from langchain_core.callbacks import dispatch_custom_event -from uipath.tracing import TracingManager +from uipath.tracing import TracingManager, traced from ._events import CustomTraceEvents, FunctionCallEventData @@ -387,12 +387,112 @@ def register_uipath_tracing(): # Apply the patch -def _instrument_traceable_attributes(): +def _map_traceable_to_traced_args( + run_type: Optional[str] = None, + name: Optional[str] = None, + tags: Optional[List[str]] = None, + metadata: Optional[Dict[str, Any]] = None, + **kwargs: Any, +) -> Dict[str, Any]: + """ + Map LangSmith @traceable arguments to UiPath @traced() arguments. + + Args: + run_type: Function type (tool, chain, llm, retriever, etc.) + name: Custom name for the traced function + tags: List of tags for categorization + metadata: Additional metadata dictionary + **kwargs: Additional arguments (ignored) + + Returns: + Dict containing mapped arguments for @traced() + """ + traced_args = {} + + # Direct mappings + if name is not None: + traced_args["name"] = name + + # Pass through run_type directly to UiPath @traced() + if run_type: + traced_args["run_type"] = run_type + + # For span_type, we can derive from run_type or use a default + if run_type: + # Map run_type to appropriate span_type for OpenTelemetry + span_type_mapping = { + "tool": "tool_call", + "chain": "chain_execution", + "llm": "llm_call", + "retriever": "retrieval", + "embedding": "embedding", + "prompt": "prompt_template", + "parser": "output_parser" + } + traced_args["span_type"] = span_type_mapping.get(run_type, run_type) + + # Note: UiPath @traced() doesn't support custom attributes directly + # Tags and metadata information is lost in the current mapping + # This could be enhanced in future versions + + return traced_args + + +def otel_traceable_adapter( + func: Optional[Callable] = None, + *, + run_type: Optional[str] = None, + name: Optional[str] = None, + tags: Optional[List[str]] = None, + metadata: Optional[Dict[str, Any]] = None, + **kwargs: Any, +): + """ + OTEL-based adapter that converts LangSmith @traceable decorator calls to UiPath @traced(). 
+
+    This function maintains the same interface as LangSmith's @traceable but uses
+    UiPath's OpenTelemetry-based tracing system underneath.
+
+    Args:
+        func: Function to be decorated (when used without parentheses)
+        run_type: Type of function (tool, chain, llm, etc.)
+        name: Custom name for tracing
+        tags: List of tags for categorization
+        metadata: Additional metadata dictionary
+        **kwargs: Additional arguments (for future compatibility)
+
+    Returns:
+        Decorated function or decorator function
+    """
+    def decorator(f: Callable) -> Callable:
+        # Map arguments to @traced() format
+        traced_args = _map_traceable_to_traced_args(
+            run_type=run_type,
+            name=name,
+            tags=tags,
+            metadata=metadata,
+            **kwargs
+        )
+
+        # Apply UiPath @traced() decorator
+        return traced(**traced_args)(f)
+
+    # Handle both @traceable and @traceable(...) usage patterns
+    if func is None:
+        # Called as @traceable(...) - return decorator
+        return decorator
+    else:
+        # Called as @traceable - apply decorator directly
+        return decorator(func)
+
+
+def _instrument_traceable_attributes(useOtel: bool = False):
     """Apply the patch to langsmith module at import time."""
     global original_langsmith, original_traceable
 
-    # Register our custom tracing decorator
-    register_uipath_tracing()
+    if not useOtel:
+        # Register our custom tracing decorator when not using opentelemetry
+        register_uipath_tracing()
 
     # Import the original module if not already done
     if original_langsmith is None:
@@ -408,7 +508,12 @@ def _instrument_traceable_attributes():
         original_traceable = original_langsmith.traceable
 
     # Replace the traceable function with our patched version
-    original_langsmith.traceable = patched_traceable
+    if useOtel:
+        # Use OTEL-based adapter when OTEL is enabled
+        original_langsmith.traceable = otel_traceable_adapter
+    else:
+        # Use existing dispatch_event-based adapter
+        original_langsmith.traceable = patched_traceable
 
     # Put our modified module back
     sys.modules["langsmith"] = original_langsmith
diff --git a/tests/tracers/test_langchain_span_processor.py b/tests/tracers/test_langchain_span_processor.py
new file mode 100644
index 00000000..a4399df6
--- /dev/null
+++ b/tests/tracers/test_langchain_span_processor.py
@@ -0,0 +1,438 @@
+"""Tests for LangchainSpanProcessor."""
+
+import json
+
+import pytest
+
+from uipath_langchain.tracers.LangchainSpanProcessor import (
+    LangchainSpanProcessor,
+    unflatten_dict,
+)
+
+
+class TestUnflattenDict:
+    """Test the unflatten_dict utility function."""
+
+    def test_simple_unflatten(self):
+        """Test basic unflattening functionality."""
+        flat_dict = {"user.name": "John", "user.age": 30, "settings.theme": "dark"}
+
+        result = unflatten_dict(flat_dict)
+
+        assert result == {
+            "user": {"name": "John", "age": 30},
+            "settings": {"theme": "dark"},
+        }
+
+    def test_array_unflatten(self):
+        """Test unflattening with array indices."""
+        flat_dict = {
+            "items.0.name": "first",
+            "items.0.value": 1,
+            "items.1.name": "second",
+            "items.1.value": 2,
+        }
+
+        result = unflatten_dict(flat_dict)
+
+        expected = {
+            "items": [{"name": "first", "value": 1}, {"name": "second", "value": 2}]
+        }
+        assert result == expected
+
+    def test_nested_arrays(self):
+        """Test deeply nested structures with arrays."""
+        flat_dict = {
+            "llm.messages.0.content": "hello",
+            "llm.messages.0.tools.0.name": "tool1",
+            "llm.messages.0.tools.1.name": "tool2",
+            "llm.provider": "azure",
+        }
+
+        result = unflatten_dict(flat_dict)
+
+        expected = {
+            "llm": {
+                "messages": [
+                    {
+                        "content": "hello",
+                        "tools": [{"name": "tool1"}, {"name": "tool2"}],
+                    }
+                ],
+                "provider": "azure",
+            }
+        }
+        assert result == expected
+
+    def test_sparse_arrays(self):
+        """Test arrays with gaps in indices."""
+        flat_dict = {"items.0.name": "first", "items.2.name": "third"}
+
+        result = unflatten_dict(flat_dict)
+
+        expected = {"items": [{"name": "first"}, None, {"name": "third"}]}
+        assert result == expected
+
+    def test_empty_dict(self):
+        """Test with empty dictionary."""
+        result = unflatten_dict({})
+        assert result == {}
+
+    def test_single_level_keys(self):
+        """Test with keys that don't need unflattening."""
+        flat_dict = {"name": "value", "number": 42}
+        result = unflatten_dict(flat_dict)
+        assert result == flat_dict
+
+
+class TestLangchainSpanProcessor:
+    """Test the LangchainSpanProcessor class."""
+
+    def test_init_defaults(self):
+        """Test initialization with default parameters."""
+        processor = LangchainSpanProcessor()
+        assert processor._dump_attributes_as_string is True
+        assert processor._unflatten_attributes is True
+
+    def test_init_custom_params(self):
+        """Test initialization with custom parameters."""
+        processor = LangchainSpanProcessor(
+            dump_attributes_as_string=False, unflatten_attributes=True
+        )
+        assert processor._dump_attributes_as_string is False
+        assert processor._unflatten_attributes is True
+
+    def test_process_span_without_attributes(self):
+        """Test processing span without attributes."""
+        processor = LangchainSpanProcessor()
+        span_data = {"Id": "test-id", "Name": "TestSpan"}
+
+        result = processor.process_span(span_data)
+        assert result == span_data
+
+    def test_process_span_with_unflatten_disabled(self):
+        """Test processing span with unflattening disabled."""
+        processor = LangchainSpanProcessor(
+            dump_attributes_as_string=False, unflatten_attributes=False
+        )
+
+        attributes = {
+            "llm.output_messages.0.role": "assistant",
+            "llm.provider": "azure",
+            "model": "gpt-4",
+        }
+
+        span_data = {"Id": "test-id", "Attributes": json.dumps(attributes)}
+
+        result = processor.process_span(span_data)
+
+        # Should keep flattened structure
+        assert result["attributes"]["llm.output_messages.0.role"] == "assistant"
+        assert result["attributes"]["llm.provider"] == "azure"
+        assert result["attributes"]["model"] == "gpt-4"
+
+    def test_process_span_with_unflatten_enabled(self):
+        """Test processing span with unflattening enabled."""
+        processor = LangchainSpanProcessor(
+            dump_attributes_as_string=False, unflatten_attributes=True
+        )
+
+        attributes = {
+            "llm.output_messages.0.message.role": "assistant",
+            "llm.output_messages.0.message.content": "Hello",
+            "llm.output_messages.0.message.tool_calls.0.function.name": "get_time",
+            "llm.provider": "azure",
+            "model": "gpt-4",
+        }
+
+        span_data = {"Id": "test-id", "Attributes": json.dumps(attributes)}
+
+        result = processor.process_span(span_data)
+
+        # Should have nested structure
+        attrs = result["attributes"]
+        assert attrs["llm"]["output_messages"][0]["message"]["role"] == "assistant"
+        assert attrs["llm"]["output_messages"][0]["message"]["content"] == "Hello"
+        assert (
+            attrs["llm"]["output_messages"][0]["message"]["tool_calls"][0]["function"][
+                "name"
+            ]
+            == "get_time"
+        )
+        assert attrs["llm"]["provider"] == "azure"
+        assert attrs["model"] == "gpt-4"
+
+    def test_process_span_with_unflatten_and_json_output(self):
+        """Test processing span with unflattening and JSON string output."""
+        processor = LangchainSpanProcessor(
+            dump_attributes_as_string=True, unflatten_attributes=True
+        )
+
+        attributes = {"llm.provider": "azure", "llm.messages.0.role": "user"}
+
span_data = {"Id": "test-id", "Attributes": json.dumps(attributes)} + + result = processor.process_span(span_data) + + # Should be JSON string + assert isinstance(result["attributes"], str) + + # Parse and verify nested structure + parsed = json.loads(result["attributes"]) + assert parsed["llm"]["provider"] == "azure" + assert parsed["llm"]["messages"][0]["role"] == "user" + + def test_attribute_mapping_with_unflatten(self): + """Test that attribute mapping works with unflattening.""" + processor = LangchainSpanProcessor( + dump_attributes_as_string=False, unflatten_attributes=True + ) + + attributes = { + "llm.model_name": "gpt-4", # Should be mapped to "model" + "llm.output_messages.0.role": "assistant", + "input.value": '{"text": "hello"}', # Should be parsed + } + + span_data = {"Id": "test-id", "Attributes": json.dumps(attributes)} + + result = processor.process_span(span_data) + attrs = result["attributes"] + + # Check mapping worked + assert attrs["model"] == "gpt-4" + assert attrs["input"] == {"text": "hello"} + + # Check unflattening worked + assert attrs["llm"]["output_messages"][0]["role"] == "assistant" + + def test_token_usage_processing_with_unflatten(self): + """Test token usage processing with unflattening.""" + processor = LangchainSpanProcessor( + dump_attributes_as_string=False, unflatten_attributes=True + ) + + attributes = { + "llm.token_count.prompt": 100, + "llm.token_count.completion": 50, + "llm.token_count.total": 150, + "llm.provider": "azure", + } + + span_data = {"Id": "test-id", "Attributes": json.dumps(attributes)} + + result = processor.process_span(span_data) + attrs = result["attributes"] + + # Check usage structure + assert attrs["usage"]["promptTokens"] == 100 + assert attrs["usage"]["completionTokens"] == 50 + assert attrs["usage"]["totalTokens"] == 150 + assert attrs["usage"]["isByoExecution"] is False + + # Check unflattening of other attributes + assert attrs["llm"]["provider"] == "azure" + + def test_unflatten_error_handling(self): + """Test error handling in unflattening.""" + processor = LangchainSpanProcessor( + dump_attributes_as_string=False, unflatten_attributes=True + ) + + # Create a scenario that might cause unflattening issues + # This should be handled gracefully + attributes = {"normal.key": "value", "llm.provider": "azure"} + + span_data = {"Id": "test-id", "Attributes": json.dumps(attributes)} + + # Should not raise an exception + result = processor.process_span(span_data) + assert "attributes" in result + + def test_process_span_with_dict_attributes_unflatten_enabled(self): + """Test processing span with dictionary attributes and unflattening enabled.""" + processor = LangchainSpanProcessor( + dump_attributes_as_string=False, unflatten_attributes=True + ) + + # Simulate the real-world case where Attributes is already a dictionary + attributes = { + "llm.output_messages.0.message.role": "assistant", + "llm.output_messages.0.message.tool_calls.0.tool_call.id": "call_123", + "llm.output_messages.0.message.tool_calls.0.tool_call.function.name": "get_time", + "llm.provider": "azure", + "model": "gpt-4", + } + + span_data = { + "Id": "test-id", + "Attributes": attributes, # Already a dictionary, not a JSON string + } + + result = processor.process_span(span_data) + + # Should have nested structure + attrs = result["attributes"] + assert attrs["llm"]["output_messages"][0]["message"]["role"] == "assistant" + assert ( + attrs["llm"]["output_messages"][0]["message"]["tool_calls"][0]["tool_call"][ + "id" + ] + == "call_123" + ) + assert ( + 
attrs["llm"]["output_messages"][0]["message"]["tool_calls"][0]["tool_call"][ + "function" + ]["name"] + == "get_time" + ) + assert attrs["llm"]["provider"] == "azure" + assert attrs["model"] == "gpt-4" + + def test_real_world_trace_unflatten(self): + """Test with real-world trace data to verify unflattening works correctly.""" + processor = LangchainSpanProcessor( + dump_attributes_as_string=False, unflatten_attributes=True + ) + + # Real trace data from user's example (dictionary format) + real_trace_attributes = { + "input.mime_type": "application/json", + "output.mime_type": "application/json", + "llm.input_messages.0.message.role": "user", + "llm.input_messages.0.message.content": "You are a helpful assistant with access to various tools. \n The user is asking about: Weather and Technology\n \n Please use the available tools to gather some relevant information. For example:\n - Check the current time\n - Generate a random number if relevant\n - Calculate squares of numbers if needed\n - Get weather information for any cities mentioned\n \n Use at least 2-3 tools to demonstrate their functionality.", + "llm.output_messages.0.message.role": "assistant", + "llm.output_messages.0.message.tool_calls.0.tool_call.id": "call_qWaFnNRY8mk2PQjEu0wRLaRd", + "llm.output_messages.0.message.tool_calls.0.tool_call.function.name": "get_current_time", + "llm.output_messages.0.message.tool_calls.1.tool_call.id": "call_3ckaPILSv4SmyeufQf1ovA3H", + "llm.output_messages.0.message.tool_calls.1.tool_call.function.name": "generate_random_number", + "llm.output_messages.0.message.tool_calls.1.tool_call.function.arguments": '{"min_val": 1, "max_val": 10}', + "llm.output_messages.0.message.tool_calls.2.tool_call.id": "call_BjaiJ0NHwWs14fMbCyjDElEX", + "llm.output_messages.0.message.tool_calls.2.tool_call.function.name": "get_weather_info", + "llm.output_messages.0.message.tool_calls.2.tool_call.function.arguments": '{"city": "San Francisco"}', + "llm.invocation_parameters": '{"model": "gpt-4o-mini-2024-07-18", "url": "https://alpha.uipath.com/..."}', + "llm.tools.0.tool.json_schema": '{"type": "function", "function": {"name": "get_current_time", "description": "Get the current date and time.", "parameters": {"properties": {}, "type": "object"}}}', + "llm.tools.1.tool.json_schema": '{"type": "function", "function": {"name": "generate_random_number", "description": "Generate a random number between min_val and max_val (inclusive).", "parameters": {"properties": {"min_val": {"default": 1, "type": "integer"}, "max_val": {"default": 100, "type": "integer"}}, "type": "object"}}}', + "llm.tools.2.tool.json_schema": '{"type": "function", "function": {"name": "calculate_square", "description": "Calculate the square of a given number.", "parameters": {"properties": {"number": {"type": "number"}}, "required": ["number"], "type": "object"}}}', + "llm.tools.3.tool.json_schema": '{"type": "function", "function": {"name": "get_weather_info", "description": "Get mock weather information for a given city.", "parameters": {"properties": {"city": {"type": "string"}}, "required": ["city"], "type": "object"}}}', + "llm.provider": "azure", + "llm.system": "openai", + "session.id": "a879985a-8d39-4f51-94e1-8433423f35db", + "metadata": '{"thread_id": "a879985a-8d39-4f51-94e1-8433423f35db", "langgraph_step": 1, "langgraph_node": "make_tool_calls"}', + "model": "gpt-4o-mini-2024-07-18", + "usage": { + "promptTokens": 219, + "completionTokens": 66, + "totalTokens": 285, + "isByoExecution": False, + }, + } + + span_data = { + 
"PermissionStatus": 0, + "Id": "7d137190-348c-4ef2-9b19-165295643b82", + "TraceId": "81dbeaf2-c2ba-4b1e-95fd-b722f53dc405", + "ParentId": "f71478d6-f081-4bf6-a942-0944d97ffadb", + "Name": "UiPathChat", + "StartTime": "2025-08-26T16:11:17.276Z", + "EndTime": "2025-08-26T16:11:20.027Z", + "Attributes": real_trace_attributes, # Dictionary format (not JSON string) + "SpanType": "completion", + } + + # Process the span + result = processor.process_span(span_data) + + # Verify the trace data structure is preserved + assert result["Id"] == "7d137190-348c-4ef2-9b19-165295643b82" + assert result["Name"] == "UiPathChat" + assert result["SpanType"] == "completion" + + # Verify attributes are unflattened and accessible + attrs = result["attributes"] + assert isinstance(attrs, dict) + + # Test LLM provider info + assert attrs["llm"]["provider"] == "azure" + assert attrs["llm"]["system"] == "openai" + + # Test input messages + input_messages = attrs["llm"]["input_messages"] + assert len(input_messages) == 1 + assert input_messages[0]["message"]["role"] == "user" + assert "helpful assistant" in input_messages[0]["message"]["content"] + + # Test output messages and tool calls + output_messages = attrs["llm"]["output_messages"] + assert len(output_messages) == 1 + assert output_messages[0]["message"]["role"] == "assistant" + + tool_calls = output_messages[0]["message"]["tool_calls"] + assert len(tool_calls) == 3 + + # Verify individual tool calls + assert tool_calls[0]["tool_call"]["function"]["name"] == "get_current_time" + assert tool_calls[0]["tool_call"]["id"] == "call_qWaFnNRY8mk2PQjEu0wRLaRd" + + assert ( + tool_calls[1]["tool_call"]["function"]["name"] == "generate_random_number" + ) + assert ( + tool_calls[1]["tool_call"]["function"]["arguments"] + == '{"min_val": 1, "max_val": 10}' + ) + + assert tool_calls[2]["tool_call"]["function"]["name"] == "get_weather_info" + assert ( + tool_calls[2]["tool_call"]["function"]["arguments"] + == '{"city": "San Francisco"}' + ) + + # Test tools schema + tools = attrs["llm"]["tools"] + assert len(tools) == 4 + + # Parse and verify tool schemas + tool_0_schema = json.loads(tools[0]["tool"]["json_schema"]) + assert tool_0_schema["function"]["name"] == "get_current_time" + + tool_1_schema = json.loads(tools[1]["tool"]["json_schema"]) + assert tool_1_schema["function"]["name"] == "generate_random_number" + + # Test session data + assert attrs["session"]["id"] == "a879985a-8d39-4f51-94e1-8433423f35db" + + # Test metadata + metadata = json.loads(attrs["metadata"]) + assert metadata["thread_id"] == "a879985a-8d39-4f51-94e1-8433423f35db" + assert metadata["langgraph_step"] == 1 + assert metadata["langgraph_node"] == "make_tool_calls" + + # Test model and usage info + assert attrs["model"] == "gpt-4o-mini-2024-07-18" + assert attrs["usage"]["promptTokens"] == 219 + assert attrs["usage"]["completionTokens"] == 66 + assert attrs["usage"]["totalTokens"] == 285 + assert attrs["usage"]["isByoExecution"] is False + + # Test MIME types + assert attrs["input"]["mime_type"] == "application/json" + assert attrs["output"]["mime_type"] == "application/json" + + print("✅ Real-world trace unflattening test passed!") + print(f" - Processed {len(real_trace_attributes)} flattened attributes") + print(f" - Created nested structure with {len(tool_calls)} tool calls") + print(f" - Verified {len(tools)} tool schemas") + print(" - All nested access patterns work correctly") + + def test_invalid_json_attributes(self): + """Test handling of invalid JSON in attributes.""" + processor = 
LangchainSpanProcessor(unflatten_attributes=True) + + span_data = {"Id": "test-id", "Attributes": "invalid json {"} + + # Should handle gracefully and return original span + # Note: invalid JSON causes the Attributes key to be removed + result = processor.process_span(span_data) + assert result["Id"] == "test-id" + assert "Attributes" not in result # Attributes key is removed on invalid JSON + + +# uipath-langchain==0.0.123.dev1001490444 diff --git a/tests/tracers/test_otel_error_handling.py b/tests/tracers/test_otel_error_handling.py new file mode 100644 index 00000000..7e7f0696 --- /dev/null +++ b/tests/tracers/test_otel_error_handling.py @@ -0,0 +1,312 @@ +import pytest +from unittest.mock import Mock, patch, MagicMock +import sys +from types import ModuleType + +from uipath_langchain.tracers._instrument_traceable import ( + otel_traceable_adapter, + _instrument_traceable_attributes, + _map_traceable_to_traced_args, +) + + +class TestErrorHandling: + """Test error handling in OTEL traceable adapter.""" + + @patch('uipath_langchain.tracers._instrument_traceable.traced') + def test_traced_import_error(self, mock_traced): + """Test handling when UiPath traced decorator import fails.""" + mock_traced.side_effect = ImportError("uipath.tracing not found") + + with pytest.raises(ImportError): + # Should propagate the import error + otel_traceable_adapter(run_type="tool")(lambda: None) + + @patch('uipath_langchain.tracers._instrument_traceable.traced') + def test_traced_decorator_error(self, mock_traced): + """Test handling when traced decorator raises an error.""" + mock_traced.side_effect = ValueError("Invalid traced parameters") + + with pytest.raises(ValueError): + # Should propagate decorator errors + otel_traceable_adapter(run_type="tool")(lambda: None) + + def test_invalid_parameter_types(self): + """Test handling of invalid parameter types.""" + # Test with invalid tags type (currently ignored) + result = _map_traceable_to_traced_args(tags=123) + + # Tags are not currently supported, so should be ignored + assert "tags" not in result + assert "attributes" not in result + + # Test with invalid metadata values (currently ignored) + result = _map_traceable_to_traced_args( + metadata={"key": None, "complex": {"nested": "value"}} + ) + + # Metadata is not currently supported, so should be ignored + assert "metadata" not in result + assert "attributes" not in result + + def test_extremely_large_parameters(self): + """Test handling of very large parameter values.""" + large_tags = [f"tag_{i}" for i in range(1000)] + large_metadata = {f"key_{i}": f"value_{i}" for i in range(100)} + + result = _map_traceable_to_traced_args( + tags=large_tags, + metadata=large_metadata + ) + + # Should handle large collections without error (currently ignored) + assert "tags" not in result + assert "attributes" not in result + assert isinstance(result, dict) + + def test_special_characters_in_parameters(self): + """Test handling of special characters in parameter values.""" + special_metadata = { + "unicode": "测试数据", + "newlines": "line1\nline2\nline3", + "quotes": 'single"double\'quotes', + "symbols": "!@#$%^&*()_+-=[]{}|;:,.<>?" 
+        }
+
+        result = _map_traceable_to_traced_args(metadata=special_metadata)
+
+        # Should handle special characters gracefully (currently ignored)
+        assert "metadata" not in result
+        assert "attributes" not in result
+        assert isinstance(result, dict)
+
+
+class TestModuleInstrumentationErrors:
+    """Test error handling in module instrumentation."""
+
+    @patch('uipath_langchain.tracers._instrument_traceable.importlib.import_module')
+    def test_langsmith_import_error(self, mock_import):
+        """Test handling when langsmith module cannot be imported."""
+        mock_import.side_effect = ImportError("No module named 'langsmith'")
+
+        # Should handle import error gracefully - implementation dependent
+        try:
+            result = _instrument_traceable_attributes(useOtel=True)
+            # If it doesn't raise, it should return None or handle gracefully
+        except ImportError:
+            # Or it might propagate - both are acceptable behaviors
+            pass
+
+    @patch('uipath_langchain.tracers._instrument_traceable.importlib.import_module')
+    def test_module_attribute_error(self, mock_import):
+        """Test handling when langsmith module doesn't have traceable attribute."""
+        # Reset global state for this test
+        import uipath_langchain.tracers._instrument_traceable as module
+        original_state = module.original_langsmith
+        module.original_langsmith = None
+
+        try:
+            # Mock langsmith module without traceable attribute
+            mock_langsmith = Mock(spec=[])  # Empty spec - no attributes
+            mock_import.return_value = mock_langsmith
+
+            # Current implementation raises AttributeError when traceable is missing
+            with pytest.raises(AttributeError, match="Mock object has no attribute 'traceable'"):
+                _instrument_traceable_attributes(useOtel=True)
+        finally:
+            # Restore original state
+            module.original_langsmith = original_state
+
+    @patch('uipath_langchain.tracers._instrument_traceable.importlib.import_module')
+    @patch('uipath_langchain.tracers._instrument_traceable.sys.modules')
+    def test_sys_modules_modification_error(self, mock_modules, mock_import):
+        """Test handling when sys.modules modification fails."""
+        # Mock sys.modules that raises on deletion
+        mock_modules.__contains__ = Mock(return_value=True)
+        mock_modules.__delitem__ = Mock(side_effect=KeyError("Cannot delete module"))
+
+        mock_langsmith = Mock()
+        mock_langsmith.traceable = Mock()
+        mock_import.return_value = mock_langsmith
+
+        # Should handle sys.modules errors gracefully
+        try:
+            result = _instrument_traceable_attributes(useOtel=True)
+            # Implementation should handle this error case
+        except KeyError:
+            # Or it might propagate - depends on implementation
+            pass
+
+    @patch('uipath_langchain.tracers._instrument_traceable.register_uipath_tracing')
+    def test_register_tracing_error(self, mock_register):
+        """Test handling when register_uipath_tracing fails."""
+        mock_register.side_effect = RuntimeError("Tracing registration failed")
+
+        # Should handle registration error gracefully when OTEL is disabled
+        try:
+            result = _instrument_traceable_attributes(useOtel=False)
+            # Implementation should handle this error
+        except RuntimeError:
+            # Or it might propagate
+            pass
+
+
+class TestConcurrencyAndThreading:
+    """Test behavior under concurrent access."""
+
+    @patch('uipath_langchain.tracers._instrument_traceable.importlib.import_module')
+    def test_concurrent_instrumentation(self, mock_import):
+        """Test concurrent calls to instrumentation function."""
+        # Reset global state for this test
+        import uipath_langchain.tracers._instrument_traceable as module
+        original_state = module.original_langsmith
module.original_langsmith = None + + try: + # Mock langsmith module + mock_langsmith = Mock() + mock_langsmith.traceable = Mock() + mock_import.return_value = mock_langsmith + + # Simulate concurrent calls + results = [] + for _ in range(5): + result = _instrument_traceable_attributes(useOtel=True) + results.append(result) + + # After the first call, global state is set, so subsequent calls return the cached module + # First call should use the mock, subsequent calls return the cached global + assert results[0] == mock_langsmith + # Import should only happen once due to global state + mock_import.assert_called_once() + finally: + # Restore original state + module.original_langsmith = original_state + + def test_global_state_consistency(self): + """Test that global state remains consistent.""" + from uipath_langchain.tracers._instrument_traceable import ( + original_langsmith, + original_traceable + ) + + # Store initial state + initial_langsmith = original_langsmith + initial_traceable = original_traceable + + # Multiple calls should maintain consistent state + # (This test might need adjustment based on actual global state management) + + +class TestEdgeCases: + """Test edge cases and unusual scenarios.""" + + def test_empty_function_decoration(self): + """Test decorating functions with no parameters.""" + @otel_traceable_adapter() + def empty_func(): + pass + + # Should work without error + assert callable(empty_func) + + def test_lambda_decoration(self): + """Test decorating lambda functions.""" + # This should work but might have limitations + decorated_lambda = otel_traceable_adapter()(lambda x: x * 2) + + assert callable(decorated_lambda) + + def test_class_method_decoration(self): + """Test decorating class methods.""" + class TestClass: + @otel_traceable_adapter(run_type="tool") + def method(self, x): + return x * 2 + + instance = TestClass() + # Should work without error + assert hasattr(instance, 'method') + assert callable(instance.method) + + @patch('uipath_langchain.tracers._instrument_traceable.traced') + def test_recursive_decoration(self, mock_traced): + """Test what happens with recursive decoration.""" + mock_traced.return_value = lambda f: f + + # Apply decorator multiple times + def base_func(): + pass + + decorated_once = otel_traceable_adapter()(base_func) + decorated_twice = otel_traceable_adapter()(decorated_once) + + # Should handle multiple decoration attempts + assert callable(decorated_twice) + + def test_very_long_function_names(self): + """Test with extremely long function names.""" + long_name = "a" * 1000 + + result = _map_traceable_to_traced_args(name=long_name) + + # Should handle very long names + assert result["name"] == long_name + + def test_unicode_in_parameters(self): + """Test Unicode characters in all parameters.""" + result = _map_traceable_to_traced_args( + name="测试函数", + run_type="工具", + tags=["标签1", "标签2"], + metadata={"键": "值", "key2": "测试数据"} + ) + + # Should handle Unicode correctly + assert result["name"] == "测试函数" + assert result["run_type"] == "工具" + assert result["span_type"] == "工具" # Will be passthrough since not in mapping + + # Tags and metadata not currently supported + assert "tags" not in result + assert "metadata" not in result + assert "attributes" not in result + + +class TestMemoryAndPerformance: + """Test memory usage and performance characteristics.""" + + def test_memory_cleanup(self): + """Test that large objects are properly cleaned up.""" + # Create adapter with large metadata + large_metadata = {f"key_{i}": "x" * 1000 
for i in range(100)} + + result = _map_traceable_to_traced_args(metadata=large_metadata) + + # Should complete without memory errors (metadata currently ignored) + assert isinstance(result, dict) + assert "attributes" not in result + + # Clear references + del large_metadata, result + + def test_parameter_mapping_performance(self): + """Test performance with many parameters.""" + import time + + # Large but reasonable parameter set + tags = [f"tag_{i}" for i in range(100)] + metadata = {f"key_{i}": f"value_{i}" for i in range(200)} + + start_time = time.time() + result = _map_traceable_to_traced_args( + name="test", + run_type="tool", + tags=tags, + metadata=metadata + ) + end_time = time.time() + + # Should complete in reasonable time (adjust threshold as needed) + assert (end_time - start_time) < 1.0 # 1 second max + assert isinstance(result, dict) # Should return valid dict \ No newline at end of file diff --git a/tests/tracers/test_otel_traceable_adapter.py b/tests/tracers/test_otel_traceable_adapter.py new file mode 100644 index 00000000..4246d0b4 --- /dev/null +++ b/tests/tracers/test_otel_traceable_adapter.py @@ -0,0 +1,346 @@ +import pytest +from unittest.mock import Mock, patch, MagicMock +from typing import Dict, Any, List, Optional + +from uipath_langchain.tracers._instrument_traceable import ( + _map_traceable_to_traced_args, + otel_traceable_adapter, + _instrument_traceable_attributes, +) + + +class TestMapTraceableToTracedArgs: + """Test parameter mapping from LangSmith @traceable to UiPath @traced().""" + + def test_basic_mapping(self): + """Test basic parameter mapping.""" + result = _map_traceable_to_traced_args( + name="test_func", + run_type="tool" + ) + + assert result["name"] == "test_func" + assert result["run_type"] == "tool" + assert result["span_type"] == "tool_call" + + def test_run_type_mapping(self): + """Test run_type to span_type mapping.""" + test_cases = [ + ("tool", "tool_call"), + ("chain", "chain_execution"), + ("llm", "llm_call"), + ("retriever", "retrieval"), + ("embedding", "embedding"), + ("prompt", "prompt_template"), + ("parser", "output_parser"), + ("custom_type", "custom_type"), # passthrough + ] + + for run_type, expected_span_type in test_cases: + result = _map_traceable_to_traced_args(run_type=run_type) + assert result["span_type"] == expected_span_type + assert result["run_type"] == run_type + + def test_tags_mapping(self): + """Test tags mapping (currently not supported by UiPath @traced).""" + result = _map_traceable_to_traced_args( + tags=["search", "web", "test"] + ) + + # Tags are currently not mapped since UiPath @traced doesn't support them + assert "tags" not in result + + def test_tags_string_mapping(self): + """Test single tag as string (currently not supported).""" + result = _map_traceable_to_traced_args( + tags="single_tag" + ) + + # Tags are currently not mapped + assert "tags" not in result + + def test_metadata_mapping(self): + """Test metadata mapping (currently not supported by UiPath @traced).""" + metadata = { + "version": "1.0", + "model": "gpt-4", + "temperature": 0.7 + } + + result = _map_traceable_to_traced_args(metadata=metadata) + + # Metadata is currently not mapped since UiPath @traced doesn't support custom attributes + assert "metadata" not in result + assert "attributes" not in result + + def test_complete_mapping(self): + """Test complete parameter mapping.""" + result = _map_traceable_to_traced_args( + name="research_tool", + run_type="tool", + tags=["research", "web"], + metadata={"version": "2.0", "provider": 
"tavily"} + ) + + assert result["name"] == "research_tool" + assert result["run_type"] == "tool" + assert result["span_type"] == "tool_call" + + # Tags and metadata are not currently supported by UiPath @traced + assert "tags" not in result + assert "metadata" not in result + assert "attributes" not in result + + def test_empty_parameters(self): + """Test handling of empty/None parameters.""" + result = _map_traceable_to_traced_args() + + # Should return empty dict when no parameters provided + assert result == {} + + def test_kwargs_ignored(self): + """Test that extra kwargs are ignored.""" + result = _map_traceable_to_traced_args( + name="test", + unknown_param="ignored", + another_param=123 + ) + + assert result["name"] == "test" + assert "unknown_param" not in result + assert "another_param" not in result + + +class TestOtelTraceableAdapter: + """Test the OTEL traceable adapter decorator.""" + + @patch('uipath_langchain.tracers._instrument_traceable.traced') + def test_decorator_without_parentheses(self, mock_traced): + """Test @otel_traceable_adapter usage (direct decoration).""" + mock_traced.return_value = lambda f: f # Mock traced decorator + + def sample_func(): + return "test" + + # Apply decorator directly + decorated_func = otel_traceable_adapter(sample_func) + + # Should call traced() with empty args and return decorated function + mock_traced.assert_called_once_with() + assert callable(decorated_func) + + @patch('uipath_langchain.tracers._instrument_traceable.traced') + def test_decorator_with_parentheses(self, mock_traced): + """Test @otel_traceable_adapter(...) usage (parameterized decoration).""" + mock_traced_instance = Mock() + mock_traced.return_value = mock_traced_instance + mock_traced_instance.return_value = lambda f: f + + def sample_func(): + return "test" + + # Create parameterized decorator + decorator = otel_traceable_adapter( + run_type="tool", + name="test_tool", + tags=["test"], + metadata={"version": "1.0"} + ) + + # Apply to function + decorated_func = decorator(sample_func) + + # Should call traced with mapped parameters + expected_args = { + "name": "test_tool", + "run_type": "tool", + "span_type": "tool_call" + } + + mock_traced.assert_called_once_with(**expected_args) + mock_traced_instance.assert_called_once_with(sample_func) + assert callable(decorated_func) + + @patch('uipath_langchain.tracers._instrument_traceable.traced') + def test_parameter_mapping_integration(self, mock_traced): + """Test that parameters are correctly mapped through the adapter.""" + mock_traced.return_value = lambda f: f + + decorator = otel_traceable_adapter( + run_type="chain", + name="research_chain", + tags=["research", "web"], + metadata={"model": "gpt-4", "temperature": 0.7} + ) + + def sample_func(): + pass + + decorator(sample_func) + + # Verify traced was called with correctly mapped parameters + call_args = mock_traced.call_args[1] # Get keyword arguments + + assert call_args["name"] == "research_chain" + assert call_args["run_type"] == "chain" + assert call_args["span_type"] == "chain_execution" + + # Tags and metadata are not currently supported + assert "tags" not in call_args + assert "metadata" not in call_args + assert "attributes" not in call_args + + +class TestInstrumentTraceableAttributes: + """Test the instrumentation function with OTEL support.""" + + @patch('uipath_langchain.tracers._instrument_traceable.importlib.import_module') + def test_otel_enabled(self, mock_import): + """Test instrumentation with OTEL enabled.""" + # Reset global state for this 
+class TestOtelTraceableAdapter:
+    """Test the OTEL traceable adapter decorator."""
+
+    @patch('uipath_langchain.tracers._instrument_traceable.traced')
+    def test_decorator_without_parentheses(self, mock_traced):
+        """Test @otel_traceable_adapter usage (direct decoration)."""
+        mock_traced.return_value = lambda f: f  # Mock traced decorator
+
+        def sample_func():
+            return "test"
+
+        # Apply decorator directly
+        decorated_func = otel_traceable_adapter(sample_func)
+
+        # Should call traced() with empty args and return decorated function
+        mock_traced.assert_called_once_with()
+        assert callable(decorated_func)
+
+    @patch('uipath_langchain.tracers._instrument_traceable.traced')
+    def test_decorator_with_parentheses(self, mock_traced):
+        """Test @otel_traceable_adapter(...) usage (parameterized decoration)."""
+        mock_traced_instance = Mock()
+        mock_traced.return_value = mock_traced_instance
+        mock_traced_instance.return_value = lambda f: f
+
+        def sample_func():
+            return "test"
+
+        # Create parameterized decorator
+        decorator = otel_traceable_adapter(
+            run_type="tool",
+            name="test_tool",
+            tags=["test"],
+            metadata={"version": "1.0"}
+        )
+
+        # Apply to function
+        decorated_func = decorator(sample_func)
+
+        # Should call traced with mapped parameters
+        expected_args = {
+            "name": "test_tool",
+            "run_type": "tool",
+            "span_type": "tool_call"
+        }
+
+        mock_traced.assert_called_once_with(**expected_args)
+        mock_traced_instance.assert_called_once_with(sample_func)
+        assert callable(decorated_func)
+
+    @patch('uipath_langchain.tracers._instrument_traceable.traced')
+    def test_parameter_mapping_integration(self, mock_traced):
+        """Test that parameters are correctly mapped through the adapter."""
+        mock_traced.return_value = lambda f: f
+
+        decorator = otel_traceable_adapter(
+            run_type="chain",
+            name="research_chain",
+            tags=["research", "web"],
+            metadata={"model": "gpt-4", "temperature": 0.7}
+        )
+
+        def sample_func():
+            pass
+
+        decorator(sample_func)
+
+        # Verify traced was called with correctly mapped parameters
+        call_args = mock_traced.call_args[1]  # Get keyword arguments
+
+        assert call_args["name"] == "research_chain"
+        assert call_args["run_type"] == "chain"
+        assert call_args["span_type"] == "chain_execution"
+
+        # Tags and metadata are not currently supported
+        assert "tags" not in call_args
+        assert "metadata" not in call_args
+        assert "attributes" not in call_args
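+
+
+# NOTE: _instrument_traceable_attributes re-imports langsmith and swaps
+# langsmith.traceable in place (otel_traceable_adapter when useOtel=True,
+# patched_traceable otherwise). The module-level `original_langsmith`
+# appears to serve as a run-once guard, so each test resets it to None up
+# front and restores it in a finally block to keep tests isolated.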
+class TestInstrumentTraceableAttributes:
+    """Test the instrumentation function with OTEL support."""
+
+    @patch('uipath_langchain.tracers._instrument_traceable.importlib.import_module')
+    def test_otel_enabled(self, mock_import):
+        """Test instrumentation with OTEL enabled."""
+        # Reset global state for this test
+        import uipath_langchain.tracers._instrument_traceable as module
+        original_state = module.original_langsmith
+        module.original_langsmith = None
+
+        try:
+            # Mock langsmith module
+            mock_langsmith = Mock()
+            mock_langsmith.traceable = Mock()
+            mock_import.return_value = mock_langsmith
+
+            # Call with OTEL enabled
+            result = _instrument_traceable_attributes(useOtel=True)
+
+            # Should import langsmith and replace traceable with OTEL adapter
+            mock_import.assert_called_once_with("langsmith")
+            assert result == mock_langsmith
+
+            # traceable should be replaced with otel_traceable_adapter
+            from uipath_langchain.tracers._instrument_traceable import otel_traceable_adapter
+            assert mock_langsmith.traceable == otel_traceable_adapter
+        finally:
+            # Restore original state
+            module.original_langsmith = original_state
+
+    @patch('uipath_langchain.tracers._instrument_traceable.importlib.import_module')
+    @patch('uipath_langchain.tracers._instrument_traceable.register_uipath_tracing')
+    def test_otel_disabled(self, mock_register, mock_import):
+        """Test instrumentation with OTEL disabled."""
+        # Reset global state for this test
+        import uipath_langchain.tracers._instrument_traceable as module
+        original_state = module.original_langsmith
+        module.original_langsmith = None
+
+        try:
+            # Mock langsmith module
+            mock_langsmith = Mock()
+            mock_langsmith.traceable = Mock()
+            mock_import.return_value = mock_langsmith
+
+            # Call with OTEL disabled
+            result = _instrument_traceable_attributes(useOtel=False)
+
+            # Should register custom tracing and use patched_traceable
+            mock_register.assert_called_once()
+            mock_import.assert_called_once_with("langsmith")
+            assert result == mock_langsmith
+
+            # traceable should be replaced with patched_traceable
+            from uipath_langchain.tracers._instrument_traceable import patched_traceable
+            assert mock_langsmith.traceable == patched_traceable
+        finally:
+            # Restore original state
+            module.original_langsmith = original_state
+
+    @patch('uipath_langchain.tracers._instrument_traceable.importlib.import_module')
+    def test_existing_module_handling(self, mock_import):
+        """Test handling when langsmith module is already in sys.modules."""
+        # Reset global state for this test
+        import uipath_langchain.tracers._instrument_traceable as module
+        original_state = module.original_langsmith
+        module.original_langsmith = None
+
+        try:
+            # Mock existing langsmith in sys.modules
+            existing_langsmith = Mock()
+            existing_langsmith.traceable = Mock()
+
+            import sys
+            with patch.dict(sys.modules, {'langsmith': existing_langsmith}):
+                # Mock fresh import
+                fresh_langsmith = Mock()
+                fresh_langsmith.traceable = Mock()
+                mock_import.return_value = fresh_langsmith
+
+                result = _instrument_traceable_attributes(useOtel=True)
+
+                # Should temporarily remove existing module, import fresh, and restore
+                assert result == fresh_langsmith
+        finally:
+            # Restore original state
+            module.original_langsmith = original_state
+
+    @patch('uipath_langchain.tracers._instrument_traceable.importlib.import_module')
+    def test_import_error_handling(self, mock_import):
+        """Test handling of import errors."""
+        # Reset global state for this test
+        import uipath_langchain.tracers._instrument_traceable as module
+        original_state = module.original_langsmith
+        module.original_langsmith = None
+
+        try:
+            mock_import.side_effect = ImportError("langsmith not found")
+
+            # Current implementation propagates import errors
+            with pytest.raises(ImportError, match="langsmith not found"):
+                _instrument_traceable_attributes(useOtel=True)
+        finally:
+            # Restore original state
+            module.original_langsmith = original_state
+
+
+class TestIntegration:
+    """Integration tests for the complete OTEL adapter flow."""
+
+    @patch('uipath_langchain.tracers._instrument_traceable.traced')
+    def test_end_to_end_workflow(self, mock_traced):
+        """Test complete workflow from decorator to traced call."""
+        # Setup mock
+        mock_traced_decorator = Mock()
+        mock_traced.return_value = mock_traced_decorator
+        mock_traced_decorator.return_value = lambda f: f
+
+        # Create and use decorator
+        @otel_traceable_adapter(run_type="tool", name="search_tool")
+        def search(query: str) -> str:
+            return f"Results for {query}"
+
+        # Verify traced was called with correct parameters
+        mock_traced.assert_called_once()
+        call_kwargs = mock_traced.call_args[1]
+
+        assert call_kwargs["name"] == "search_tool"
+        assert call_kwargs["run_type"] == "tool"
+        assert call_kwargs["span_type"] == "tool_call"
+
+    def test_parameter_validation(self):
+        """Test parameter validation and edge cases."""
+        # Test with None values
+        result = _map_traceable_to_traced_args(
+            name=None,
+            run_type=None,
+            tags=None,
+            metadata=None
+        )
+
+        # Should handle None gracefully
+        assert isinstance(result, dict)
+
+        # Test with empty collections
+        result = _map_traceable_to_traced_args(
+            tags=[],
+            metadata={}
+        )
+
+        # Should handle empty collections
+        assert isinstance(result, dict)
\ No newline at end of file