Skip to content

Commit 67cdfff

Browse files
committed
feat(tracing): Enhance Langfuse generation update with fire-and-forget strategy
- Add fire-and-forget mechanism for Langfuse generation updates - Improve performance by making update process non-blocking - Refactor update_generation method to use async fire-and-forget pattern - Ensure robust error handling and logging for generation updates - Maintain comprehensive metadata and cost tracking for Langfuse tracing Optimizes tracing update performance by preventing potential blocking operations during generation tracking.
1 parent 4d6f5ab commit 67cdfff

File tree

1 file changed

+185
-149
lines changed

1 file changed

+185
-149
lines changed

agentle/generations/tracing/langfuse_otel_client.py

Lines changed: 185 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from typing import TYPE_CHECKING, Any, AsyncGenerator, Literal, Optional, cast, override
1515

1616
from pydantic import PrivateAttr
17+
from rsb.coroutines.fire_and_forget import fire_and_forget
1718
from rsb.models import BaseModel
1819

1920
from .otel_client import GenerationContext, OtelClient, TraceContext
@@ -191,85 +192,92 @@ async def update_generation(
191192
192193
Esta implementação garante que as contagens de tokens e custos
193194
sejam registrados com precisão no Langfuse UI.
195+
196+
Usa fire-and-forget para não bloquear a execução, já que não precisamos
197+
esperar pela confirmação da atualização.
194198
"""
195199
if not isinstance(generation_context, _LangfuseGenerationContext):
196200
return
197201

198-
try:
199-
generation = generation_context.generation
200-
201-
# Preparar dados para atualização em uma única chamada
202-
update_params = {"output": dict(output_data)}
203-
204-
# Adicionar metadados se fornecidos
205-
if metadata:
206-
update_params["metadata"] = dict(metadata)
207-
208-
# ✅ FIX: Mapear para o formato correto esperado pelo Langfuse V3
209-
if usage_details:
210-
langfuse_usage = {
211-
"prompt_tokens": usage_details.get("input", 0),
212-
"completion_tokens": usage_details.get("output", 0),
213-
"total_tokens": usage_details.get("total", 0),
214-
}
215-
update_params["usage_details"] = langfuse_usage
216-
217-
# ✅ FIX: Try multiple cost field name formats to ensure compatibility
218-
if cost_details:
219-
input_cost = float(cost_details.get("input", 0.0))
220-
output_cost = float(cost_details.get("output", 0.0))
221-
total_cost = float(cost_details.get("total", 0.0))
222-
223-
# Ensure we have meaningful cost values (not zero)
224-
if total_cost > 0 or input_cost > 0 or output_cost > 0:
225-
# Try the format that Langfuse V3 documentation suggests
226-
langfuse_cost = {
227-
"total": total_cost,
228-
}
202+
async def _do_update() -> None:
203+
try:
204+
generation = generation_context.generation
229205

230-
# Also include breakdown if available
231-
if input_cost > 0:
232-
langfuse_cost["input"] = input_cost
233-
if output_cost > 0:
234-
langfuse_cost["output"] = output_cost
206+
# Preparar dados para atualização em uma única chamada
207+
update_params = {"output": dict(output_data)}
235208

236-
update_params["cost_details"] = langfuse_cost
209+
# Adicionar metadados se fornecidos
210+
if metadata:
211+
update_params["metadata"] = dict(metadata)
237212

238-
# Also add cost information to metadata for better visibility
239-
cost_metadata = {
240-
"cost_usd_input": input_cost,
241-
"cost_usd_output": output_cost,
242-
"cost_usd_total": total_cost,
243-
"currency": "USD",
213+
# ✅ FIX: Mapear para o formato correto esperado pelo Langfuse V3
214+
if usage_details:
215+
langfuse_usage = {
216+
"prompt_tokens": usage_details.get("input", 0),
217+
"completion_tokens": usage_details.get("output", 0),
218+
"total_tokens": usage_details.get("total", 0),
244219
}
220+
update_params["usage_details"] = langfuse_usage
221+
222+
# ✅ FIX: Try multiple cost field name formats to ensure compatibility
223+
if cost_details:
224+
input_cost = float(cost_details.get("input", 0.0))
225+
output_cost = float(cost_details.get("output", 0.0))
226+
total_cost = float(cost_details.get("total", 0.0))
227+
228+
# Ensure we have meaningful cost values (not zero)
229+
if total_cost > 0 or input_cost > 0 or output_cost > 0:
230+
# Try the format that Langfuse V3 documentation suggests
231+
langfuse_cost = {
232+
"total": total_cost,
233+
}
245234

246-
if "metadata" not in update_params:
247-
update_params["metadata"] = {}
248-
update_params["metadata"].update(cost_metadata)
235+
# Also include breakdown if available
236+
if input_cost > 0:
237+
langfuse_cost["input"] = input_cost
238+
if output_cost > 0:
239+
langfuse_cost["output"] = output_cost
249240

250-
# ✅ FIX: Fazer uma única chamada de update com todos os parâmetros
251-
generation.update(**update_params) # type: ignore
241+
update_params["cost_details"] = langfuse_cost
252242

253-
# ✅ FIX: Also try setting cost details as generation attributes
254-
if cost_details and hasattr(generation, "_otel_span"):
255-
try:
256-
# Set cost as span attributes for better visibility
257-
span = generation._otel_span # type: ignore
258-
span.set_attribute(
259-
"cost.total", float(cost_details.get("total", 0.0))
260-
)
261-
span.set_attribute(
262-
"cost.input", float(cost_details.get("input", 0.0))
263-
)
264-
span.set_attribute(
265-
"cost.output", float(cost_details.get("output", 0.0))
266-
)
267-
span.set_attribute("cost.currency", "USD")
268-
except Exception as e:
269-
logger.debug(f"Could not set cost attributes: {e}")
243+
# Also add cost information to metadata for better visibility
244+
cost_metadata = {
245+
"cost_usd_input": input_cost,
246+
"cost_usd_output": output_cost,
247+
"cost_usd_total": total_cost,
248+
"currency": "USD",
249+
}
270250

271-
except Exception as e:
272-
logger.error(f"Erro ao atualizar geração: {e}")
251+
if "metadata" not in update_params:
252+
update_params["metadata"] = {}
253+
update_params["metadata"].update(cost_metadata)
254+
255+
# ✅ FIX: Fazer uma única chamada de update com todos os parâmetros
256+
generation.update(**update_params) # type: ignore
257+
258+
# ✅ FIX: Also try setting cost details as generation attributes
259+
if cost_details and hasattr(generation, "_otel_span"):
260+
try:
261+
# Set cost as span attributes for better visibility
262+
span = generation._otel_span # type: ignore
263+
span.set_attribute(
264+
"cost.total", float(cost_details.get("total", 0.0))
265+
)
266+
span.set_attribute(
267+
"cost.input", float(cost_details.get("input", 0.0))
268+
)
269+
span.set_attribute(
270+
"cost.output", float(cost_details.get("output", 0.0))
271+
)
272+
span.set_attribute("cost.currency", "USD")
273+
except Exception as e:
274+
logger.debug(f"Could not set cost attributes: {e}")
275+
276+
except Exception as e:
277+
logger.error(f"Erro ao atualizar geração: {e}")
278+
279+
# Fire-and-forget: não bloqueia a execução
280+
fire_and_forget(_do_update())
273281

274282
async def update_trace(
275283
self,
@@ -280,76 +288,85 @@ async def update_trace(
280288
metadata: Optional[Mapping[str, Any]] = None,
281289
end_time: Optional[datetime] = None,
282290
) -> None:
283-
"""Atualiza um trace com dados finais."""
291+
"""
292+
Atualiza um trace com dados finais.
293+
294+
Usa fire-and-forget para não bloquear a execução, já que não precisamos
295+
esperar pela confirmação da atualização.
296+
"""
284297
if not isinstance(trace_context, _LangfuseTraceContext):
285298
return
286299

287-
try:
288-
span = trace_context.span
289-
290-
# Preparar metadados do trace
291-
trace_metadata = dict(metadata) if metadata else {}
292-
trace_metadata["success"] = success
293-
294-
# ✅ FIX: Extract cost information from output_data and add to trace
295-
if isinstance(output_data, dict):
296-
# Check if cost summary is in output data
297-
if "cost_summary" in output_data:
298-
cost_summary = output_data["cost_summary"]
299-
trace_metadata.update(
300-
{
301-
"cost_details": {
302-
"input": cost_summary.get("input_cost", 0.0),
303-
"output": cost_summary.get("output_cost", 0.0),
304-
},
305-
}
306-
)
307-
308-
# Check if usage summary is in output data
309-
if "usage_summary" in output_data:
310-
usage_summary = output_data["usage_summary"]
311-
trace_metadata.update(
312-
{
313-
"tokens_total": usage_summary.get("total_tokens", 0),
314-
"tokens_input": usage_summary.get("input_tokens", 0),
315-
"tokens_output": usage_summary.get("output_tokens", 0),
316-
}
317-
)
318-
319-
# ✅ FIX: Update trace with cost information using the span's update_trace method
320-
span.update_trace(
321-
output=cast(dict[str, Any], output_data),
322-
metadata=trace_metadata,
323-
)
324-
325-
# ✅ FIX: Also try to set cost directly on the trace if possible
326-
if hasattr(span, "_otel_span"):
327-
try:
328-
otel_span = span._otel_span # type: ignore[reportPrivateUsage]
329-
if hasattr(otel_span, "set_attribute"):
330-
# Set cost attributes on the span
331-
if "cost_total" in trace_metadata:
332-
otel_span.set_attribute(
333-
"cost.total", float(trace_metadata["cost_total"])
334-
)
335-
if "cost_input" in trace_metadata:
336-
otel_span.set_attribute(
337-
"cost.input", float(trace_metadata["cost_input"])
338-
)
339-
if "cost_output" in trace_metadata:
340-
otel_span.set_attribute(
341-
"cost.output", float(trace_metadata["cost_output"])
342-
)
343-
if "tokens_total" in trace_metadata:
344-
otel_span.set_attribute(
345-
"usage.total_tokens",
346-
int(trace_metadata["tokens_total"]),
347-
)
348-
except Exception as e:
349-
logger.debug(f"Could not set trace attributes: {e}")
300+
async def _do_update() -> None:
301+
try:
302+
span = trace_context.span
303+
304+
# Preparar metadados do trace
305+
trace_metadata = dict(metadata) if metadata else {}
306+
trace_metadata["success"] = success
307+
308+
# ✅ FIX: Extract cost information from output_data and add to trace
309+
if isinstance(output_data, dict):
310+
# Check if cost summary is in output data
311+
if "cost_summary" in output_data:
312+
cost_summary = output_data["cost_summary"]
313+
trace_metadata.update(
314+
{
315+
"cost_details": {
316+
"input": cost_summary.get("input_cost", 0.0),
317+
"output": cost_summary.get("output_cost", 0.0),
318+
},
319+
}
320+
)
321+
322+
# Check if usage summary is in output data
323+
if "usage_summary" in output_data:
324+
usage_summary = output_data["usage_summary"]
325+
trace_metadata.update(
326+
{
327+
"tokens_total": usage_summary.get("total_tokens", 0),
328+
"tokens_input": usage_summary.get("input_tokens", 0),
329+
"tokens_output": usage_summary.get("output_tokens", 0),
330+
}
331+
)
332+
333+
# ✅ FIX: Update trace with cost information using the span's update_trace method
334+
span.update_trace(
335+
output=cast(dict[str, Any], output_data),
336+
metadata=trace_metadata,
337+
)
350338

351-
except Exception as e:
352-
logger.error(f"Erro ao atualizar trace: {e}")
339+
# ✅ FIX: Also try to set cost directly on the trace if possible
340+
if hasattr(span, "_otel_span"):
341+
try:
342+
otel_span = span._otel_span # type: ignore[reportPrivateUsage]
343+
if hasattr(otel_span, "set_attribute"):
344+
# Set cost attributes on the span
345+
if "cost_total" in trace_metadata:
346+
otel_span.set_attribute(
347+
"cost.total", float(trace_metadata["cost_total"])
348+
)
349+
if "cost_input" in trace_metadata:
350+
otel_span.set_attribute(
351+
"cost.input", float(trace_metadata["cost_input"])
352+
)
353+
if "cost_output" in trace_metadata:
354+
otel_span.set_attribute(
355+
"cost.output", float(trace_metadata["cost_output"])
356+
)
357+
if "tokens_total" in trace_metadata:
358+
otel_span.set_attribute(
359+
"usage.total_tokens",
360+
int(trace_metadata["tokens_total"]),
361+
)
362+
except Exception as e:
363+
logger.debug(f"Could not set trace attributes: {e}")
364+
365+
except Exception as e:
366+
logger.error(f"Erro ao atualizar trace: {e}")
367+
368+
# Fire-and-forget: não bloqueia a execução
369+
fire_and_forget(_do_update())
353370

354371
async def add_trace_score(
355372
self,
@@ -359,20 +376,29 @@ async def add_trace_score(
359376
value: float | str,
360377
comment: Optional[str] = None,
361378
) -> None:
362-
"""Adiciona uma pontuação ao trace."""
379+
"""
380+
Adiciona uma pontuação ao trace.
381+
382+
Usa fire-and-forget para não bloquear a execução, já que não precisamos
383+
esperar pela confirmação da pontuação.
384+
"""
363385
if not isinstance(trace_context, _LangfuseTraceContext):
364386
return
365387

366-
try:
367-
# Usar o método score_current_trace do SDK V3
368-
self.langfuse.score_current_trace(
369-
name=name,
370-
value=value,
371-
comment=comment,
372-
)
388+
async def _do_score() -> None:
389+
try:
390+
# Usar o método score_current_trace do SDK V3
391+
self.langfuse.score_current_trace(
392+
name=name,
393+
value=value,
394+
comment=comment,
395+
)
373396

374-
except Exception as e:
375-
logger.error(f"Erro ao adicionar pontuação: {e}")
397+
except Exception as e:
398+
logger.error(f"Erro ao adicionar pontuação: {e}")
399+
400+
# Fire-and-forget: não bloqueia a execução
401+
fire_and_forget(_do_score())
376402

377403
async def handle_error(
378404
self,
@@ -460,9 +486,19 @@ def _categorize_error(self, error: Exception) -> str:
460486
return "other"
461487

462488
async def flush(self) -> None:
463-
"""Força o envio imediato de todos os eventos pendentes."""
464-
try:
465-
self.langfuse.flush()
466-
logger.debug("Eventos enviados com sucesso para Langfuse")
467-
except Exception as e:
468-
logger.error(f"Erro ao enviar eventos para Langfuse: {e}")
489+
"""
490+
Força o envio imediato de todos os eventos pendentes.
491+
492+
Usa fire-and-forget para não bloquear a execução, já que flush
493+
é uma operação de I/O que pode ser lenta.
494+
"""
495+
496+
async def _do_flush() -> None:
497+
try:
498+
self.langfuse.flush()
499+
logger.debug("Eventos enviados com sucesso para Langfuse")
500+
except Exception as e:
501+
logger.error(f"Erro ao enviar eventos para Langfuse: {e}")
502+
503+
# Fire-and-forget: não bloqueia a execução
504+
fire_and_forget(_do_flush())

0 commit comments

Comments
 (0)