Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 103 additions & 35 deletions codex-rs/core/src/mcp_tool_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use codex_analytics::AppInvocation;
use codex_analytics::InvocationType;
use codex_analytics::build_track_events_context;
use codex_features::Feature;
use codex_otel::sanitize_metric_tag_value;
use codex_protocol::mcp::CallToolResult;
use codex_protocol::openai_models::InputModality;
use codex_protocol::protocol::AskForApproval;
Expand All @@ -61,6 +62,9 @@ use tracing::Span;
use tracing::field::Empty;
use url::Url;

const MCP_CALL_COUNT_METRIC: &str = "codex.mcp.call";
const MCP_CALL_DURATION_METRIC: &str = "codex.mcp.call.duration_ms";

/// Handles the specified tool call dispatches the appropriate
/// `McpToolCallBegin` and `McpToolCallEnd` events to the `Session`.
pub(crate) async fn handle_mcp_tool_call(
Expand Down Expand Up @@ -128,7 +132,7 @@ pub(crate) async fn handle_mcp_tool_call(
.await;
let status = if result.is_ok() { "ok" } else { "error" };
turn_context.session_telemetry.counter(
"codex.mcp.call",
MCP_CALL_COUNT_METRIC,
/*inc*/ 1,
&[("status", status)],
);
Expand Down Expand Up @@ -166,7 +170,7 @@ pub(crate) async fn handle_mcp_tool_call(
)
.await
{
let result = match decision {
let (result, call_duration) = match decision {
McpToolApprovalDecision::Accept
| McpToolApprovalDecision::AcceptForSession
| McpToolApprovalDecision::AcceptAndRemember => {
Expand Down Expand Up @@ -206,10 +210,11 @@ pub(crate) async fn handle_mcp_tool_call(
if let Err(error) = &result {
tracing::warn!("MCP tool call error: {error:?}");
}
let duration = start.elapsed();
let tool_call_end_event = EventMsg::McpToolCallEnd(McpToolCallEndEvent {
call_id: call_id.clone(),
invocation,
duration: start.elapsed(),
duration,
result: result.clone(),
});
notify_mcp_tool_call_event(
Expand All @@ -225,50 +230,62 @@ pub(crate) async fn handle_mcp_tool_call(
&tool_name,
)
.await;
result
(result, Some(duration))
}
McpToolApprovalDecision::Decline => {
let message = "user rejected MCP tool call".to_string();
notify_mcp_tool_call_skip(
sess.as_ref(),
turn_context.as_ref(),
&call_id,
invocation,
message,
/*already_started*/ true,
(
notify_mcp_tool_call_skip(
sess.as_ref(),
turn_context.as_ref(),
&call_id,
invocation,
message,
/*already_started*/ true,
)
.await,
None,
)
.await
}
McpToolApprovalDecision::Cancel => {
let message = "user cancelled MCP tool call".to_string();
notify_mcp_tool_call_skip(
sess.as_ref(),
turn_context.as_ref(),
&call_id,
invocation,
message,
/*already_started*/ true,
(
notify_mcp_tool_call_skip(
sess.as_ref(),
turn_context.as_ref(),
&call_id,
invocation,
message,
/*already_started*/ true,
)
.await,
None,
)
.await
}
McpToolApprovalDecision::BlockedBySafetyMonitor(message) => {
notify_mcp_tool_call_skip(
sess.as_ref(),
turn_context.as_ref(),
&call_id,
invocation,
message,
/*already_started*/ true,
(
notify_mcp_tool_call_skip(
sess.as_ref(),
turn_context.as_ref(),
&call_id,
invocation,
message,
/*already_started*/ true,
)
.await,
None,
)
.await
}
};

let status = if result.is_ok() { "ok" } else { "error" };
turn_context.session_telemetry.counter(
"codex.mcp.call",
/*inc*/ 1,
&[("status", status)],
emit_mcp_call_metrics(
turn_context.as_ref(),
status,
&tool_name,
connector_id.as_deref(),
connector_name.as_deref(),
call_duration,
);

return CallToolResult::from_result(result);
Expand Down Expand Up @@ -306,10 +323,11 @@ pub(crate) async fn handle_mcp_tool_call(
if let Err(error) = &result {
tracing::warn!("MCP tool call error: {error:?}");
}
let duration = start.elapsed();
let tool_call_end_event = EventMsg::McpToolCallEnd(McpToolCallEndEvent {
call_id: call_id.clone(),
invocation,
duration: start.elapsed(),
duration,
result: result.clone(),
});

Expand All @@ -322,11 +340,61 @@ pub(crate) async fn handle_mcp_tool_call(
maybe_track_codex_app_used(sess.as_ref(), turn_context.as_ref(), &server, &tool_name).await;

let status = if result.is_ok() { "ok" } else { "error" };
emit_mcp_call_metrics(
turn_context.as_ref(),
status,
&tool_name,
connector_id.as_deref(),
connector_name.as_deref(),
Some(duration),
);

CallToolResult::from_result(result)
}

fn emit_mcp_call_metrics(
turn_context: &TurnContext,
status: &str,
tool_name: &str,
connector_id: Option<&str>,
connector_name: Option<&str>,
duration: Option<Duration>,
) {
let tags = mcp_call_metric_tags(status, tool_name, connector_id, connector_name);
let tag_refs: Vec<(&str, &str)> = tags
.iter()
.map(|(key, value)| (*key, value.as_str()))
.collect();
turn_context
.session_telemetry
.counter("codex.mcp.call", /*inc*/ 1, &[("status", status)]);
.counter(MCP_CALL_COUNT_METRIC, /*inc*/ 1, &tag_refs);
if let Some(duration) = duration {
turn_context.session_telemetry.record_duration(
MCP_CALL_DURATION_METRIC,
duration,
&tag_refs,
);
}
}

CallToolResult::from_result(result)
fn mcp_call_metric_tags(
status: &str,
tool_name: &str,
connector_id: Option<&str>,
connector_name: Option<&str>,
) -> Vec<(&'static str, String)> {
let mut tags = vec![
("status", sanitize_metric_tag_value(status)),
("tool", sanitize_metric_tag_value(tool_name)),
];
if let Some(connector_id) = connector_id.filter(|connector_id| !connector_id.is_empty()) {
tags.push(("connector_id", sanitize_metric_tag_value(connector_id)));
}
if let Some(connector_name) = connector_name.filter(|connector_name| !connector_name.is_empty())
{
tags.push(("connector_name", sanitize_metric_tag_value(connector_name)));
}
tags
}

fn mcp_tool_call_span(
Expand Down
Loading