Skip to content

Commit 9e19004

Browse files
authored
[codex] add context-window lineage headers (#16758)
This change adds client-owned context-window and parent thread id headers to all requests to responses api.
1 parent 39097ab commit 9e19004

File tree

7 files changed

+338
-37
lines changed

7 files changed

+338
-37
lines changed

codex-rs/core/src/client.rs

Lines changed: 111 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use std::sync::Arc;
2828
use std::sync::Mutex as StdMutex;
2929
use std::sync::OnceLock;
3030
use std::sync::atomic::AtomicBool;
31+
use std::sync::atomic::AtomicU64;
3132
use std::sync::atomic::Ordering;
3233

3334
use codex_api::CompactClient as ApiCompactClient;
@@ -119,6 +120,9 @@ use codex_response_debug_context::telemetry_transport_error_message;
119120
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
120121
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
121122
pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
123+
pub const X_CODEX_PARENT_THREAD_ID_HEADER: &str = "x-codex-parent-thread-id";
124+
pub const X_CODEX_WINDOW_ID_HEADER: &str = "x-codex-window-id";
125+
pub const X_OPENAI_SUBAGENT_HEADER: &str = "x-openai-subagent";
122126
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
123127
"x-responsesapi-include-timing-metrics";
124128
const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
@@ -137,6 +141,7 @@ pub(crate) const WEBSOCKET_CONNECT_TIMEOUT: Duration =
137141
struct ModelClientState {
138142
auth_manager: Option<Arc<AuthManager>>,
139143
conversation_id: ThreadId,
144+
window_generation: AtomicU64,
140145
provider: ModelProviderInfo,
141146
auth_env_telemetry: AuthEnvTelemetry,
142147
session_source: SessionSource,
@@ -274,6 +279,7 @@ impl ModelClient {
274279
state: Arc::new(ModelClientState {
275280
auth_manager,
276281
conversation_id,
282+
window_generation: AtomicU64::new(0),
277283
provider,
278284
auth_env_telemetry,
279285
session_source,
@@ -303,6 +309,24 @@ impl ModelClient {
303309
self.state.auth_manager.clone()
304310
}
305311

312+
pub(crate) fn set_window_generation(&self, window_generation: u64) {
313+
self.state
314+
.window_generation
315+
.store(window_generation, Ordering::Relaxed);
316+
self.store_cached_websocket_session(WebsocketSession::default());
317+
}
318+
319+
pub(crate) fn advance_window_generation(&self) {
320+
self.state.window_generation.fetch_add(1, Ordering::Relaxed);
321+
self.store_cached_websocket_session(WebsocketSession::default());
322+
}
323+
324+
fn current_window_id(&self) -> String {
325+
let conversation_id = self.state.conversation_id;
326+
let window_generation = self.state.window_generation.load(Ordering::Relaxed);
327+
format!("{conversation_id}:{window_generation}")
328+
}
329+
306330
fn take_cached_websocket_session(&self) -> WebsocketSession {
307331
let mut cached_websocket_session = self
308332
.state
@@ -401,7 +425,7 @@ impl ModelClient {
401425
text,
402426
};
403427

404-
let mut extra_headers = self.build_subagent_headers();
428+
let mut extra_headers = self.build_responses_identity_headers();
405429
extra_headers.extend(build_conversation_headers(Some(
406430
self.state.conversation_id.to_string(),
407431
)));
@@ -461,21 +485,56 @@ impl ModelClient {
461485

462486
fn build_subagent_headers(&self) -> ApiHeaderMap {
463487
let mut extra_headers = ApiHeaderMap::new();
464-
if let SessionSource::SubAgent(sub) = &self.state.session_source {
465-
let subagent = match sub {
466-
SubAgentSource::Review => "review".to_string(),
467-
SubAgentSource::Compact => "compact".to_string(),
468-
SubAgentSource::MemoryConsolidation => "memory_consolidation".to_string(),
469-
SubAgentSource::ThreadSpawn { .. } => "collab_spawn".to_string(),
470-
SubAgentSource::Other(label) => label.clone(),
471-
};
472-
if let Ok(val) = HeaderValue::from_str(&subagent) {
473-
extra_headers.insert("x-openai-subagent", val);
474-
}
488+
if let Some(subagent) = subagent_header_value(&self.state.session_source)
489+
&& let Ok(val) = HeaderValue::from_str(&subagent)
490+
{
491+
extra_headers.insert(X_OPENAI_SUBAGENT_HEADER, val);
492+
}
493+
extra_headers
494+
}
495+
496+
fn build_responses_identity_headers(&self) -> ApiHeaderMap {
497+
let mut extra_headers = self.build_subagent_headers();
498+
if let Some(parent_thread_id) = parent_thread_id_header_value(&self.state.session_source)
499+
&& let Ok(val) = HeaderValue::from_str(&parent_thread_id)
500+
{
501+
extra_headers.insert(X_CODEX_PARENT_THREAD_ID_HEADER, val);
502+
}
503+
if let Ok(val) = HeaderValue::from_str(&self.current_window_id()) {
504+
extra_headers.insert(X_CODEX_WINDOW_ID_HEADER, val);
475505
}
476506
extra_headers
477507
}
478508

509+
fn build_ws_client_metadata(
510+
&self,
511+
turn_metadata_header: Option<&str>,
512+
) -> HashMap<String, String> {
513+
let mut client_metadata = HashMap::new();
514+
client_metadata.insert(
515+
X_CODEX_WINDOW_ID_HEADER.to_string(),
516+
self.current_window_id(),
517+
);
518+
if let Some(subagent) = subagent_header_value(&self.state.session_source) {
519+
client_metadata.insert(X_OPENAI_SUBAGENT_HEADER.to_string(), subagent);
520+
}
521+
if let Some(parent_thread_id) = parent_thread_id_header_value(&self.state.session_source) {
522+
client_metadata.insert(
523+
X_CODEX_PARENT_THREAD_ID_HEADER.to_string(),
524+
parent_thread_id,
525+
);
526+
}
527+
if let Some(turn_metadata_header) = parse_turn_metadata_header(turn_metadata_header)
528+
&& let Ok(turn_metadata) = turn_metadata_header.to_str()
529+
{
530+
client_metadata.insert(
531+
X_CODEX_TURN_METADATA_HEADER.to_string(),
532+
turn_metadata.to_string(),
533+
);
534+
}
535+
client_metadata
536+
}
537+
479538
/// Builds request telemetry for unary API calls (e.g., Compact endpoint).
480539
fn build_request_telemetry(
481540
session_telemetry: &SessionTelemetry,
@@ -655,6 +714,7 @@ impl ModelClient {
655714
headers.insert("x-client-request-id", header_value);
656715
}
657716
headers.extend(build_conversation_headers(Some(conversation_id)));
717+
headers.extend(self.build_responses_identity_headers());
658718
headers.insert(
659719
OPENAI_BETA_HEADER,
660720
HeaderValue::from_static(RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE),
@@ -678,7 +738,7 @@ impl Drop for ModelClientSession {
678738
}
679739

680740
impl ModelClientSession {
681-
fn reset_websocket_session(&mut self) {
741+
pub(crate) fn reset_websocket_session(&mut self) {
682742
self.websocket_session.connection = None;
683743
self.websocket_session.last_request = None;
684744
self.websocket_session.last_response_rx = None;
@@ -769,11 +829,15 @@ impl ModelClientSession {
769829
ApiResponsesOptions {
770830
conversation_id: Some(conversation_id),
771831
session_source: Some(self.client.state.session_source.clone()),
772-
extra_headers: build_responses_headers(
773-
self.client.state.beta_features_header.as_deref(),
774-
Some(&self.turn_state),
775-
turn_metadata_header.as_ref(),
776-
),
832+
extra_headers: {
833+
let mut headers = build_responses_headers(
834+
self.client.state.beta_features_header.as_deref(),
835+
Some(&self.turn_state),
836+
turn_metadata_header.as_ref(),
837+
);
838+
headers.extend(self.client.build_responses_identity_headers());
839+
headers
840+
},
777841
compression,
778842
turn_state: Some(Arc::clone(&self.turn_state)),
779843
}
@@ -1137,7 +1201,7 @@ impl ModelClientSession {
11371201
)?;
11381202
let mut ws_payload = ResponseCreateWsRequest {
11391203
client_metadata: response_create_client_metadata(
1140-
build_ws_client_metadata(turn_metadata_header),
1204+
Some(self.client.build_ws_client_metadata(turn_metadata_header)),
11411205
request_trace.as_ref(),
11421206
),
11431207
..ResponseCreateWsRequest::from(&request)
@@ -1370,14 +1434,6 @@ fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option<Head
13701434
turn_metadata_header.and_then(|value| HeaderValue::from_str(value).ok())
13711435
}
13721436

1373-
fn build_ws_client_metadata(turn_metadata_header: Option<&str>) -> Option<HashMap<String, String>> {
1374-
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header)?;
1375-
let turn_metadata = turn_metadata_header.to_str().ok()?.to_string();
1376-
let mut client_metadata = HashMap::new();
1377-
client_metadata.insert(X_CODEX_TURN_METADATA_HEADER.to_string(), turn_metadata);
1378-
Some(client_metadata)
1379-
}
1380-
13811437
/// Builds the extra headers attached to Responses API requests.
13821438
///
13831439
/// These headers implement Codex-specific conventions:
@@ -1409,6 +1465,34 @@ fn build_responses_headers(
14091465
headers
14101466
}
14111467

1468+
fn subagent_header_value(session_source: &SessionSource) -> Option<String> {
1469+
let SessionSource::SubAgent(subagent_source) = session_source else {
1470+
return None;
1471+
};
1472+
match subagent_source {
1473+
SubAgentSource::Review => Some("review".to_string()),
1474+
SubAgentSource::Compact => Some("compact".to_string()),
1475+
SubAgentSource::MemoryConsolidation => Some("memory_consolidation".to_string()),
1476+
SubAgentSource::ThreadSpawn { .. } => Some("collab_spawn".to_string()),
1477+
SubAgentSource::Other(label) => Some(label.clone()),
1478+
}
1479+
}
1480+
1481+
fn parent_thread_id_header_value(session_source: &SessionSource) -> Option<String> {
1482+
match session_source {
1483+
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
1484+
parent_thread_id, ..
1485+
}) => Some(parent_thread_id.to_string()),
1486+
SessionSource::Cli
1487+
| SessionSource::VSCode
1488+
| SessionSource::Exec
1489+
| SessionSource::Mcp
1490+
| SessionSource::Custom(_)
1491+
| SessionSource::SubAgent(_)
1492+
| SessionSource::Unknown => None,
1493+
}
1494+
}
1495+
14121496
fn map_response_stream<S>(
14131497
api_stream: S,
14141498
session_telemetry: SessionTelemetry,

codex-rs/core/src/client_tests.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ use super::AuthRequestTelemetryContext;
22
use super::ModelClient;
33
use super::PendingUnauthorizedRetry;
44
use super::UnauthorizedRecoveryExecution;
5+
use super::X_CODEX_PARENT_THREAD_ID_HEADER;
6+
use super::X_CODEX_TURN_METADATA_HEADER;
7+
use super::X_CODEX_WINDOW_ID_HEADER;
8+
use super::X_OPENAI_SUBAGENT_HEADER;
59
use codex_api::api_bridge::CoreAuthProvider;
610
use codex_app_server_protocol::AuthMode;
711
use codex_model_provider_info::WireApi;
@@ -80,11 +84,49 @@ fn build_subagent_headers_sets_other_subagent_label() {
8084
)));
8185
let headers = client.build_subagent_headers();
8286
let value = headers
83-
.get("x-openai-subagent")
87+
.get(X_OPENAI_SUBAGENT_HEADER)
8488
.and_then(|value| value.to_str().ok());
8589
assert_eq!(value, Some("memory_consolidation"));
8690
}
8791

92+
#[test]
93+
fn build_ws_client_metadata_includes_window_lineage_and_turn_metadata() {
94+
let parent_thread_id = ThreadId::new();
95+
let client = test_model_client(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
96+
parent_thread_id,
97+
depth: 2,
98+
agent_path: None,
99+
agent_nickname: None,
100+
agent_role: None,
101+
}));
102+
103+
client.advance_window_generation();
104+
105+
let client_metadata = client.build_ws_client_metadata(Some(r#"{"turn_id":"turn-123"}"#));
106+
let conversation_id = client.state.conversation_id;
107+
assert_eq!(
108+
client_metadata,
109+
std::collections::HashMap::from([
110+
(
111+
X_CODEX_WINDOW_ID_HEADER.to_string(),
112+
format!("{conversation_id}:1"),
113+
),
114+
(
115+
X_OPENAI_SUBAGENT_HEADER.to_string(),
116+
"collab_spawn".to_string(),
117+
),
118+
(
119+
X_CODEX_PARENT_THREAD_ID_HEADER.to_string(),
120+
parent_thread_id.to_string(),
121+
),
122+
(
123+
X_CODEX_TURN_METADATA_HEADER.to_string(),
124+
r#"{"turn_id":"turn-123"}"#.to_string(),
125+
),
126+
])
127+
);
128+
}
129+
88130
#[tokio::test]
89131
async fn summarize_memories_returns_empty_for_empty_input() {
90132
let client = test_model_client(SessionSource::Cli);

codex-rs/core/src/codex.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1531,6 +1531,17 @@ impl Session {
15311531
),
15321532
),
15331533
};
1534+
let window_generation = match &initial_history {
1535+
InitialHistory::Resumed(resumed_history) => u64::try_from(
1536+
resumed_history
1537+
.history
1538+
.iter()
1539+
.filter(|item| matches!(item, RolloutItem::Compacted(_)))
1540+
.count(),
1541+
)
1542+
.unwrap_or(u64::MAX),
1543+
InitialHistory::New | InitialHistory::Forked(_) => 0,
1544+
};
15341545
let state_builder = match &initial_history {
15351546
InitialHistory::Resumed(resumed) => metadata::builder_from_items(
15361547
resumed.history.as_slice(),
@@ -1920,6 +1931,9 @@ impl Session {
19201931
),
19211932
environment: environment_manager.current().await?,
19221933
};
1934+
services
1935+
.model_client
1936+
.set_window_generation(window_generation);
19231937
let js_repl = Arc::new(JsReplHandle::with_node_path(
19241938
config.js_repl_node_path.clone(),
19251939
config.js_repl_node_module_dirs.clone(),
@@ -3514,6 +3528,7 @@ impl Session {
35143528
self.persist_rollout_items(&[RolloutItem::TurnContext(turn_context_item)])
35153529
.await;
35163530
}
3531+
self.services.model_client.advance_window_generation();
35173532
}
35183533

35193534
async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
@@ -5733,16 +5748,20 @@ pub(crate) async fn run_turn(
57335748

57345749
let model_info = turn_context.model_info.clone();
57355750
let auto_compact_limit = model_info.auto_compact_token_limit().unwrap_or(i64::MAX);
5751+
let mut prewarmed_client_session = prewarmed_client_session;
57365752
// TODO(ccunningham): Pre-turn compaction runs before context updates and the
57375753
// new user message are recorded. Estimate pending incoming items (context
57385754
// diffs/full reinjection + user input) and trigger compaction preemptively
57395755
// when they would push the thread over the compaction threshold.
5740-
if run_pre_sampling_compact(&sess, &turn_context)
5741-
.await
5742-
.is_err()
5743-
{
5744-
error!("Failed to run pre-sampling compact");
5745-
return None;
5756+
let pre_sampling_compacted = match run_pre_sampling_compact(&sess, &turn_context).await {
5757+
Ok(pre_sampling_compacted) => pre_sampling_compacted,
5758+
Err(_) => {
5759+
error!("Failed to run pre-sampling compact");
5760+
return None;
5761+
}
5762+
};
5763+
if pre_sampling_compacted && let Some(mut client_session) = prewarmed_client_session.take() {
5764+
client_session.reset_websocket_session();
57465765
}
57475766

57485767
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
@@ -6055,6 +6074,7 @@ pub(crate) async fn run_turn(
60556074
{
60566075
return None;
60576076
}
6077+
client_session.reset_websocket_session();
60586078
continue;
60596079
}
60606080

@@ -6215,9 +6235,9 @@ pub(crate) async fn run_turn(
62156235
async fn run_pre_sampling_compact(
62166236
sess: &Arc<Session>,
62176237
turn_context: &Arc<TurnContext>,
6218-
) -> CodexResult<()> {
6238+
) -> CodexResult<bool> {
62196239
let total_usage_tokens_before_compaction = sess.get_total_token_usage().await;
6220-
maybe_run_previous_model_inline_compact(
6240+
let mut pre_sampling_compacted = maybe_run_previous_model_inline_compact(
62216241
sess,
62226242
turn_context,
62236243
total_usage_tokens_before_compaction,
@@ -6231,8 +6251,9 @@ async fn run_pre_sampling_compact(
62316251
// Compact if the total usage tokens are greater than the auto compact limit
62326252
if total_usage_tokens >= auto_compact_limit {
62336253
run_auto_compact(sess, turn_context, InitialContextInjection::DoNotInject).await?;
6254+
pre_sampling_compacted = true;
62346255
}
6235-
Ok(())
6256+
Ok(pre_sampling_compacted)
62366257
}
62376258

62386259
/// Runs pre-sampling compaction against the previous model when switching to a smaller

codex-rs/core/src/compact.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ async fn run_compact_task_inner(
220220
};
221221
sess.replace_compacted_history(new_history, reference_context_item, compacted_item)
222222
.await;
223+
client_session.reset_websocket_session();
223224
sess.recompute_token_usage(&turn_context).await;
224225

225226
sess.emit_turn_item_completed(&turn_context, compaction_item)

0 commit comments

Comments
 (0)