Skip to content

Commit 85979f2

Browse files
committed
feat(tracing): Enhance trace parameters support in responder and generation models
- Add optional `trace_params` parameter to the `respond_async` method
- Update `TraceParams` to use `dict[str, Any]` for the metadata type
- Implement comprehensive trace-parameter handling in `_create_tracing_contexts`
- Support additional tracing metadata fields such as user_id, session_id, tags, version, and release
- Merge custom metadata from trace parameters with existing metadata
- Add an example script demonstrating trace-parameter usage

Improves observability and flexibility for tracing generation and response events with more granular configuration options.
1 parent f2f3317 commit 85979f2

File tree

3 files changed

+222
-5
lines changed

3 files changed

+222
-5
lines changed

agentle/generations/models/generation/trace_params.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class TraceParams(TypedDict, total=False):
5353
session_id: NotRequired[str]
5454
version: NotRequired[str]
5555
release: NotRequired[str]
56-
metadata: NotRequired[Any]
56+
metadata: NotRequired[dict[str, Any]]
5757
tags: NotRequired[Sequence[str]]
5858
public: NotRequired[bool]
5959
parent_trace_id: NotRequired[str]

agentle/responses/responder.py

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# from rsb.coroutines import fire_and_forget
1414
from rsb.models.field import Field
1515

16+
from agentle.generations.models.generation.trace_params import TraceParams
1617
from agentle.generations.tracing.otel_client_type import OtelClientType
1718
from agentle.prompts.models.prompt import Prompt as AgentlePromptType
1819
from agentle.responses.async_stream import AsyncStream
@@ -279,6 +280,7 @@ async def respond_async[TextFormatT = None](
279280
stream_options: Optional[ResponseStreamOptions] = None,
280281
conversation: Optional[Union[str, ConversationParam]] = None,
281282
text_format: type[TextFormatT] | None = None,
283+
trace_params: Optional[TraceParams] = None,
282284
# ResponseProperties parameters
283285
previous_response_id: Optional[str] = None,
284286
reasoning: Optional[Reasoning] = None,
@@ -324,6 +326,7 @@ async def respond_async[TextFormatT = None](
324326
stream_options: Optional[ResponseStreamOptions] = None,
325327
conversation: Optional[Union[str, ConversationParam]] = None,
326328
text_format: type[TextFormatT] | None = None,
329+
trace_params: Optional[TraceParams] = None,
327330
# ResponseProperties parameters
328331
previous_response_id: Optional[str] = None,
329332
reasoning: Optional[Reasoning] = None,
@@ -369,6 +372,7 @@ async def respond_async[TextFormatT = None](
369372
stream_options: Optional[ResponseStreamOptions] = None,
370373
conversation: Optional[Union[str, ConversationParam]] = None,
371374
text_format: type[TextFormatT] | None = None,
375+
trace_params: Optional[TraceParams] = None,
372376
# ResponseProperties parameters
373377
previous_response_id: Optional[str] = None,
374378
reasoning: Optional[Reasoning] = None,
@@ -413,6 +417,7 @@ async def respond_async[TextFormatT = None](
413417
stream_options: Optional[ResponseStreamOptions] = None,
414418
conversation: Optional[Union[str, ConversationParam]] = None,
415419
text_format: type[TextFormatT] | None = None,
420+
trace_params: Optional[TraceParams] = None,
416421
# ResponseProperties parameters
417422
previous_response_id: Optional[str] = None,
418423
reasoning: Optional[Reasoning] = None,
@@ -508,12 +513,14 @@ async def respond_async[TextFormatT = None](
508513
return await self._respond_async(
509514
create_response,
510515
text_format=text_format,
516+
trace_params=trace_params,
511517
)
512518

513519
async def _respond_async[TextFormatT](
514520
self,
515521
create_response: CreateResponse,
516522
text_format: Type[TextFormatT] | None = None,
523+
trace_params: Optional[TraceParams] = None,
517524
) -> Response[TextFormatT] | AsyncStream[ResponseStreamEvent, TextFormatT]:
518525
_api_key = self.api_key
519526
if not _api_key:
@@ -557,6 +564,7 @@ async def _respond_async[TextFormatT](
557564
model=model,
558565
create_response=create_response,
559566
custom_metadata=custom_metadata,
567+
trace_params=trace_params,
560568
)
561569
except Exception as e:
562570
# Log error but don't fail the request
@@ -1136,6 +1144,7 @@ async def _create_tracing_contexts(
11361144
model: str,
11371145
create_response: CreateResponse,
11381146
custom_metadata: dict[str, Any],
1147+
trace_params: Optional[TraceParams] = None,
11391148
) -> list[TracingContext]:
11401149
"""
11411150
Create trace and generation contexts for all configured OtelClients.
@@ -1148,6 +1157,7 @@ async def _create_tracing_contexts(
11481157
model: The model identifier
11491158
create_response: The CreateResponse object
11501159
custom_metadata: Custom metadata dictionary to include in traces
1160+
trace_params: Optional trace parameters for observability
11511161
11521162
Returns:
11531163
List of TracingContext objects containing client and context information
@@ -1164,35 +1174,67 @@ async def _create_tracing_contexts(
11641174
f"Creating tracing contexts for {len(self.otel_clients)} OTel client(s) with model: {model}"
11651175
)
11661176

1177+
# Initialize trace_params if not provided
1178+
if trace_params is None:
1179+
trace_params = TraceParams()
1180+
1181+
# Extract trace parameters
1182+
trace_name = trace_params.get("name", "responder_api_call")
1183+
user_id = trace_params.get("user_id")
1184+
session_id = trace_params.get("session_id")
1185+
tags = trace_params.get("tags")
1186+
trace_version = trace_params.get("version")
1187+
trace_release = trace_params.get("release")
1188+
trace_public = trace_params.get("public")
1189+
# parent_trace_id = trace_params.get("parent_trace_id") # Reserved for future use
1190+
1191+
# Merge custom metadata from trace_params
1192+
merged_metadata = dict(custom_metadata)
1193+
if "metadata" in trace_params:
1194+
trace_metadata_val = trace_params["metadata"]
1195+
merged_metadata.update(trace_metadata_val)
1196+
11671197
# Prepare input data and metadata for tracing
11681198
input_data = self._prepare_trace_input_data(create_response)
11691199
metadata = self._prepare_trace_metadata(
11701200
model=model,
11711201
base_url=self.base_url,
1172-
custom_metadata=custom_metadata,
1202+
custom_metadata=merged_metadata,
11731203
)
11741204

1205+
# Add trace_params specific fields to metadata
1206+
if trace_version:
1207+
metadata.custom_metadata["version"] = trace_version
1208+
if trace_release:
1209+
metadata.custom_metadata["release"] = trace_release
1210+
if trace_public is not None:
1211+
metadata.custom_metadata["public"] = trace_public
1212+
11751213
# Create contexts for each client
11761214
for client in self.otel_clients:
11771215
client_name = type(client).__name__
11781216
try:
11791217
logger.debug(f"Creating trace context for client: {client_name}")
11801218

1181-
# Create trace context
1219+
# Create trace context with trace_params
11821220
trace_gen = client.trace_context(
1183-
name="responder_api_call",
1221+
name=trace_name,
11841222
input_data=input_data.model_dump(),
11851223
metadata=metadata.to_api_dict(),
1224+
user_id=user_id,
1225+
session_id=session_id,
1226+
tags=tags,
11861227
)
11871228
trace_ctx = await trace_gen.__anext__()
11881229

11891230
logger.debug(f"Trace context created for client: {client_name}")
11901231

11911232
# Create generation context
11921233
logger.debug(f"Creating generation context for client: {client_name}")
1234+
generation_name = trace_params.get("name", "response_generation")
11931235
generation_gen = client.generation_context(
11941236
trace_context=trace_ctx,
1195-
name="response_generation",
1237+
name=generation_name,
11961238
model=model,
11971239
provider=metadata.provider,
11981240
input_data=input_data.model_dump(),
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
"""
2+
Example demonstrating TraceParams integration with Responder.
3+
4+
This example shows how to use TraceParams to add metadata, tags, user_id,
5+
session_id, and other observability information to your API calls.
6+
"""
7+
8+
import asyncio
9+
10+
from dotenv import load_dotenv
11+
from rsb.models.base_model import BaseModel
12+
13+
from agentle.generations.models.generation.trace_params import TraceParams
14+
from agentle.generations.tracing.langfuse_otel_client import LangfuseOtelClient
15+
from agentle.responses.responder import Responder
16+
17+
load_dotenv(override=True)
18+
19+
20+
class Answer(BaseModel):
    """Structured response schema used with ``text_format``."""

    # Free-text answer produced by the model.
    answer: str
    # Numeric confidence value reported alongside the answer.
    confidence: float
26+
27+
async def basic_trace_params_example():
    """Demonstrate attaching basic trace metadata to a single call.

    Builds a TraceParams payload with user/session identifiers, tags,
    and custom metadata, then passes it through ``respond_async``.
    """
    print("\n=== Basic TraceParams Example ===")

    client = Responder.openrouter()
    client.append_otel_client(LangfuseOtelClient())

    # Assemble the observability payload for this request.
    params = TraceParams(
        name="customer_support_query",
        user_id="user_123",
        session_id="session_456",
        tags=["support", "billing"],
        metadata={
            "department": "billing",
            "priority": "high",
            "source": "web_chat",
        },
    )

    response = await client.respond_async(
        input="What is the capital of France?",
        model="google/gemma-3-27b-it",
        max_output_tokens=1000,
        text_format=Answer,
        trace_params=params,
    )

    print(f"Response: {response.output_parsed}")
    # Echo back the trace fields that were sent with the request.
    for label, key in (
        ("Trace name", "name"),
        ("User ID", "user_id"),
        ("Session ID", "session_id"),
        ("Tags", "tags"),
    ):
        print(f"{label}: {params.get(key)}")
60+
61+
62+
async def version_release_example():
    """Demonstrate version and release tracking via TraceParams."""
    print("\n=== Version & Release Tracking Example ===")

    client = Responder.openrouter()
    client.append_otel_client(LangfuseOtelClient())

    # Pin the deployment coordinates onto the trace.
    params = TraceParams(
        name="api_v2_query",
        version="2.1.0",
        release="production",
        metadata={
            "api_version": "v2",
            "deployment": "us-east-1",
        },
    )

    response = await client.respond_async(
        input="Explain quantum computing in simple terms",
        model="google/gemma-3-27b-it",
        max_output_tokens=1000,
        trace_params=params,
    )

    print(f"Response: {response.output_text}")
    print(f"Version: {params.get('version')}")
    print(f"Release: {params.get('release')}")
90+
91+
92+
async def multi_user_session_example():
    """Demonstrate per-user / per-session tracing over several calls."""
    print("\n=== Multi-User Session Example ===")

    client = Responder.openrouter()
    client.append_otel_client(LangfuseOtelClient())

    # (user, session, question) triples to replay against the API.
    conversations = [
        ("user_alice", "session_001", "What is machine learning?"),
        ("user_bob", "session_002", "Explain neural networks"),
        ("user_charlie", "session_003", "What is deep learning?"),
    ]

    for uid, sid, question in conversations:
        reply = await client.respond_async(
            input=question,
            model="google/gemma-3-27b-it",
            max_output_tokens=500,
            # Each call carries its own user/session identity.
            trace_params=TraceParams(
                name="educational_query",
                user_id=uid,
                session_id=sid,
                tags=["education", "ai_concepts"],
                metadata={
                    "category": "ai_education",
                    "difficulty": "beginner",
                },
            ),
        )

        print(f"\nUser: {uid} | Session: {sid}")
        print(f"Question: {question}")
        print(f"Answer: {reply.output_text[:100]}...")
128+
129+
130+
async def streaming_with_trace_params():
    """Demonstrate trace parameters combined with a streaming response."""
    print("\n=== Streaming with TraceParams Example ===")

    client = Responder.openrouter()
    client.append_otel_client(LangfuseOtelClient())

    params = TraceParams(
        name="streaming_query",
        user_id="user_streaming",
        session_id="stream_session_001",
        tags=["streaming", "real_time"],
        metadata={
            "stream_type": "text",
            "buffer_size": "default",
        },
    )

    # stream=True makes respond_async return an async iterator of events.
    stream = await client.respond_async(
        input="Write a short poem about AI",
        model="google/gemma-3-27b-it",
        max_output_tokens=500,
        stream=True,
        trace_params=params,
    )

    print("Streaming response:")
    async for event in stream:
        # NOTE(review): event type is matched by string name here — confirm
        # this matches the library's actual ``type`` values for delta events.
        if event.type == "ResponseTextDeltaEvent":
            print(event.delta, end="", flush=True)

    print("\n")
162+
163+
164+
async def main():
    """Run every TraceParams example in sequence."""
    examples = (
        basic_trace_params_example,
        version_release_example,
        multi_user_session_example,
        streaming_with_trace_params,
    )
    for example in examples:
        await example()

    print("\n=== All Examples Complete ===")


if __name__ == "__main__":
    asyncio.run(main())

0 commit comments

Comments
 (0)