diff --git a/.gitignore b/.gitignore index e399061..5b0b36f 100644 --- a/.gitignore +++ b/.gitignore @@ -34,8 +34,9 @@ benchmarks/locomo/data/ benchmarks/locomo/results/ benchmarks/longmemeval/results/ -# Claude Code +# Claude Code / Codex .claude +.codex # Logs logs diff --git a/Cargo.lock b/Cargo.lock index da63365..75b00f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3681,7 +3681,11 @@ dependencies = [ "plastmem_shared", "schemars", "serde", + "serde_json", + "strum 0.28.0", + "tokio", "tracing", + "tracing-subscriber", "uuid", ] @@ -3746,6 +3750,7 @@ dependencies = [ "plastmem_ai", "plastmem_core", "plastmem_entities", + "plastmem_event", "plastmem_event_segmentation", "plastmem_shared", "schemars", diff --git a/Cargo.toml b/Cargo.toml index ffd7ebc..3a0d52a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,7 @@ strum = { version = "0.28", features = ["derive"] } tokio = { version = "1.49.0", features = [ "full" ] } tracing = "0.1" tracing-error = "0.2" +tracing-subscriber = "0.3.22" uuid = { version = "1.20.0", features = [ "serde", "v4", @@ -112,4 +113,4 @@ apalis-postgres.workspace = true sea-orm.workspace = true tokio.workspace = true tracing-error.workspace = true -tracing-subscriber = "0.3.22" +tracing-subscriber.workspace = true diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 9e647d7..bb38302 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -1,23 +1,13 @@ mod conversation_message; pub use conversation_message::ConversationMessage; +mod message_queue; +pub use message_queue::{ + ADD_BACKPRESSURE_LIMIT, FENCE_TTL_MINUTES, MessageQueue, PendingReview, QueueProcessingStatus, + SegmentationCheck, +}; + mod memory; pub use memory::EpisodicMemory; pub use memory::SemanticMemory; pub use memory::{DetailLevel, format_tool_result}; - -mod pending_review_queue; -pub use pending_review_queue::{ - PendingReview, PendingReviewQueueItem, add_pending_review_item, take_pending_review_items, -}; - -mod message_ingest; -pub use 
message_ingest::{append_batch_messages, append_message, try_claim_segmentation_job}; - -pub(crate) mod segmentation_state; -pub use segmentation_state::{ - EpisodeSpan, SegmentJobState, SegmentationJobClaim, SegmentationProcessingStatus, - SegmentationState, abort_segmentation_job, commit_segmentation_job, get_claim_messages, - get_episode_span, get_messages_in_range, get_segmentation_processing_status, - get_segmentation_state, recover_stale_segmentation_job, -}; diff --git a/crates/core/src/memory/retrieval.rs b/crates/core/src/memory/retrieval.rs index 1acae85..80db0b4 100644 --- a/crates/core/src/memory/retrieval.rs +++ b/crates/core/src/memory/retrieval.rs @@ -51,8 +51,8 @@ pub fn format_tool_result( #[cfg(test)] mod tests { - use chrono::Utc; use chrono::TimeZone; + use chrono::Utc; use sea_orm::prelude::PgVector; use uuid::Uuid; @@ -87,8 +87,14 @@ mod tests { #[test] fn format_tool_result_outputs_only_episodic_content_blocks() { let episodic = vec![ - (episodic_memory("Spoken At: Jun 15, 2026 3 PM\nSam: hello"), 0.9), - (episodic_memory("Spoken At: Jun 16, 2026 4 PM\nEvan: hi"), 0.8), + ( + episodic_memory("Spoken At: Jun 15, 2026 3 PM\nSam: hello"), + 0.9, + ), + ( + episodic_memory("Spoken At: Jun 16, 2026 4 PM\nEvan: hi"), + 0.8, + ), ]; let rendered = format_tool_result(&[], &episodic, &DetailLevel::Auto); diff --git a/crates/core/src/message_queue.rs b/crates/core/src/message_queue.rs new file mode 100644 index 0000000..58c6607 --- /dev/null +++ b/crates/core/src/message_queue.rs @@ -0,0 +1,406 @@ +use anyhow::anyhow; +use chrono::TimeDelta; +use plastmem_entities::message_queue; +use plastmem_shared::{AppError, Message}; + +use sea_orm::{ + ConnectionTrait, DatabaseConnection, DbBackend, EntityTrait, FromQueryResult, QuerySelect, Set, + Statement, TransactionTrait, sea_query::OnConflict, +}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +const WINDOW_BASE: usize = 20; +const WINDOW_MAX: usize = 30; +pub const FENCE_TTL_MINUTES: i64 = 120; 
+const GAP_TRIGGER_HOURS: i64 = 3; + +pub const ADD_BACKPRESSURE_LIMIT: i32 = 25; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct MessageQueue { + pub id: Uuid, + pub messages: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct PendingReview { + pub query: String, + pub memory_ids: Vec, +} + +#[derive(Debug, Clone)] +pub struct SegmentationCheck { + pub fence_count: i32, + pub force_process: bool, +} + +#[derive(Debug, Clone)] +pub struct QueueProcessingStatus { + pub messages_pending: i32, + pub fence_active: bool, +} + +#[derive(Debug, FromQueryResult)] +struct PushResult { + msg_count: i32, +} + +#[derive(Debug, FromQueryResult)] +struct ProcessingStatusRow { + messages_pending: i32, + fence_active: bool, +} + +#[derive(Debug, FromQueryResult)] +struct IdRow { + #[allow(dead_code)] + id: Uuid, +} + +impl MessageQueue { + pub async fn get_processing_status( + id: Uuid, + db: &DatabaseConnection, + ) -> Result { + Self::ensure_exists(id, db).await?; + + let sql = "SELECT \ + COALESCE(jsonb_array_length(messages), 0)::int AS messages_pending, \ + (in_progress_fence IS NOT NULL) AS fence_active \ + FROM message_queue \ + WHERE id = $1"; + + let row = ProcessingStatusRow::find_by_statement(Statement::from_sql_and_values( + DbBackend::Postgres, + sql, + [id.into()], + )) + .one(db) + .await? 
+ .ok_or_else(|| anyhow!("Queue not found after ensure_exists"))?; + + Ok(QueueProcessingStatus { + messages_pending: row.messages_pending, + fence_active: row.fence_active, + }) + } + + pub async fn get(id: Uuid, db: &DatabaseConnection) -> Result { + let model = Self::get_or_create_model(id, db).await?; + Self::from_model(model) + } + + async fn ensure_exists(id: Uuid, db: &DatabaseConnection) -> Result<(), AppError> { + message_queue::Entity::insert(message_queue::ActiveModel { + id: Set(id), + messages: Set(serde_json::to_value(Vec::::new())?), + pending_reviews: Set(None), + in_progress_fence: Set(None), + in_progress_since: Set(None), + prev_episode_content: Set(None), + }) + .on_conflict( + OnConflict::column(message_queue::Column::Id) + .do_nothing() + .to_owned(), + ) + .exec_without_returning(db) + .await?; + + Ok(()) + } + + pub async fn get_or_create_model( + id: Uuid, + db: &DatabaseConnection, + ) -> Result { + if let Some(model) = message_queue::Entity::find_by_id(id).one(db).await? { + return Ok(model); + } + + Self::ensure_exists(id, db).await?; + + message_queue::Entity::find_by_id(id) + .one(db) + .await? 
+ .ok_or_else(|| anyhow!("Failed to ensure queue existence").into()) + } + + pub fn from_model(model: message_queue::Model) -> Result { + Ok(Self { + id: model.id, + messages: serde_json::from_value(model.messages)?, + }) + } + + pub async fn push( + id: Uuid, + message: Message, + db: &DatabaseConnection, + ) -> Result, AppError> { + Self::ensure_exists(id, db).await?; + + let message_json = serde_json::to_value(vec![&message])?; + let sql = "UPDATE message_queue \ + SET messages = messages || $1::jsonb \ + WHERE id = $2 \ + RETURNING jsonb_array_length(messages) AS msg_count"; + + let result = PushResult::find_by_statement(Statement::from_sql_and_values( + DbBackend::Postgres, + sql, + [message_json.into(), id.into()], + )) + .one(db) + .await?; + + let trigger_count = result + .ok_or_else(|| AppError::from(anyhow!("Queue not found after push")))? + .msg_count; + + Self::check(id, trigger_count, db).await + } + + pub async fn push_batch( + id: Uuid, + messages: &[Message], + db: &DatabaseConnection, + ) -> Result { + Self::ensure_exists(id, db).await?; + if messages.is_empty() { + let queue = Self::get(id, db).await?; + return Ok(i32::try_from(queue.messages.len()).unwrap_or(i32::MAX)); + } + + let message_json = serde_json::to_value(messages)?; + let sql = "UPDATE message_queue \ + SET messages = messages || $1::jsonb \ + WHERE id = $2 \ + RETURNING jsonb_array_length(messages) AS msg_count"; + + let result = PushResult::find_by_statement(Statement::from_sql_and_values( + DbBackend::Postgres, + sql, + [message_json.into(), id.into()], + )) + .one(db) + .await?; + + Ok( + result + .ok_or_else(|| AppError::from(anyhow!("Queue not found after push_batch")))? 
+ .msg_count, + ) + } + + pub async fn drain(id: Uuid, count: usize, db: &C) -> Result<(), AppError> + where + C: ConnectionTrait, + { + let sql = format!( + "UPDATE message_queue SET messages = jsonb_path_query_array(messages, '$[{count} to last]'::jsonpath) WHERE id = $1" + ); + let res = db + .execute_raw(Statement::from_sql_and_values( + DbBackend::Postgres, + &sql, + [id.into()], + )) + .await?; + + if res.rows_affected() == 0 { + return Err(anyhow!("Queue not found").into()); + } + + Ok(()) + } + + pub async fn check( + id: Uuid, + trigger_count: i32, + db: &DatabaseConnection, + ) -> Result, AppError> { + let model = Self::get_or_create_model(id, db).await?; + + if model.in_progress_fence.is_some() { + let cleared = Self::clear_stale_fence(id, FENCE_TTL_MINUTES, db).await?; + if !cleared { + tracing::debug!(conversation_id = %id, "Segmentation skipped: job in progress"); + return Ok(None); + } + } + + let messages: Vec = serde_json::from_value(model.messages)?; + let trigger_count_usize = usize::try_from(trigger_count).unwrap_or(0); + let gap_trigger = messages.len() >= 2 + && messages.windows(2).last().is_some_and(|pair| { + pair[1].timestamp - pair[0].timestamp >= TimeDelta::hours(GAP_TRIGGER_HOURS) + }); + let count_trigger = trigger_count_usize >= WINDOW_BASE; + let force_trigger = gap_trigger || trigger_count_usize >= WINDOW_MAX; + + if !gap_trigger && !count_trigger { + return Ok(None); + } + + let fence_count = if gap_trigger { + trigger_count.saturating_sub(1) + } else { + trigger_count + }; + + if fence_count <= 0 || !Self::try_set_fence(id, fence_count, db).await? 
{ + return Ok(None); + } + + Ok(Some(SegmentationCheck { + fence_count, + force_process: force_trigger, + })) + } + + pub async fn try_set_fence( + id: Uuid, + fence_count: i32, + db: &DatabaseConnection, + ) -> Result { + let sql = "UPDATE message_queue \ + SET in_progress_fence = $2, in_progress_since = NOW() \ + WHERE id = $1 AND in_progress_fence IS NULL \ + RETURNING id"; + + let result = IdRow::find_by_statement(Statement::from_sql_and_values( + DbBackend::Postgres, + sql, + [id.into(), fence_count.into()], + )) + .one(db) + .await?; + + Ok(result.is_some()) + } + + pub async fn clear_stale_fence( + id: Uuid, + ttl_minutes: i64, + db: &DatabaseConnection, + ) -> Result { + let sql = "UPDATE message_queue \ + SET in_progress_fence = NULL, in_progress_since = NULL \ + WHERE id = $1 \ + AND in_progress_fence IS NOT NULL \ + AND in_progress_since < NOW() - ($2 || ' minutes')::INTERVAL \ + RETURNING id"; + + let result = IdRow::find_by_statement(Statement::from_sql_and_values( + DbBackend::Postgres, + sql, + [id.into(), ttl_minutes.to_string().into()], + )) + .one(db) + .await?; + + Ok(result.is_some()) + } + + pub async fn finalize_job( + id: Uuid, + prev_episode_content: Option, + db: &C, + ) -> Result<(), AppError> + where + C: ConnectionTrait, + { + message_queue::Entity::update(message_queue::ActiveModel { + id: Set(id), + in_progress_fence: Set(None), + in_progress_since: Set(None), + prev_episode_content: Set(prev_episode_content), + ..Default::default() + }) + .exec(db) + .await?; + Ok(()) + } + + pub async fn clear_fence(id: Uuid, db: &DatabaseConnection) -> Result<(), AppError> { + message_queue::Entity::update(message_queue::ActiveModel { + id: Set(id), + in_progress_fence: Set(None), + in_progress_since: Set(None), + ..Default::default() + }) + .exec(db) + .await?; + Ok(()) + } + + pub async fn get_prev_episode_content( + id: Uuid, + db: &DatabaseConnection, + ) -> Result, AppError> { + let model = Self::get_or_create_model(id, db).await?; + 
Ok(model.prev_episode_content) + } + + pub async fn add_pending_review( + id: Uuid, + memory_ids: Vec, + query: String, + db: &DatabaseConnection, + ) -> Result<(), AppError> { + Self::ensure_exists(id, db).await?; + + let review = PendingReview { query, memory_ids }; + let review_value = serde_json::to_value(vec![review])?; + + let res = db + .execute_raw(Statement::from_sql_and_values( + DbBackend::Postgres, + "UPDATE message_queue SET pending_reviews = COALESCE(pending_reviews, '[]'::jsonb) || $1::jsonb WHERE id = $2", + [review_value.into(), id.into()], + )) + .await?; + + if res.rows_affected() == 0 { + return Err(anyhow!("Queue not found").into()); + } + + Ok(()) + } + + pub async fn take_pending_reviews( + id: Uuid, + db: &DatabaseConnection, + ) -> Result>, AppError> { + let txn = db.begin().await?; + + let Some(model) = message_queue::Entity::find_by_id(id) + .lock_exclusive() + .one(&txn) + .await? + else { + txn.commit().await?; + return Ok(None); + }; + + let reviews: Option> = model + .pending_reviews + .and_then(|value| serde_json::from_value(value).ok()) + .filter(|items: &Vec| !items.is_empty()); + + if reviews.is_some() { + message_queue::Entity::update(message_queue::ActiveModel { + id: Set(id), + pending_reviews: Set(None), + ..Default::default() + }) + .exec(&txn) + .await?; + } + + txn.commit().await?; + Ok(reviews) + } +} diff --git a/crates/entities/src/lib.rs b/crates/entities/src/lib.rs index 7a6f2d9..ae00766 100644 --- a/crates/entities/src/lib.rs +++ b/crates/entities/src/lib.rs @@ -6,6 +6,7 @@ pub mod conversation_message; pub mod episode_classification; pub mod episode_span; pub mod episodic_memory; +pub mod message_queue; pub mod pending_review_queue; pub mod segmentation_state; pub mod semantic_memory; diff --git a/crates/entities/src/message_queue.rs b/crates/entities/src/message_queue.rs new file mode 100644 index 0000000..b392a1d --- /dev/null +++ b/crates/entities/src/message_queue.rs @@ -0,0 +1,23 @@ +//! 
`SeaORM` Entity, @generated by sea-orm-codegen 2.0 + +use sea_orm::entity::prelude::*; + +#[expect(clippy::derive_partial_eq_without_eq, reason = "generated")] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel)] +#[sea_orm(table_name = "message_queue")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: Uuid, + #[sea_orm(column_type = "JsonBinary")] + pub messages: Json, + #[sea_orm(column_type = "JsonBinary", nullable)] + pub pending_reviews: Option, + pub in_progress_fence: Option, + pub in_progress_since: Option, + pub prev_episode_content: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/entities/src/prelude.rs b/crates/entities/src/prelude.rs index 0a1431a..1643a01 100644 --- a/crates/entities/src/prelude.rs +++ b/crates/entities/src/prelude.rs @@ -3,6 +3,7 @@ pub use super::conversation_message::Entity as ConversationMessage; pub use super::episode_span::Entity as EpisodeSpan; pub use super::episodic_memory::Entity as EpisodicMemory; +pub use super::message_queue::Entity as MessageQueue; pub use super::pending_review_queue::Entity as PendingReviewQueue; pub use super::segmentation_state::Entity as SegmentationState; pub use super::semantic_memory::Entity as SemanticMemory; diff --git a/crates/event/src/data/mod.rs b/crates/event/src/data/mod.rs index 5d37d64..9696df4 100644 --- a/crates/event/src/data/mod.rs +++ b/crates/event/src/data/mod.rs @@ -3,7 +3,7 @@ use enum_dispatch::enum_dispatch; use serde::{Deserialize, Serialize}; mod message; -use message::MessageEventData; +pub use message::{MessageEventData, MessageEventRole}; #[enum_dispatch] pub trait EventDataToString { diff --git a/crates/event/src/lib.rs b/crates/event/src/lib.rs index d1a6ae6..33c83c6 100644 --- a/crates/event/src/lib.rs +++ b/crates/event/src/lib.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; mod data; -pub use 
data::{EventData, EventDataToString}; +pub use data::{EventData, EventDataToString, MessageEventData, MessageEventRole}; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Event { diff --git a/crates/event_segmentation/Cargo.toml b/crates/event_segmentation/Cargo.toml index 2e12bbf..1ce5da6 100644 --- a/crates/event_segmentation/Cargo.toml +++ b/crates/event_segmentation/Cargo.toml @@ -24,5 +24,11 @@ anyhow.workspace = true chrono.workspace = true schemars.workspace = true serde.workspace = true +strum.workspace = true tracing.workspace = true uuid.workspace = true + +[dev-dependencies] +serde_json.workspace = true +tokio.workspace = true +tracing-subscriber.workspace = true diff --git a/crates/event_segmentation/examples/locomo_segmenter.rs b/crates/event_segmentation/examples/locomo_segmenter.rs new file mode 100644 index 0000000..3495931 --- /dev/null +++ b/crates/event_segmentation/examples/locomo_segmenter.rs @@ -0,0 +1,289 @@ +use std::{collections::BTreeMap, env, fs, path::PathBuf}; + +use anyhow::{Context, Result, anyhow}; +use chrono::{DateTime, Duration, NaiveDateTime, TimeZone, Utc}; +use plastmem_event::{Event, EventData, EventDataToString, MessageEventData, MessageEventRole}; +use plastmem_event_segmentation::{EventSegment, EventSegmenter}; +use serde::Deserialize; +use tracing::Level; +use tracing_subscriber::FmtSubscriber; + +const DEFAULT_DATA_FILE: &str = "benchmarks/locomo/data/locomo10.json"; +const TURN_INTERVAL_MINS: i64 = 1; + +#[derive(Debug, Deserialize)] +struct DialogTurn { + #[serde(default)] + blip_caption: Option, + #[serde(default)] + query: Option, + #[serde(default)] + search_query: Option, + speaker: String, + text: String, +} + +#[derive(Debug, Deserialize)] +struct LoCoMoSample { + conversation: BTreeMap, + sample_id: String, +} + +#[derive(Debug)] +struct OrderedSession { + date: Option>, + turns: Vec, +} + +#[derive(Debug)] +struct Config { + data_file: PathBuf, + sample_id: Option, + sample_index: usize, + print_events: 
bool, +} + +#[tokio::main] +async fn main() -> Result<()> { + init_tracing(); + + let config = parse_args()?; + let samples = load_samples(&config.data_file)?; + let sample = select_sample(&samples, &config)?; + let events = build_events(sample)?; + + if events.is_empty() { + return Err(anyhow!("selected sample contains no dialog turns")); + } + + eprintln!( + "sample_id={} events={} data_file={}", + sample.sample_id, + events.len(), + config.data_file.display() + ); + + if config.print_events { + for (index, event) in events.iter().enumerate() { + eprintln!( + "event[{index:03}] {} {}", + event.timestamp.format("%Y-%m-%dT%H:%M:%SZ"), + event.data.to_string_without_timestamp() + ); + } + eprintln!(); + } + + let segments = EventSegmenter::segment(&events) + .await + .map_err(|err| anyhow!(err.to_string()))?; + write_segments_json(&segments)?; + + Ok(()) +} + +fn init_tracing() { + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::WARN) + .with_writer(std::io::stderr) + .with_target(false) + .without_time() + .finish(); + + let _ = tracing::subscriber::set_global_default(subscriber); +} + +fn write_segments_json(segments: &[EventSegment]) -> Result<()> { + serde_json::to_writer(std::io::stdout(), segments) + .context("failed to serialize event segments to stdout")?; + println!(); + Ok(()) +} + +fn parse_args() -> Result { + let mut data_file = PathBuf::from(DEFAULT_DATA_FILE); + let mut sample_id = None; + let mut sample_index = 0usize; + let mut print_events = false; + + let mut args = env::args().skip(1); + while let Some(arg) = args.next() { + match arg.as_str() { + "--data-file" => { + let value = args + .next() + .ok_or_else(|| anyhow!("--data-file requires a path"))?; + data_file = PathBuf::from(value); + } + "--sample-id" => { + sample_id = Some( + args + .next() + .ok_or_else(|| anyhow!("--sample-id requires a value"))?, + ); + } + "--sample-index" => { + let value = args + .next() + .ok_or_else(|| anyhow!("--sample-index requires a 
value"))?; + sample_index = value + .parse::() + .with_context(|| format!("invalid --sample-index value: {value}"))?; + } + "--print-events" => { + print_events = true; + } + "--help" | "-h" => { + print_usage(); + std::process::exit(0); + } + other => { + return Err(anyhow!("unknown argument: {other}")); + } + } + } + + Ok(Config { + data_file, + sample_id, + sample_index, + print_events, + }) +} + +fn print_usage() { + eprintln!( + "Usage: cargo run -p plastmem_event_segmentation --example locomo_segmenter -- [options]\n\ + \n\ + Options:\n\ + --data-file LoCoMo JSON file path (default: {DEFAULT_DATA_FILE})\n\ + --sample-id Select sample by sample_id\n\ + --sample-index Select sample by zero-based index when --sample-id is omitted\n\ + --print-events Print flattened events before segmentation\n\ + -h, --help Show this help" + ); +} + +fn load_samples(path: &PathBuf) -> Result> { + let raw = fs::read_to_string(path) + .with_context(|| format!("failed to read LoCoMo data file: {}", path.display()))?; + serde_json::from_str(&raw) + .with_context(|| format!("failed to parse LoCoMo JSON from {}", path.display())) +} + +fn select_sample<'a>(samples: &'a [LoCoMoSample], config: &Config) -> Result<&'a LoCoMoSample> { + if let Some(sample_id) = &config.sample_id { + return samples + .iter() + .find(|sample| &sample.sample_id == sample_id) + .ok_or_else(|| anyhow!("sample_id not found: {sample_id}")); + } + + samples + .get(config.sample_index) + .ok_or_else(|| anyhow!("sample_index out of range: {}", config.sample_index)) +} + +fn build_events(sample: &LoCoMoSample) -> Result> { + let sessions = get_ordered_sessions(sample)?; + let mut events = Vec::new(); + + for session in sessions { + for (turn_index, turn) in session.turns.iter().enumerate() { + let text = turn.text.trim(); + if text.is_empty() { + continue; + } + + let mut parts = vec![text.to_owned()]; + append_image_context( + &mut parts, + turn.blip_caption.as_deref(), + 
turn.query.as_deref().or(turn.search_query.as_deref()), + ); + let content = parts.join(" "); + let timestamp = get_turn_timestamp(session.date, turn_index)?; + let role = parse_role(&turn.speaker); + + events.push(Event::new( + EventData::Message(MessageEventData { role, content }), + timestamp, + None, + )); + } + } + + Ok(events) +} + +fn get_ordered_sessions(sample: &LoCoMoSample) -> Result> { + let mut sessions = Vec::new(); + + for session_number in 1..=100 { + let turns_key = format!("session_{session_number}"); + let Some(turns_value) = sample.conversation.get(&turns_key) else { + break; + }; + + let turns: Vec = serde_json::from_value(turns_value.clone()) + .with_context(|| format!("failed to parse {turns_key}"))?; + let date = sample + .conversation + .get(&format!("session_{session_number}_date_time")) + .and_then(|value| value.as_str()) + .map(parse_session_date) + .transpose()?; + + sessions.push(OrderedSession { date, turns }); + } + + Ok(sessions) +} + +fn get_turn_timestamp( + session_date: Option>, + turn_index: usize, +) -> Result> { + if let Some(session_date) = session_date { + let offset = i64::try_from(turn_index).context("turn index overflow")?; + return Ok(session_date + Duration::minutes(offset * TURN_INTERVAL_MINS)); + } + + let base = Utc + .with_ymd_and_hms(2023, 1, 1, 0, 0, 0) + .single() + .ok_or_else(|| anyhow!("failed to construct fallback timestamp"))?; + let offset = i64::try_from(turn_index).context("turn index overflow")?; + Ok(base + Duration::minutes(offset * TURN_INTERVAL_MINS)) +} + +fn parse_role(value: &str) -> MessageEventRole { + match value.trim().to_ascii_lowercase().as_str() { + "user" | "speaker1" => MessageEventRole::User, + "assistant" | "speaker2" => MessageEventRole::Assistant, + other => MessageEventRole::Custom(other.to_owned()), + } +} + +fn append_image_context(parts: &mut Vec, caption: Option<&str>, query: Option<&str>) { + let Some(caption) = caption.map(str::trim).filter(|caption| !caption.is_empty()) else 
{ + return; + }; + + if let Some(query) = query.map(str::trim).filter(|query| !query.is_empty()) { + parts.push(format!( + "[Image: {caption}; Image Retrieval Keywords: {query}]" + )); + return; + } + + parts.push(format!("[Image: {caption}]")); +} + +fn parse_session_date(value: &str) -> Result> { + let value = value.trim(); + let naive = NaiveDateTime::parse_from_str(value, "%I:%M %P on %-d %B, %Y") + .or_else(|_| NaiveDateTime::parse_from_str(value, "%I:%M %P on %d %B, %Y")) + .with_context(|| format!("failed to parse session date: {value}"))?; + Ok(Utc.from_utc_datetime(&naive)) +} diff --git a/crates/event_segmentation/src/event_segment.rs b/crates/event_segmentation/src/event_segment.rs index 57cbd37..64d9e56 100644 --- a/crates/event_segmentation/src/event_segment.rs +++ b/crates/event_segmentation/src/event_segment.rs @@ -1,29 +1,58 @@ use plastmem_event::Event; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use strum::AsRefStr; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct EventSegment { - events: Vec, - reasons: Vec, + pub events: Vec, + pub reason: EventSegmentReason, + pub score: f32, + pub boundary_before_confidence: f32, + pub boundary_after_confidence: f32, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema, AsRefStr)] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] pub enum EventSegmentReason { + InitialSegment, TopicShift, TimeGap, + HardTimeGap, IntentShift, + ActivityShift, StructuralCue, } impl EventSegment { - pub fn new(events: Vec, reasons: Vec) -> Self { - Self { events, reasons } + pub fn new(events: Vec, reason: EventSegmentReason) -> Self { + Self { + events, + reason, + score: 1.0, + boundary_before_confidence: 1.0, + boundary_after_confidence: 1.0, + } } - pub fn events(&self) -> &[Event] { - &self.events + pub fn with_metadata( + events: Vec, + reason: EventSegmentReason, + 
score: f32, + boundary_before_confidence: f32, + boundary_after_confidence: f32, + ) -> Self { + Self { + events, + reason, + score, + boundary_before_confidence, + boundary_after_confidence, + } } - pub fn reasons(&self) -> &[EventSegmentReason] { - &self.reasons + pub fn extend_events(&mut self, events: impl IntoIterator) { + self.events.extend(events); } } diff --git a/crates/event_segmentation/src/event_segmenter.rs b/crates/event_segmentation/src/event_segmenter.rs index 29892da..7b25da5 100644 --- a/crates/event_segmentation/src/event_segmenter.rs +++ b/crates/event_segmentation/src/event_segmenter.rs @@ -1,48 +1,290 @@ +mod boundary; +mod embedding_stats; +mod review; + use chrono::TimeDelta; -use plastmem_event::Event; +use plastmem_ai::embed_many; +use plastmem_event::{Event, EventDataToString}; use plastmem_shared::AppError; use crate::{EventSegment, EventSegmentReason}; -pub struct EventSegmenter {} +use self::{ + boundary::{BoundaryCandidate, boundary_budget, collect_candidates}, + embedding_stats::{build_prefix, segment_cohesion}, + review::{review_candidates_with_llm, review_short_segments_with_llm}, +}; -impl EventSegmenter { - const TIME_GAP_THRESHOLD: TimeDelta = TimeDelta::minutes(30); +pub struct EventSegmenter; - // Perform segmented processing on events with intervals exceeding 30 minutes. - fn segment_by_time_gap(events: &[Event]) -> Result, AppError> { +const SOFT_TIME_GAP: TimeDelta = TimeDelta::minutes(30); +const HARD_TIME_GAP: TimeDelta = TimeDelta::hours(3); +const MIN_SEGMENT_EVENTS: usize = 4; +const REVIEW_CONTEXT_EVENTS: usize = 5; +const MAX_REVIEW_CANDIDATES: usize = 40; +const TARGET_EVENTS_PER_SEGMENT: usize = 12; + +impl EventSegmenter { + pub async fn segment(events: &[Event]) -> Result, AppError> { if events.is_empty() { return Ok(Vec::new()); } + let inputs = events + .iter() + .map(|event| event.data.to_string_without_timestamp()) + .collect::>(); + let embeddings = embed_many(&inputs) + .await? 
+ .into_iter() + .map(|embedding| embedding.to_vec()) + .collect::>(); + + Ok(Self::segment_with_embeddings(events, &embeddings).await) + } + + async fn segment_with_embeddings(events: &[Event], embeddings: &[Vec]) -> Vec { let mut segments = Vec::new(); + let mut start = 0usize; + + for end in 1..=events.len() { + if !is_partition_end(events, end) { + continue; + } + + let partition_segments = Self::segment_partition( + &events[start..end], + &embeddings[start..end], + if start > 0 { + EventSegmentReason::HardTimeGap + } else { + EventSegmentReason::InitialSegment + }, + ) + .await; + segments.extend(partition_segments); + start = end; + } + + segments + } - let mut curr_events = vec![events[0].clone()]; - let mut curr_reasons = Vec::new(); - let mut prev = &events[0]; - - for curr in events.iter().skip(1) { - let gap = curr.timestamp.signed_duration_since(prev.timestamp); - - if gap > Self::TIME_GAP_THRESHOLD { - segments.push(EventSegment::new( - std::mem::take(&mut curr_events), - std::mem::take(&mut curr_reasons), - )); - curr_events.push(curr.clone()); - curr_reasons.push(EventSegmentReason::TimeGap); - } else { - curr_events.push(curr.clone()); + async fn segment_partition( + events: &[Event], + embeddings: &[Vec], + start_reason: EventSegmentReason, + ) -> Vec { + if events.len() <= 1 { + return vec![EventSegment::with_metadata( + events.to_vec(), + start_reason, + 1.0, + 1.0, + 1.0, + )]; + } + + let prefix = build_prefix(embeddings); + let candidates = collect_candidates(events, embeddings, &prefix); + let reviewed = reviewed_boundaries(events, candidates).await; + + if reviewed.is_empty() { + return vec![leaf_segment(events, embeddings, &prefix, start_reason)]; + } + + let coarse_segments = build_segments(events, embeddings, &prefix, &reviewed); + let refined = Self::refine_segments(events, embeddings, &coarse_segments, start_reason).await; + + match review_short_segments_with_llm(&refined).await { + Ok(segments) => segments, + Err(err) => { + 
tracing::warn!(error = %err, "Short segment merge review failed; keeping reviewed boundaries"); + refined } + } + } + + async fn refine_segments( + events: &[Event], + embeddings: &[Vec], + coarse_segments: &[EventSegment], + start_reason: EventSegmentReason, + ) -> Vec { + let mut refined = Vec::new(); + let mut offset = 0usize; - prev = curr; + for (index, coarse_segment) in coarse_segments.iter().enumerate() { + let segment_len = coarse_segment.events.len(); + refined.extend( + Box::pin(Self::segment_partition( + &events[offset..offset + segment_len], + &embeddings[offset..offset + segment_len], + if index == 0 { + start_reason + } else { + coarse_segment.reason + }, + )) + .await, + ); + offset += segment_len; } - segments.push(EventSegment::new(curr_events, curr_reasons)); - Ok(segments) + refined + } +} + +async fn reviewed_boundaries( + events: &[Event], + candidates: Vec, +) -> Vec { + match review_candidates_with_llm(events, &candidates).await { + Ok(reviewed) => reviewed, + Err(err) => { + tracing::warn!(error = %err, "Boundary review failed; using embedding candidates"); + candidates + .into_iter() + .take(boundary_budget(events.len())) + .collect() + } } +} + +fn is_partition_end(events: &[Event], end: usize) -> bool { + end == events.len() || time_gap_before(events, end) > HARD_TIME_GAP +} + +fn leaf_segment( + events: &[Event], + embeddings: &[Vec], + prefix: &[Vec], + reason: EventSegmentReason, +) -> EventSegment { + EventSegment::with_metadata( + events.to_vec(), + reason, + segment_cohesion(embeddings, prefix, 0, events.len()), + 1.0, + 1.0, + ) +} + +fn build_segments( + events: &[Event], + embeddings: &[Vec], + prefix: &[Vec], + boundaries: &[BoundaryCandidate], +) -> Vec { + let mut result = Vec::new(); + let mut start = 0usize; + let points = boundaries + .iter() + .map(|candidate| candidate.index) + .chain(std::iter::once(events.len())); + + for end in points { + if start >= end { + continue; + } + result.push(EventSegment::with_metadata( + 
events[start..end].to_vec(), + boundary::reason_for(boundaries, start), + segment_cohesion(embeddings, prefix, start, end), + boundary::confidence_for(boundaries, start), + boundary::confidence_for(boundaries, end), + )); + start = end; + } + + result +} + +fn time_gap_before(events: &[Event], index: usize) -> TimeDelta { + if index == 0 || index >= events.len() { + return TimeDelta::zero(); + } + events[index] + .timestamp + .signed_duration_since(events[index - 1].timestamp) +} + +#[cfg(test)] +mod tests { + use chrono::{TimeZone, Utc}; + use plastmem_event::{Event, EventData, MessageEventData, MessageEventRole}; + + use super::*; + use crate::event_segmenter::embedding_stats::normalize; + + fn message_event(id: u128, minute: i64, content: &str) -> Event { + Event::new( + EventData::Message(MessageEventData { + role: MessageEventRole::User, + content: content.to_owned(), + }), + Utc + .timestamp_opt(1_700_000_000 + minute * 60, 0) + .single() + .expect("valid timestamp"), + Some(uuid::Uuid::from_u128(id)), + ) + } + + fn embedding(x: f32, y: f32) -> Vec { + let mut value = vec![x, y]; + normalize(&mut value); + value + } + + #[test] + fn embedding_candidates_find_obvious_topic_shift() { + let events = (0..8) + .map(|index| message_event(index + 1, index as i64, "message")) + .collect::>(); + let embeddings = vec![ + embedding(1.0, 0.0), + embedding(1.0, 0.0), + embedding(1.0, 0.0), + embedding(1.0, 0.0), + embedding(0.0, 1.0), + embedding(0.0, 1.0), + embedding(0.0, 1.0), + embedding(0.0, 1.0), + ]; + let prefix = build_prefix(&embeddings); + let candidates = collect_candidates(&events, &embeddings, &prefix); + let segments = build_segments(&events, &embeddings, &prefix, &candidates); + + assert_eq!( + candidates + .iter() + .map(|candidate| candidate.index) + .collect::>(), + vec![4] + ); + assert_eq!(segments.len(), 2); + assert_eq!(segments[1].reason, EventSegmentReason::TopicShift); + } + + #[tokio::test] + async fn hard_time_gap_forces_partition_boundary() { 
+ let events = vec![ + message_event(1, 0, "a"), + message_event(2, 1, "a"), + message_event(3, 240, "a"), + message_event(4, 241, "a"), + ]; + let embeddings = vec![ + embedding(1.0, 0.0), + embedding(1.0, 0.0), + embedding(1.0, 0.0), + embedding(1.0, 0.0), + ]; + + let segments = EventSegmenter::segment_with_embeddings(&events, &embeddings).await; - pub fn segment(events: &[Event]) -> Result, AppError> { - Self::segment_by_time_gap(events) + assert_eq!(segments.len(), 2); + assert_eq!(segments[0].reason, EventSegmentReason::InitialSegment); + assert_eq!(segments[1].reason, EventSegmentReason::HardTimeGap); } } diff --git a/crates/event_segmentation/src/event_segmenter/boundary.rs b/crates/event_segmentation/src/event_segmenter/boundary.rs new file mode 100644 index 0000000..f2a5874 --- /dev/null +++ b/crates/event_segmentation/src/event_segmenter/boundary.rs @@ -0,0 +1,118 @@ +use plastmem_ai::cosine_similarity; +use plastmem_event::Event; + +use crate::EventSegmentReason; + +use super::{ + MAX_REVIEW_CANDIDATES, MIN_SEGMENT_EVENTS, REVIEW_CONTEXT_EVENTS, SOFT_TIME_GAP, + TARGET_EVENTS_PER_SEGMENT, + embedding_stats::{mean_vector, segment_cohesion}, + time_gap_before, +}; + +#[derive(Debug, Clone)] +pub(super) struct BoundaryCandidate { + pub(super) index: usize, + pub(super) score: f32, + pub(super) reason: EventSegmentReason, +} + +pub(super) fn collect_candidates( + events: &[Event], + embeddings: &[Vec], + prefix: &[Vec], +) -> Vec { + let mut candidates = (1..events.len()) + .filter(|&index| has_minimum_segment_size(events.len(), index)) + .map(|index| BoundaryCandidate { + index, + score: boundary_score(events, embeddings, prefix, index), + reason: candidate_reason(events, index), + }) + .collect::>(); + + candidates.sort_by(|left, right| right.score.total_cmp(&left.score)); + candidates.truncate(MAX_REVIEW_CANDIDATES.min(events.len().saturating_sub(1))); + candidates.sort_by_key(|candidate| candidate.index); + candidates +} + +pub(super) fn 
boundary_budget(event_count: usize) -> usize { + if event_count < TARGET_EVENTS_PER_SEGMENT * 2 { + return 0; + } + + let natural_budget = (event_count / TARGET_EVENTS_PER_SEGMENT).saturating_sub(1); + let scaled_cap = (event_count / 24).clamp(1, 12); + natural_budget.min(scaled_cap) +} + +pub(super) fn reason_for(boundaries: &[BoundaryCandidate], index: usize) -> EventSegmentReason { + boundaries + .iter() + .find(|candidate| candidate.index == index) + .map(|candidate| candidate.reason) + .unwrap_or(EventSegmentReason::InitialSegment) +} + +pub(super) fn confidence_for(boundaries: &[BoundaryCandidate], index: usize) -> f32 { + if index == 0 { + return 1.0; + } + boundaries + .iter() + .find(|candidate| candidate.index == index) + .map_or(1.0, |candidate| (candidate.score / 1.25).clamp(0.0, 1.0)) +} + +fn has_minimum_segment_size(event_count: usize, index: usize) -> bool { + index >= MIN_SEGMENT_EVENTS && event_count - index >= MIN_SEGMENT_EVENTS +} + +fn candidate_reason(events: &[Event], index: usize) -> EventSegmentReason { + if time_gap_before(events, index) >= SOFT_TIME_GAP { + EventSegmentReason::TimeGap + } else { + EventSegmentReason::TopicShift + } +} + +fn boundary_score( + events: &[Event], + embeddings: &[Vec], + prefix: &[Vec], + index: usize, +) -> f32 { + let left_start = index.saturating_sub(REVIEW_CONTEXT_EVENTS); + let right_end = (index + REVIEW_CONTEXT_EVENTS).min(events.len()); + let separation = 1.0 + - cosine_similarity( + &mean_vector(prefix, left_start, index), + &mean_vector(prefix, index, right_end), + ); + let cohesion = (segment_cohesion(embeddings, prefix, left_start, index) + + segment_cohesion(embeddings, prefix, index, right_end)) + * 0.5; + let time_gap_bonus = if time_gap_before(events, index) >= SOFT_TIME_GAP { + 0.16 + } else { + 0.0 + }; + + separation + 0.12 * cohesion + time_gap_bonus +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn boundary_budget_scales_with_partition_size() { + 
assert_eq!(boundary_budget(23), 0); + assert_eq!(boundary_budget(24), 1); + assert_eq!(boundary_budget(37), 1); + assert_eq!(boundary_budget(60), 2); + assert_eq!(boundary_budget(120), 5); + assert_eq!(boundary_budget(240), 10); + } +} diff --git a/crates/event_segmentation/src/event_segmenter/embedding_stats.rs b/crates/event_segmentation/src/event_segmenter/embedding_stats.rs new file mode 100644 index 0000000..123ac63 --- /dev/null +++ b/crates/event_segmentation/src/event_segmenter/embedding_stats.rs @@ -0,0 +1,56 @@ +use plastmem_ai::cosine_similarity; + +pub(super) fn build_prefix(embeddings: &[Vec]) -> Vec> { + let dims = embeddings.first().map_or(0, Vec::len); + let mut prefix = vec![vec![0.0; dims]]; + for embedding in embeddings { + let mut next = prefix.last().cloned().unwrap_or_else(|| vec![0.0; dims]); + for (value, current) in next.iter_mut().zip(embedding) { + *value += current; + } + prefix.push(next); + } + prefix +} + +pub(super) fn mean_vector(prefix: &[Vec], start: usize, end: usize) -> Vec { + if start >= end || prefix.is_empty() { + return Vec::new(); + } + + let count = (end - start) as f32; + let mut mean = prefix[end] + .iter() + .zip(&prefix[start]) + .map(|(right, left)| (right - left) / count) + .collect::>(); + normalize(&mut mean); + mean +} + +pub(super) fn normalize(vector: &mut [f32]) { + let norm = vector.iter().map(|value| value * value).sum::().sqrt(); + if norm > f32::EPSILON { + for value in vector { + *value /= norm; + } + } +} + +pub(super) fn segment_cohesion( + embeddings: &[Vec], + prefix: &[Vec], + start: usize, + end: usize, +) -> f32 { + if end <= start + 1 { + return 1.0; + } + + let mean = mean_vector(prefix, start, end); + let total = embeddings[start..end] + .iter() + .map(|embedding| cosine_similarity(embedding, &mean)) + .sum::(); + (total / (end - start) as f32).clamp(-1.0, 1.0) +} diff --git a/crates/event_segmentation/src/event_segmenter/review.rs b/crates/event_segmentation/src/event_segmenter/review.rs new 
file mode 100644 index 0000000..68dc4d0 --- /dev/null +++ b/crates/event_segmentation/src/event_segmenter/review.rs @@ -0,0 +1,450 @@ +use std::collections::{BTreeMap, BTreeSet}; + +use plastmem_ai::{ + ChatCompletionRequestMessage, ChatCompletionRequestSystemMessage, + ChatCompletionRequestUserMessage, generate_object, +}; +use plastmem_event::Event; +use plastmem_shared::AppError; +use schemars::JsonSchema; +use serde::Deserialize; + +use crate::{ + EventSegment, EventSegmentReason, + prompt::{build_boundary_review_prompt, build_short_segment_merge_prompt}, +}; + +use super::{MIN_SEGMENT_EVENTS, boundary::BoundaryCandidate, boundary::boundary_budget}; + +#[derive(Debug, Deserialize, JsonSchema)] +struct BoundaryReviewOutput { + keep_boundary_indices: Vec, + decisions: Vec, +} + +#[derive(Debug, Deserialize, JsonSchema)] +struct BoundaryDecision { + boundary_index: u32, + label: BoundaryLabel, + confidence: f32, +} + +#[derive(Debug, Deserialize, JsonSchema)] +struct ShortSegmentMergeReviewOutput { + decisions: Vec, +} + +#[derive(Debug, Deserialize, JsonSchema)] +struct ShortSegmentMergeDecision { + segment_index: u32, + merge_with_previous: bool, + reason: ShortSegmentMergeReason, + confidence: f32, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +enum BoundaryLabel { + TopicShift, + TopicIntro, + IntentShift, + ActivityShift, + StructuralCue, + DetailElaboration, + DirectResponse, + Closing, + Noise, +} + +impl BoundaryLabel { + fn is_boundary(self) -> bool { + matches!( + self, + Self::TopicShift + | Self::TopicIntro + | Self::IntentShift + | Self::ActivityShift + | Self::StructuralCue + ) + } + + fn to_segment_reason(self) -> EventSegmentReason { + match self { + Self::TopicShift | Self::TopicIntro => EventSegmentReason::TopicShift, + Self::IntentShift => EventSegmentReason::IntentShift, + Self::ActivityShift => EventSegmentReason::ActivityShift, + Self::StructuralCue => 
EventSegmentReason::StructuralCue, + Self::DetailElaboration | Self::DirectResponse | Self::Closing | Self::Noise => { + EventSegmentReason::StructuralCue + } + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +enum ShortSegmentMergeReason { + SameTopicContinuation, + DetailElaboration, + DirectResponse, + ClosingOrFarewell, + SeparateTopic, + SeparateActivity, + SeparateIntent, +} + +impl ShortSegmentMergeReason { + fn is_merge(self) -> bool { + matches!( + self, + Self::SameTopicContinuation + | Self::DetailElaboration + | Self::DirectResponse + | Self::ClosingOrFarewell + ) + } +} + +pub(super) async fn review_candidates_with_llm( + events: &[Event], + candidates: &[BoundaryCandidate], +) -> Result, AppError> { + let budget = boundary_budget(events.len()); + if candidates.is_empty() || budget == 0 { + return Ok(Vec::new()); + } + + let mut ranked = candidates.to_vec(); + ranked.sort_by_key(|candidate| candidate.index); + let output = generate_object::( + vec![ + ChatCompletionRequestMessage::System(ChatCompletionRequestSystemMessage::from( + "Return JSON with keep_boundary_indices and decisions. 
Labels must be one of: topic_shift, topic_intro, intent_shift, activity_shift, structural_cue, detail_elaboration, direct_response, closing, noise.", + )), + ChatCompletionRequestMessage::User(ChatCompletionRequestUserMessage::from( + build_boundary_review_prompt(events, &candidate_scores(&ranked), budget), + )), + ], + "event_boundary_review_batch".to_owned(), + Some("Review candidate event boundaries".to_owned()), + ) + .await?; + + Ok(apply_boundary_review_output(&ranked, budget, output)) +} + +pub(super) async fn review_short_segments_with_llm( + segments: &[EventSegment], +) -> Result, AppError> { + let short_indices = short_segment_indices(segments); + if short_indices.is_empty() { + return Ok(segments.to_vec()); + } + + let output = generate_object::( + vec![ + ChatCompletionRequestMessage::System(ChatCompletionRequestSystemMessage::from( + "Return JSON with decisions. Reasons must be one of: same_topic_continuation, detail_elaboration, direct_response, closing_or_farewell, separate_topic, separate_activity, separate_intent.", + )), + ChatCompletionRequestMessage::User(ChatCompletionRequestUserMessage::from( + build_short_segment_merge_prompt(segments, &short_indices), + )), + ], + "event_short_segment_merge_review".to_owned(), + Some("Review short event segments for semantic merge".to_owned()), + ) + .await?; + + Ok(apply_short_segment_merge_decisions( + segments, + output.decisions, + )) +} + +fn candidate_scores(candidates: &[BoundaryCandidate]) -> Vec<(usize, f32)> { + candidates + .iter() + .map(|candidate| (candidate.index, candidate.score)) + .collect() +} + +fn apply_boundary_review_output( + candidates: &[BoundaryCandidate], + budget: usize, + output: BoundaryReviewOutput, +) -> Vec { + let candidate_by_index = candidates + .iter() + .map(|candidate| (candidate.index, candidate)) + .collect::>(); + let decisions = valid_boundary_decisions(output.decisions, &candidate_by_index); + let mut kept = kept_boundaries( + output.keep_boundary_indices, + 
&candidate_by_index, + &decisions, + ); + + if kept.is_empty() { + kept = fallback_boundaries(&decisions); + } + + kept.sort_by(|left, right| right.1.total_cmp(&left.1)); + kept.truncate(budget); + kept.sort_by_key(|(index, _)| *index); + + kept + .into_iter() + .filter_map(|(index, _)| candidate_by_index.get(&index).copied()) + .map(|candidate| with_reviewed_reason(candidate, &decisions)) + .collect() +} + +fn valid_boundary_decisions( + decisions: Vec, + candidate_by_index: &BTreeMap, +) -> BTreeMap { + decisions + .into_iter() + .filter_map(|decision| { + let index = usize::try_from(decision.boundary_index).ok()?; + candidate_by_index + .contains_key(&index) + .then_some((index, (decision.label, decision.confidence))) + }) + .collect() +} + +fn kept_boundaries( + keep_boundary_indices: Vec, + candidate_by_index: &BTreeMap, + decisions: &BTreeMap, +) -> Vec<(usize, f32)> { + keep_boundary_indices + .into_iter() + .filter_map(|index| usize::try_from(index).ok()) + .filter(|index| candidate_by_index.contains_key(index)) + .filter(|index| { + decisions + .get(index) + .is_none_or(|(label, confidence)| label.is_boundary() || *confidence < 0.55) + }) + .map(|index| { + let confidence = decisions + .get(&index) + .map_or(1.0, |(_, confidence)| *confidence); + (index, confidence) + }) + .collect() +} + +fn fallback_boundaries(decisions: &BTreeMap) -> Vec<(usize, f32)> { + decisions + .iter() + .filter(|(_, (label, confidence))| label.is_boundary() && *confidence >= 0.45) + .map(|(index, (_, confidence))| (*index, *confidence)) + .collect() +} + +fn with_reviewed_reason( + candidate: &BoundaryCandidate, + decisions: &BTreeMap, +) -> BoundaryCandidate { + let mut candidate = candidate.clone(); + if let Some((label, _)) = decisions.get(&candidate.index) { + candidate.reason = label.to_segment_reason(); + } + candidate +} + +fn short_segment_indices(segments: &[EventSegment]) -> Vec { + segments + .iter() + .enumerate() + .skip(1) + .filter_map(|(index, segment)| { + 
(segment.events.len() <= MIN_SEGMENT_EVENTS + 1).then_some(index) + }) + .collect() +} + +fn apply_short_segment_merge_decisions( + segments: &[EventSegment], + decisions: Vec, +) -> Vec { + let merge_indices = merge_decisions_to_indices(decisions); + let mut merged: Vec = Vec::new(); + + for (index, segment) in segments.iter().cloned().enumerate() { + if index > 0 + && merge_indices.contains(&index) + && let Some(previous) = merged.last_mut() + { + previous.events.extend(segment.events); + previous.boundary_after_confidence = segment.boundary_after_confidence; + previous.score = previous.score.min(segment.score); + } else { + merged.push(segment); + } + } + + merged +} + +fn merge_decisions_to_indices(decisions: Vec) -> BTreeSet { + decisions + .into_iter() + .filter(|decision| { + decision.merge_with_previous && decision.confidence >= 0.55 && decision.reason.is_merge() + }) + .filter_map(|decision| usize::try_from(decision.segment_index).ok()) + .collect() +} + +#[cfg(test)] +mod tests { + use chrono::{TimeZone, Utc}; + use plastmem_event::{Event, EventData, MessageEventData, MessageEventRole}; + + use super::*; + + fn candidate(index: usize, reason: EventSegmentReason) -> BoundaryCandidate { + BoundaryCandidate { + index, + score: 0.8, + reason, + } + } + + fn boundary_output( + keep_boundary_indices: Vec, + decisions: Vec, + ) -> BoundaryReviewOutput { + BoundaryReviewOutput { + keep_boundary_indices, + decisions, + } + } + + fn boundary_decision( + boundary_index: u32, + label: BoundaryLabel, + confidence: f32, + ) -> BoundaryDecision { + BoundaryDecision { + boundary_index, + label, + confidence, + } + } + + fn merge_decision( + segment_index: u32, + reason: ShortSegmentMergeReason, + confidence: f32, + ) -> ShortSegmentMergeDecision { + ShortSegmentMergeDecision { + segment_index, + merge_with_previous: true, + reason, + confidence, + } + } + + fn segment(event_count: usize) -> EventSegment { + let events = (0..event_count) + .map(|index| { + Event::new( + 
EventData::Message(MessageEventData { + role: MessageEventRole::User, + content: format!("event {index}"), + }), + Utc + .timestamp_opt(1_700_000_000 + index as i64 * 60, 0) + .single() + .expect("valid timestamp"), + None, + ) + }) + .collect(); + EventSegment::with_metadata(events, EventSegmentReason::TopicShift, 0.7, 0.5, 0.6) + } + + #[test] + fn boundary_review_keeps_explicit_valid_boundary() { + let candidates = vec![candidate(4, EventSegmentReason::TopicShift)]; + let reviewed = apply_boundary_review_output( + &candidates, + 1, + boundary_output( + vec![4], + vec![boundary_decision(4, BoundaryLabel::IntentShift, 0.9)], + ), + ); + + assert_eq!(reviewed.len(), 1); + assert_eq!(reviewed[0].index, 4); + assert_eq!(reviewed[0].reason, EventSegmentReason::IntentShift); + } + + #[test] + fn boundary_review_falls_back_to_confident_boundary_label() { + let candidates = vec![candidate(4, EventSegmentReason::TopicShift)]; + let reviewed = apply_boundary_review_output( + &candidates, + 1, + boundary_output( + Vec::new(), + vec![boundary_decision(4, BoundaryLabel::ActivityShift, 0.5)], + ), + ); + + assert_eq!(reviewed.len(), 1); + assert_eq!(reviewed[0].reason, EventSegmentReason::ActivityShift); + } + + #[test] + fn boundary_review_rejects_high_confidence_non_boundary_label() { + let candidates = vec![candidate(4, EventSegmentReason::TopicShift)]; + let reviewed = apply_boundary_review_output( + &candidates, + 1, + boundary_output( + vec![4], + vec![boundary_decision(4, BoundaryLabel::DirectResponse, 0.9)], + ), + ); + + assert!(reviewed.is_empty()); + } + + #[test] + fn short_segment_merge_ignores_low_confidence_or_separate_decisions() { + let segments = vec![segment(6), segment(2), segment(2)]; + let merged = apply_short_segment_merge_decisions( + &segments, + vec![ + merge_decision(1, ShortSegmentMergeReason::SameTopicContinuation, 0.3), + merge_decision(2, ShortSegmentMergeReason::SeparateTopic, 0.9), + ], + ); + + assert_eq!(merged.len(), 3); + } + + #[test] + fn 
short_segment_merge_combines_valid_decision_with_previous_segment() { + let segments = vec![segment(6), segment(2)]; + let merged = apply_short_segment_merge_decisions( + &segments, + vec![merge_decision( + 1, + ShortSegmentMergeReason::DirectResponse, + 0.9, + )], + ); + + assert_eq!(merged.len(), 1); + assert_eq!(merged[0].events.len(), 8); + assert_eq!(merged[0].boundary_after_confidence, 0.6); + } +} diff --git a/crates/event_segmentation/src/lib.rs b/crates/event_segmentation/src/lib.rs index 0f6bdc1..4520970 100644 --- a/crates/event_segmentation/src/lib.rs +++ b/crates/event_segmentation/src/lib.rs @@ -6,3 +6,5 @@ pub use event_segment::{EventSegment, EventSegmentReason}; mod event_segmenter; pub use event_segmenter::EventSegmenter; + +mod prompt; diff --git a/crates/event_segmentation/src/prompt.rs b/crates/event_segmentation/src/prompt.rs new file mode 100644 index 0000000..fcfaa6f --- /dev/null +++ b/crates/event_segmentation/src/prompt.rs @@ -0,0 +1,138 @@ +use plastmem_event::{Event, EventDataToString}; + +use crate::{EventSegment, EventSegmentReason}; + +pub fn build_boundary_review_prompt( + events: &[Event], + candidates: &[(usize, f32)], + boundary_budget: usize, +) -> String { + let mut prompt = format!( + "Review candidate boundaries for a multilingual dialogue. Fill keep_boundary_indices with at most {} candidate indices that should be kept. Also return decisions for the kept indices, and optionally for nearby rejected candidates when useful. Nearby candidate indices may describe the same transition; choose the single best index, not all of them. Prefer fewer, larger event segments, but keep real pivots between unrelated subjects, activities, plans, stories, intents, or explicit structural pivots. Use topic_shift/topic_intro/intent_shift/activity_shift/structural_cue for true boundaries. Use detail_elaboration/direct_response/closing/noise for continuations, which should usually not be split. 
Do not split follow-ups, clarifications, examples, greetings, or closing turns. It is valid to keep none.\n\n", + boundary_budget + ); + + for (candidate_index, candidate_score) in candidates { + let left = candidate_index.saturating_sub(5); + let right = (candidate_index + 5).min(events.len()); + prompt.push_str(&format!( + "Candidate boundary_index={} score={:.3}:\n", + candidate_index, candidate_score + )); + for (offset, event) in events[left..right].iter().enumerate() { + let index = left + offset; + if index == *candidate_index { + prompt.push_str(" \n"); + } + prompt.push_str(&format!( + " [idx={index}] {} {}\n", + event.timestamp.format("%Y-%m-%dT%H:%M:%SZ"), + event.data.to_string_without_timestamp() + )); + } + prompt.push('\n'); + } + + prompt +} + +pub fn build_short_segment_merge_prompt( + segments: &[EventSegment], + short_indices: &[usize], +) -> String { + let mut prompt = "Review short event segments in a multilingual dialogue. For each listed segment_index, decide whether the current short segment should merge with the immediately previous segment. Merge only when the short segment is a continuation, detail, direct response, greeting/closing, or small conversational tail of the previous event. Keep separate when it starts an independent topic, activity, intent, story, or plan. Use semantic relation across languages; do not rely on English keywords. 
Return one decision for every listed segment_index.\n\n".to_owned(); + + for &index in short_indices { + prompt.push_str(&format!("segment_index={index}\nprevious_segment_tail:\n")); + let previous = &segments[index - 1].events; + let previous_start = previous.len().saturating_sub(8); + append_events(&mut prompt, &previous[previous_start..]); + prompt.push_str("current_short_segment:\n"); + if segments[index].reason != EventSegmentReason::InitialSegment { + prompt.push_str(&format!( + " boundary_reason={}\n", + segments[index].reason.as_ref() + )); + } + append_events(&mut prompt, &segments[index].events); + prompt.push('\n'); + } + + prompt +} + +fn append_events(prompt: &mut String, events: &[Event]) { + for event in events { + prompt.push_str(&format!( + " {} {}\n", + event.timestamp.format("%Y-%m-%dT%H:%M:%SZ"), + event.data.to_string_without_timestamp() + )); + } +} + +#[cfg(test)] +mod tests { + use chrono::{TimeZone, Utc}; + use plastmem_event::{Event, EventData, MessageEventData, MessageEventRole}; + + use super::{build_boundary_review_prompt, build_short_segment_merge_prompt}; + use crate::{EventSegment, EventSegmentReason}; + + fn message_event(content: &str) -> Event { + Event::new( + EventData::Message(MessageEventData { + role: MessageEventRole::User, + content: content.to_owned(), + }), + Utc + .timestamp_opt(1_700_000_000, 0) + .single() + .expect("valid timestamp"), + None, + ) + } + + #[test] + fn boundary_review_prompt_includes_candidate_marker() { + let events = vec![ + message_event("a"), + message_event("b"), + message_event("c"), + message_event("d"), + message_event("e"), + message_event("f"), + ]; + + let prompt = build_boundary_review_prompt(&events, &[(3, 0.82)], 2); + + assert!(prompt.contains("Candidate boundary_index=3 score=0.820")); + assert!(prompt.contains("")); + assert!(prompt.contains("[idx=3]")); + } + + #[test] + fn short_segment_merge_prompt_includes_boundary_reason() { + let segments = vec![ + EventSegment::with_metadata( + 
vec![message_event("previous")], + EventSegmentReason::InitialSegment, + 1.0, + 1.0, + 1.0, + ), + EventSegment::with_metadata( + vec![message_event("current")], + EventSegmentReason::StructuralCue, + 1.0, + 1.0, + 1.0, + ), + ]; + + let prompt = build_short_segment_merge_prompt(&segments, &[1]); + + assert!(prompt.contains("segment_index=1")); + assert!(prompt.contains("boundary_reason=structural_cue")); + } +} diff --git a/crates/migration/src/lib.rs b/crates/migration/src/lib.rs index 81e3c3e..f055993 100644 --- a/crates/migration/src/lib.rs +++ b/crates/migration/src/lib.rs @@ -1,9 +1,6 @@ pub use sea_orm_migration::*; -mod m20260417_01_create_conversation_message_table; -mod m20260417_02_create_segmentation_state_table; -mod m20260417_03_create_episode_span_table; -mod m20260417_04_create_pending_review_queue_table; +mod m20260417_01_create_message_queue_table; mod m20260417_05_create_episodic_memory_table; mod m20260417_06_create_semantic_memory_table; @@ -13,10 +10,7 @@ pub struct Migrator; impl MigratorTrait for Migrator { fn migrations() -> Vec> { vec![ - Box::new(m20260417_01_create_conversation_message_table::Migration), - Box::new(m20260417_02_create_segmentation_state_table::Migration), - Box::new(m20260417_03_create_episode_span_table::Migration), - Box::new(m20260417_04_create_pending_review_queue_table::Migration), + Box::new(m20260417_01_create_message_queue_table::Migration), Box::new(m20260417_05_create_episodic_memory_table::Migration), Box::new(m20260417_06_create_semantic_memory_table::Migration), ] diff --git a/crates/migration/src/m20260417_01_create_message_queue_table.rs b/crates/migration/src/m20260417_01_create_message_queue_table.rs new file mode 100644 index 0000000..c507bb0 --- /dev/null +++ b/crates/migration/src/m20260417_01_create_message_queue_table.rs @@ -0,0 +1,44 @@ +use sea_orm_migration::{ + prelude::*, + schema::{integer, json_binary, text, timestamp_with_time_zone, uuid}, +}; + +#[derive(DeriveMigrationName)] +pub struct 
Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(MessageQueue::Table) + .if_not_exists() + .col(uuid(MessageQueue::Id).primary_key()) + .col(json_binary(MessageQueue::Messages).not_null()) + .col(json_binary(MessageQueue::PendingReviews).null()) + .col(integer(MessageQueue::InProgressFence).null()) + .col(timestamp_with_time_zone(MessageQueue::InProgressSince).null()) + .col(text(MessageQueue::PrevEpisodeContent).null()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(MessageQueue::Table).to_owned()) + .await + } +} + +#[derive(Iden)] +enum MessageQueue { + Table, + Id, + Messages, + PendingReviews, + InProgressFence, + InProgressSince, + PrevEpisodeContent, +} diff --git a/crates/server/src/api/add_message.rs b/crates/server/src/api/add_message.rs index 69b34c0..6032061 100644 --- a/crates/server/src/api/add_message.rs +++ b/crates/server/src/api/add_message.rs @@ -6,7 +6,7 @@ use axum::{ response::{IntoResponse, Response}, }; use chrono::{DateTime, Utc}; -use plastmem_core::{append_batch_messages, append_message}; +use plastmem_core::MessageQueue; use plastmem_shared::{AppError, Message, MessageRole}; use plastmem_worker::EventSegmentationJob; use serde::{Deserialize, Serialize}; @@ -83,10 +83,15 @@ pub async fn add_message( timestamp, }; - if let Some(claim) = append_message(payload.conversation_id, message, false, &state.db).await? { + if let Some(check) = MessageQueue::push(payload.conversation_id, message, &state.db).await? 
{ let mut job_storage = state.segmentation_job_storage.clone(); job_storage - .push(EventSegmentationJob::from_claim(claim)) + .push(EventSegmentationJob { + conversation_id: payload.conversation_id, + fence_count: check.fence_count, + force_process: check.force_process, + keep_tail_segment: true, + }) .await?; } @@ -129,10 +134,19 @@ pub async fn import_batch_messages( }) .collect::>(); - if let Some(claim) = append_batch_messages(payload.conversation_id, &messages, &state.db).await? { + let pending_count = + MessageQueue::push_batch(payload.conversation_id, &messages, &state.db).await?; + if pending_count > 0 + && MessageQueue::try_set_fence(payload.conversation_id, pending_count, &state.db).await? + { let mut job_storage = state.segmentation_job_storage.clone(); job_storage - .push(EventSegmentationJob::from_claim(claim)) + .push(EventSegmentationJob { + conversation_id: payload.conversation_id, + fence_count: pending_count, + force_process: true, + keep_tail_segment: false, + }) .await?; } diff --git a/crates/server/src/api/benchmark.rs b/crates/server/src/api/benchmark.rs index 27aad0b..89f1d1e 100644 --- a/crates/server/src/api/benchmark.rs +++ b/crates/server/src/api/benchmark.rs @@ -2,7 +2,7 @@ use axum::{ Json, extract::{Query, State}, }; -use plastmem_core::{get_segmentation_processing_status, recover_stale_segmentation_job}; +use plastmem_core::{FENCE_TTL_MINUTES, MessageQueue}; use plastmem_shared::AppError; use sea_orm::{DatabaseConnection, DbBackend, FromQueryResult, Statement}; use serde::{Deserialize, Serialize}; @@ -71,8 +71,8 @@ async fn get_queue_status( id: Uuid, db: &DatabaseConnection, ) -> Result { - recover_stale_segmentation_job(id, db).await?; - let segmentation_status = get_segmentation_processing_status(id, db).await?; + MessageQueue::clear_stale_fence(id, FENCE_TTL_MINUTES, db).await?; + let segmentation_status = MessageQueue::get_processing_status(id, db).await?; let jobs_sql = "SELECT \ COUNT(*) FILTER (WHERE status IN ('Pending', 
'Running') AND job_type LIKE '%EventSegmentationJob%' AND convert_from(job, 'UTF8')::jsonb->>'conversation_id' = $1)::bigint AS segmentation_jobs_active, \ @@ -94,15 +94,15 @@ async fn get_queue_status( predict_calibrate_jobs_active: 0, }); - let done = segmentation_status.pending_message_count == 0 - && !segmentation_status.active + let done = segmentation_status.messages_pending == 0 + && !segmentation_status.fence_active && jobs.segmentation_jobs_active == 0 && jobs.episode_creation_jobs_active == 0 && jobs.predict_calibrate_jobs_active == 0; Ok(BenchmarkJobStatus { - messages_pending: i32::try_from(segmentation_status.pending_message_count).unwrap_or(i32::MAX), - fence_active: segmentation_status.active, + messages_pending: segmentation_status.messages_pending, + fence_active: segmentation_status.fence_active, segmentation_jobs_active: jobs.segmentation_jobs_active, episode_creation_jobs_active: jobs.episode_creation_jobs_active, predict_calibrate_jobs_active: jobs.predict_calibrate_jobs_active, diff --git a/crates/server/src/api/mod.rs b/crates/server/src/api/mod.rs index 212543a..1043b5c 100644 --- a/crates/server/src/api/mod.rs +++ b/crates/server/src/api/mod.rs @@ -11,7 +11,9 @@ mod benchmark; mod recent_memory; mod retrieve_memory; -pub use add_message::{IngestMessageResult, InputConversationMessage, InputConversationMessages, InputMessage}; +pub use add_message::{ + IngestMessageResult, InputConversationMessage, InputConversationMessages, InputMessage, +}; #[cfg(debug_assertions)] pub use benchmark::BenchmarkJobStatus; pub use recent_memory::RecentMemory; @@ -31,8 +33,7 @@ pub fn app() -> Router { .routes(routes!(retrieve_memory::context_pre_retrieve)); #[cfg(debug_assertions)] - let router = router - .routes(routes!(benchmark::benchmark_job_status)); + let router = router.routes(routes!(benchmark::benchmark_job_status)); let (router, openapi) = router.split_for_parts(); diff --git a/crates/server/src/api/retrieve_memory.rs 
b/crates/server/src/api/retrieve_memory.rs index 615cee2..0a82229 100644 --- a/crates/server/src/api/retrieve_memory.rs +++ b/crates/server/src/api/retrieve_memory.rs @@ -1,7 +1,7 @@ use axum::{Json, extract::State}; use plastmem_ai::embed; use plastmem_core::{ - DetailLevel, EpisodicMemory, SemanticMemory, add_pending_review_item, format_tool_result, + DetailLevel, EpisodicMemory, MessageQueue, SemanticMemory, format_tool_result, }; use plastmem_shared::{APP_ENV, AppError}; use sea_orm::prelude::PgVector; @@ -82,7 +82,8 @@ async fn fetch_memory( )?; if APP_ENV.enable_fsrs_review && !episodic.is_empty() { let memory_ids = episodic.iter().map(|(m, _)| m.id).collect(); - add_pending_review_item(conversation_id, memory_ids, query.to_owned(), &state.db).await?; + MessageQueue::add_pending_review(conversation_id, memory_ids, query.to_owned(), &state.db) + .await?; } Ok((semantic, episodic)) } diff --git a/crates/server/src/server.rs b/crates/server/src/server.rs index b6e8104..4fcbf9b 100644 --- a/crates/server/src/server.rs +++ b/crates/server/src/server.rs @@ -11,9 +11,7 @@ use axum::response::Html; use axum::{Extension, response::Redirect}; use axum::{Router, routing::get}; use plastmem_shared::AppError; -use plastmem_worker::{ - EpisodeCreationJob, EventSegmentationJob, MemoryReviewJob, PredictCalibrateJob, -}; +use plastmem_worker::{EventSegmentationJob, MemoryReviewJob, PredictCalibrateJob}; use sea_orm::DatabaseConnection; #[cfg(debug_assertions)] use std::sync::{Arc, Mutex}; @@ -34,7 +32,6 @@ async fn handler() -> Html<&'static str> { pub async fn server( db: DatabaseConnection, segment_job_storage: PostgresStorage, - episode_creation_job_storage: PostgresStorage, review_job_storage: PostgresStorage, predict_calibrate_job_storage: PostgresStorage, #[cfg(debug_assertions)] board_broadcaster: Arc>, @@ -42,7 +39,6 @@ pub async fn server( let app_state = AppState::new( db, segment_job_storage, - episode_creation_job_storage, review_job_storage, 
predict_calibrate_job_storage, ); @@ -76,7 +72,6 @@ fn board_app( ) -> Router { let board_api = ApiBuilder::new(Router::new()) .register(app_state.segmentation_job_storage.clone()) - .register(app_state.episode_creation_job_storage.clone()) .register(app_state.review_job_storage.clone()) .register(app_state.predict_calibrate_job_storage.clone()) .build(); diff --git a/crates/server/src/utils/state.rs b/crates/server/src/utils/state.rs index 1d6ef18..0ef2f40 100644 --- a/crates/server/src/utils/state.rs +++ b/crates/server/src/utils/state.rs @@ -1,15 +1,12 @@ use apalis_postgres::PostgresStorage; use sea_orm::DatabaseConnection; -use plastmem_worker::{ - EpisodeCreationJob, EventSegmentationJob, MemoryReviewJob, PredictCalibrateJob, -}; +use plastmem_worker::{EventSegmentationJob, MemoryReviewJob, PredictCalibrateJob}; #[derive(Clone)] pub struct AppState { pub db: DatabaseConnection, pub segmentation_job_storage: PostgresStorage, - pub episode_creation_job_storage: PostgresStorage, pub review_job_storage: PostgresStorage, pub predict_calibrate_job_storage: PostgresStorage, } @@ -20,14 +17,12 @@ impl AppState { pub fn new( db: DatabaseConnection, segmentation_job_storage: PostgresStorage, - episode_creation_job_storage: PostgresStorage, review_job_storage: PostgresStorage, predict_calibrate_job_storage: PostgresStorage, ) -> Self { Self { db, segmentation_job_storage, - episode_creation_job_storage, review_job_storage, predict_calibrate_job_storage, } diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index f467957..42b7745 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -26,6 +26,7 @@ schemars.workspace = true plastmem_core.workspace = true plastmem_entities.workspace = true plastmem_ai.workspace = true +plastmem_event.workspace = true plastmem_event_segmentation.workspace = true plastmem_shared.workspace = true sea-orm.workspace = true diff --git a/crates/worker/src/jobs/episode_creation.rs 
b/crates/worker/src/jobs/episode_creation.rs deleted file mode 100644 index 794a057..0000000 --- a/crates/worker/src/jobs/episode_creation.rs +++ /dev/null @@ -1,594 +0,0 @@ -use std::fmt::Write; - -use apalis::prelude::{Data, TaskSink}; -use apalis_postgres::PostgresStorage; -use chrono::{DateTime, Datelike, Timelike, Utc}; -use fsrs::{DEFAULT_PARAMETERS, FSRS}; -use plastmem_ai::{ - ChatCompletionRequestMessage, ChatCompletionRequestSystemMessage, - ChatCompletionRequestUserMessage, embed, generate_object, -}; -use plastmem_core::{EpisodeSpan, get_episode_span, get_messages_in_range}; -use plastmem_entities::{EpisodeClassification, episodic_memory}; -use plastmem_shared::{AppError, Message}; -use schemars::JsonSchema; -use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, Set}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use super::PredictCalibrateJob; - -const DESIRED_RETENTION: f32 = 0.9; -const EPISODE_CREATION_JOB_NAMESPACE: Uuid = - Uuid::from_u128(0x7b70f7c6_0c6d_4bb9_b0d2_9386445a6104); - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct EpisodeCreationJob { - pub conversation_id: Uuid, - pub start_seq: i64, - pub end_seq: i64, -} - -impl EpisodeCreationJob { - pub const fn from_span(span: &EpisodeSpan) -> Self { - Self { - conversation_id: span.conversation_id, - start_seq: span.start_seq, - end_seq: span.end_seq, - } - } - - fn deterministic_episode_id(&self) -> Uuid { - Uuid::new_v5( - &EPISODE_CREATION_JOB_NAMESPACE, - format!( - "{}:{}:{}", - self.conversation_id, self.start_seq, self.end_seq - ) - .as_bytes(), - ) - } -} - -#[derive(Debug, Deserialize, JsonSchema)] -struct EpisodeTitleOutput { - title: String, -} - -#[derive(Debug, Clone)] -struct RenderedEpisodeLine { - line_index: usize, - timestamp: DateTime, - role: String, - content: String, -} - -#[derive(Debug, Clone)] -struct TimeAnchorCandidateLine { - line_index: usize, - timestamp: DateTime, - role: String, - content: String, -} - -#[derive(Debug, Clone, 
Copy, Deserialize, JsonSchema, PartialEq, Eq, PartialOrd, Ord)] -#[serde(rename_all = "snake_case")] -enum TimeAnchorPrecision { - Time, - Day, - Week, - Month, - Year, -} - -#[derive(Debug, Deserialize, JsonSchema)] -struct TimeAnchorOutput { - insertions: Vec, -} - -#[derive(Debug, Clone, Deserialize, JsonSchema)] -struct TimeAnchorInsertion { - line_index: u32, - exact_text: String, - anchor_text: String, - precision: TimeAnchorPrecision, -} - -const EPISODE_TITLE_SYSTEM_PROMPT: &str = r" -You are naming one conversation segment for episodic memory retrieval. -Return only JSON with `title`. - -Requirements: -1. The title must be concise, descriptive, and easy to search. -2. Keep it within 10-20 words and name the main topic, activity, or event. -3. Preserve names, places, products, and distinctive wording when they help retrieval. -4. Do not invent facts or generalize away the concrete topic. -"; - -const EPISODE_TIME_ANCHOR_SYSTEM_PROMPT: &str = r" -You are adding grounded time anchors to existing conversation lines. -Return only JSON with `insertions`. - -Each insertion must contain: -- `line_index`: the exact line index shown in the input -- `exact_text`: copy the original time phrase exactly as it appears in the line content -- `anchor_text`: only the grounded parenthetical text to append, without parentheses -- `precision`: one of `time`, `day`, `week`, `month`, or `year` - -Rules: -1. Do not rewrite, summarize, reorder, or delete any line. -2. Only add anchors for time expressions that already exist in the content. -3. Use `spoken_at` as the reference point for resolving relative time expressions. -4. If a line contains multiple time expressions that can be grounded with high confidence, return one insertion for each of them. -5. If a phrase cannot be resolved cleanly from `spoken_at`, omit the insertion entirely. -6. `exact_text` must copy the full original time phrase exactly as it appears in the line. Do not return only a fragment of the phrase. -7. 
`anchor_text` must add grounded calendar information. Do not repeat, paraphrase, abstract, or generalize the original phrase. -8. The anchor precision must never be more specific than the original phrase supports. -9. Match the original phrase's granularity: - - `last year` -> `2022` with precision `year` - - `last month` -> `July 2023` with precision `month` - - `the previous weekend` -> `June 17-18, 2023` with precision `week` - - `next Tuesday` -> `May 16, 2023` with precision `day` - - `next Monday at 10:30 AM` -> `May 15, 2023 10:30 AM` with precision `time` -10. Preserve the original phrase in the line and resolve it inline after that phrase. -11. Do not modify non-time text. -12. Do not include parentheses inside `anchor_text`. -"; - -// ────────────────────────────────────────────────── -// Entry -// ────────────────────────────────────────────────── - -pub async fn process_episode_creation( - job: EpisodeCreationJob, - db: Data, - predict_storage: Data>, -) -> Result<(), AppError> { - let db = &*db; - - let Some(span) = try_load_current_span(&job, db).await? else { - return Ok(()); - }; - - let episode_id = job.deterministic_episode_id(); - let already_consolidated = try_ensure_episode_exists(episode_id, &span, db).await?; - - try_enqueue_predict_calibrate_if_needed( - span.conversation_id, - episode_id, - &span.classification, - already_consolidated, - &predict_storage, - ) - .await?; - - Ok(()) -} - -// ────────────────────────────────────────────────── -// Episode Lifecycle -// ────────────────────────────────────────────────── - -async fn try_load_current_span( - job: &EpisodeCreationJob, - db: &DatabaseConnection, -) -> Result, AppError> { - let Some(span) = get_episode_span(job.conversation_id, job.start_seq, db).await? 
else { - tracing::debug!( - conversation_id = %job.conversation_id, - start_seq = job.start_seq, - end_seq = job.end_seq, - "Skipping stale episode creation job" - ); - return Ok(None); - }; - - if span.end_seq != job.end_seq { - tracing::debug!( - conversation_id = %job.conversation_id, - start_seq = job.start_seq, - queued_end_seq = job.end_seq, - actual_end_seq = span.end_seq, - "Skipping mismatched episode creation job" - ); - return Ok(None); - } - - Ok(Some(span)) -} - -// Retries may observe an episode record that already exists even though the -// previous attempt never reached downstream consolidation scheduling. -async fn try_ensure_episode_exists( - episode_id: Uuid, - span: &EpisodeSpan, - db: &DatabaseConnection, -) -> Result { - if let Some(existing_episode) = episodic_memory::Entity::find_by_id(episode_id) - .one(db) - .await? - { - return Ok(existing_episode.consolidated_at.is_some()); - } - - let messages = load_episode_source_messages(span, db).await?; - create_episode_record(episode_id, span, &messages, db).await?; - Ok(false) -} - -fn should_enqueue_predict_calibrate(classification: &EpisodeClassification) -> bool { - matches!(classification, EpisodeClassification::Informative) -} - -// Skip fully consolidated episodes, but still re-enqueue unfinished ones so -// episode creation retries can resume the downstream pipeline. 
-async fn try_enqueue_predict_calibrate_if_needed( - conversation_id: Uuid, - episode_id: Uuid, - classification: &EpisodeClassification, - already_consolidated: bool, - predict_storage: &PostgresStorage, -) -> Result<(), AppError> { - if !should_enqueue_predict_calibrate(classification) { - return Ok(()); - } - if already_consolidated { - return Ok(()); - } - - let mut storage = predict_storage.clone(); - storage - .push(PredictCalibrateJob { - conversation_id, - episode_id, - force: false, - }) - .await?; - - Ok(()) -} - -// ────────────────────────────────────────────────── -// Episode Persistence -// ────────────────────────────────────────────────── - -async fn load_episode_source_messages( - span: &EpisodeSpan, - db: &DatabaseConnection, -) -> Result, AppError> { - let conversation_messages = - get_messages_in_range(span.conversation_id, span.start_seq, span.end_seq, db).await?; - if conversation_messages.is_empty() { - return Err(AppError::new(anyhow::anyhow!( - "Episode span has no backing messages" - ))); - } - - Ok( - conversation_messages - .iter() - .map(plastmem_core::ConversationMessage::to_message) - .collect(), - ) -} - -async fn create_episode_record( - episode_id: Uuid, - span: &EpisodeSpan, - messages: &[Message], - db: &DatabaseConnection, -) -> Result<(), AppError> { - let (title, content) = generate_episode_artifacts(messages).await?; - let embedding = embed(&content).await?; - - let fsrs = FSRS::new(Some(&DEFAULT_PARAMETERS))?; - let initial_states = fsrs.next_states(None, DESIRED_RETENTION, 0)?; - let initial_state = initial_states.good.memory; - let now = Utc::now(); - let start_at = messages.first().map_or(now, |message| message.timestamp); - let end_at = messages.last().map_or(now, |message| message.timestamp); - - episodic_memory::ActiveModel { - id: Set(episode_id), - conversation_id: Set(span.conversation_id), - messages: Set(serde_json::to_value(messages.to_vec())?), - content: Set(content), - embedding: Set(embedding), - title: 
Set(title), - stability: Set(initial_state.stability), - difficulty: Set(initial_state.difficulty), - surprise: Set(0.0), - classification: Set(Some(span.classification.clone())), - start_at: Set(start_at.into()), - end_at: Set(end_at.into()), - created_at: Set(now.into()), - last_reviewed_at: Set(now.into()), - consolidated_at: Set(None), - } - .insert(db) - .await?; - - Ok(()) -} - -// ────────────────────────────────────────────────── -// Artifact Generation -// ────────────────────────────────────────────────── - -// Render a deterministic transcript first, then let the LLM add grounded time -// anchors before generating the retrieval title. -async fn generate_episode_artifacts(messages: &[Message]) -> Result<(String, String), AppError> { - let mut lines = render_episode_lines(messages); - try_anchor_episode_lines(&mut lines).await; - let content = render_episode_content(&lines); - let title = generate_episode_title(messages, &content).await?; - Ok((title, content)) -} - -async fn generate_episode_title(messages: &[Message], content: &str) -> Result { - let system = ChatCompletionRequestSystemMessage::from(EPISODE_TITLE_SYSTEM_PROMPT.trim()); - let user = ChatCompletionRequestUserMessage::from(format!( - "Episode content:\n{}\n\nSource messages:\n{}", - content, - format_messages(messages) - )); - - let output = generate_object::( - vec![ - ChatCompletionRequestMessage::System(system), - ChatCompletionRequestMessage::User(user), - ], - "episodic_title_generation".to_owned(), - Some("Generate an episodic memory title".to_owned()), - ) - .await?; - - let title = output.title.trim(); - Ok(if title.is_empty() { - "Conversation Segment".to_owned() - } else { - title.to_owned() - }) -} - -async fn try_anchor_episode_lines(lines: &mut [RenderedEpisodeLine]) { - let candidates = build_time_anchor_candidates(lines); - if candidates.is_empty() { - return; - } - - let output = match request_time_anchor_insertions(&candidates).await { - Ok(output) => output, - Err(err) => 
{ - tracing::warn!(error = %err, "Episode time anchoring failed; using deterministic content"); - return; - } - }; - - for candidate in &candidates { - let Some(line) = lines.get_mut(candidate.line_index) else { - continue; - }; - let mut insertions: Vec<_> = output - .insertions - .iter() - .filter(|insertion| usize::try_from(insertion.line_index).ok() == Some(candidate.line_index)) - .filter(|insertion| is_valid_time_anchor_insertion(insertion, candidate)) - .cloned() - .collect(); - insertions.sort_by(|left, right| right.exact_text.len().cmp(&left.exact_text.len())); - - for insertion in insertions { - let _ = apply_insertion( - &mut line.content, - &insertion.exact_text, - insertion.anchor_text.trim(), - ); - } - } -} - -async fn request_time_anchor_insertions( - candidates: &[TimeAnchorCandidateLine], -) -> Result { - let system = ChatCompletionRequestSystemMessage::from(EPISODE_TIME_ANCHOR_SYSTEM_PROMPT.trim()); - let user = ChatCompletionRequestUserMessage::from(build_time_anchor_user_content(candidates)); - - generate_object::( - vec![ - ChatCompletionRequestMessage::System(system), - ChatCompletionRequestMessage::User(user), - ], - "episodic_time_anchoring".to_owned(), - Some("Add grounded time anchors to existing conversation lines".to_owned()), - ) - .await -} - -// ────────────────────────────────────────────────── -// Deterministic Rendering -// ────────────────────────────────────────────────── - -fn render_episode_lines(messages: &[Message]) -> Vec { - messages - .iter() - .enumerate() - .map(|(line_index, message)| RenderedEpisodeLine { - line_index, - timestamp: message.timestamp, - role: message.role.to_string(), - content: collapse_inline_whitespace(&message.content), - }) - .collect() -} - -fn render_episode_content(lines: &[RenderedEpisodeLine]) -> String { - let mut out = String::new(); - let mut current_bucket: Option<(i32, u32, u32, u32)> = None; - - for line in lines { - let bucket = ( - line.timestamp.year(), - line.timestamp.month(), - 
line.timestamp.day(), - line.timestamp.hour(), - ); - if current_bucket != Some(bucket) { - if !out.is_empty() { - out.push_str("\n\n"); - } - let _ = write!(out, "{}", format_at_header(line.timestamp)); - current_bucket = Some(bucket); - out.push('\n'); - } else { - out.push('\n'); - } - - let _ = write!(out, "{}: {}", line.role, line.content); - } - - out.trim_end().to_owned() -} - -fn build_time_anchor_candidates(lines: &[RenderedEpisodeLine]) -> Vec { - lines - .iter() - .map(|line| TimeAnchorCandidateLine { - line_index: line.line_index, - timestamp: line.timestamp, - role: line.role.clone(), - content: line.content.clone(), - }) - .collect() -} - -fn build_time_anchor_user_content(candidates: &[TimeAnchorCandidateLine]) -> String { - let mut out = String::from( - "Candidate lines for optional time anchoring.\nUse the provided `spoken_at` timestamp as reference when resolving relative time phrases.\n", - ); - - for candidate in candidates { - let _ = writeln!(out, "\nline_index={}", candidate.line_index); - let _ = writeln!( - out, - "spoken_at={}", - candidate.timestamp.format("%Y-%m-%dT%H:%M:%SZ") - ); - let _ = writeln!(out, "role={}", candidate.role); - let _ = writeln!(out, "content={}", candidate.content); - } - - out -} - -fn format_messages(messages: &[Message]) -> String { - messages - .iter() - .enumerate() - .map(|(index, message)| { - format!( - "Message {} [{}] {}: {}", - index + 1, - message.timestamp.format("%Y-%m-%dT%H:%M:%SZ"), - message.role, - message.content - ) - }) - .collect::>() - .join("\n") -} - -fn collapse_inline_whitespace(text: &str) -> String { - text.split_whitespace().collect::>().join(" ") -} - -fn month_abbrev(month: u32) -> &'static str { - match month { - 1 => "Jan", - 2 => "Feb", - 3 => "Mar", - 4 => "Apr", - 5 => "May", - 6 => "Jun", - 7 => "Jul", - 8 => "Aug", - 9 => "Sep", - 10 => "Oct", - 11 => "Nov", - 12 => "Dec", - _ => "Unknown", - } -} - -fn format_at_header(timestamp: DateTime) -> String { - let hour = 
timestamp.hour(); - let hour_12 = match hour % 12 { - 0 => 12, - value => value, - }; - let meridiem = if hour < 12 { "AM" } else { "PM" }; - format!( - "Spoken At: {} {}, {} {} {}", - month_abbrev(timestamp.month()), - timestamp.day(), - timestamp.year(), - hour_12, - meridiem - ) -} - -fn insertion_already_applied(content: &str, exact_text: &str) -> bool { - content.contains(&format!("{exact_text} (")) -} - -fn normalize_anchor_text(text: &str) -> String { - text - .split_whitespace() - .collect::>() - .join(" ") - .to_ascii_lowercase() -} - -fn looks_like_grounded_calendar_info(text: &str) -> bool { - text.chars().any(|c| c.is_ascii_digit()) -} - -fn apply_insertion(content: &mut String, exact_text: &str, anchor_text: &str) -> bool { - if insertion_already_applied(content, exact_text) { - return false; - } - - let matches: Vec<_> = content.match_indices(exact_text).collect(); - if matches.len() != 1 { - return false; - } - - let insert_at = matches[0].0 + exact_text.len(); - content.insert_str(insert_at, &format!(" ({anchor_text})")); - true -} - -fn is_valid_time_anchor_insertion( - insertion: &TimeAnchorInsertion, - candidate: &TimeAnchorCandidateLine, -) -> bool { - // Precision is still part of the LLM schema, but local validation is - // intentionally structural only. 
- let _ = insertion.precision; - if insertion.exact_text.trim().is_empty() || insertion.anchor_text.trim().is_empty() { - return false; - } - if insertion.anchor_text.contains('(') || insertion.anchor_text.contains(')') { - return false; - } - if !candidate.content.contains(&insertion.exact_text) { - return false; - } - if normalize_anchor_text(&insertion.exact_text) == normalize_anchor_text(&insertion.anchor_text) { - return false; - } - if !looks_like_grounded_calendar_info(&insertion.anchor_text) { - return false; - } - true -} diff --git a/crates/worker/src/jobs/event_segmentation.rs b/crates/worker/src/jobs/event_segmentation.rs index 4a8ec93..4cceb69 100644 --- a/crates/worker/src/jobs/event_segmentation.rs +++ b/crates/worker/src/jobs/event_segmentation.rs @@ -1,342 +1,441 @@ +use std::fmt::Write; + use apalis::prelude::{Data, TaskSink}; use apalis_postgres::PostgresStorage; -use chrono::Utc; -use plastmem_core::{ - EpisodeSpan, SegmentJobState, SegmentationJobClaim, abort_segmentation_job, - commit_segmentation_job, get_claim_messages, get_segmentation_state, take_pending_review_items, - try_claim_segmentation_job, -}; -use plastmem_entities::EpisodeClassification; -use plastmem_event_segmentation::{ - ReviewedSegment, SegmentClassification, primitive_review_llm_segmenter, - temporal_boundary_review_llm_segmenter, temporal_rule_segmenter, +use chrono::{DateTime, Datelike, Timelike, Utc}; +use fsrs::{DEFAULT_PARAMETERS, FSRS}; +use futures::future::{join_all, try_join_all}; +use plastmem_ai::{ + ChatCompletionRequestMessage, ChatCompletionRequestSystemMessage, + ChatCompletionRequestUserMessage, embed, generate_object, }; +use plastmem_core::{EpisodicMemory, MessageQueue}; +use plastmem_entities::episodic_memory; +use plastmem_event::{Event, EventData, MessageEventData, MessageEventRole}; +use plastmem_event_segmentation::{EventSegment, EventSegmentReason, EventSegmenter}; use plastmem_shared::{APP_ENV, AppError, Message}; +use schemars::JsonSchema; +use 
sea_orm::{DatabaseConnection, EntityTrait, TransactionTrait}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use super::{EpisodeCreationJob, MemoryReviewJob}; +use super::{MemoryReviewJob, PredictCalibrateJob}; + +const FLASHBULB_SURPRISE_THRESHOLD: f32 = 0.85; +const FORCE_SINGLE_SEGMENT_QUEUE_LEN: usize = 30; +const DESIRED_RETENTION: f32 = 0.9; +const SURPRISE_BOOST_FACTOR: f32 = 0.5; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct EventSegmentationJob { pub conversation_id: Uuid, - pub active_segment_start_seq: i64, - pub active_segment_end_seq: i64, + pub fence_count: i32, + pub force_process: bool, + #[serde(default = "default_keep_tail_segment")] + pub keep_tail_segment: bool, } -impl EventSegmentationJob { - pub const fn to_claim(&self) -> SegmentationJobClaim { - SegmentationJobClaim { - conversation_id: self.conversation_id, - active_segment_start_seq: self.active_segment_start_seq, - active_segment_end_seq: self.active_segment_end_seq, - } - } +const fn default_keep_tail_segment() -> bool { + true +} - pub const fn from_claim(claim: SegmentationJobClaim) -> Self { - Self { - conversation_id: claim.conversation_id, - active_segment_start_seq: claim.active_segment_start_seq, - active_segment_end_seq: claim.active_segment_end_seq, - } - } +#[derive(Debug)] +struct BatchSegment { + messages: Vec, + title: String, + content: String, + surprise_signal: f32, +} + +struct CreatedEpisode { + id: Uuid, + surprise: f32, +} + +struct PreparedEpisode { + memory: EpisodicMemory, + surprise: f32, +} + +#[derive(Debug, Deserialize, JsonSchema)] +struct EpisodeTitleOutput { + title: String, } #[derive(Debug, Clone)] -struct CommitPlan { - finalized_segments: Vec, - next_segment_start_seq: i64, -} - -struct SegmentationContext<'a> { - db: &'a sea_orm::DatabaseConnection, - segmentation_storage: &'a PostgresStorage, - episode_creation_storage: &'a PostgresStorage, - review_storage: &'a PostgresStorage, -} - -impl<'a> SegmentationContext<'a> { - const fn 
new( - db: &'a sea_orm::DatabaseConnection, - segmentation_storage: &'a PostgresStorage, - episode_creation_storage: &'a PostgresStorage, - review_storage: &'a PostgresStorage, - ) -> Self { - Self { - db, - segmentation_storage, - episode_creation_storage, - review_storage, - } - } +struct RenderedEpisodeLine { + line_index: usize, + timestamp: DateTime, + role: String, + content: String, } -// ────────────────────────────────────────────────── -// Entry -// ────────────────────────────────────────────────── +#[derive(Debug, Clone)] +struct TimeAnchorCandidateLine { + line_index: usize, + timestamp: DateTime, + role: String, + content: String, +} + +#[derive(Debug, Clone, Copy, Deserialize, JsonSchema, PartialEq, Eq, PartialOrd, Ord)] +#[serde(rename_all = "snake_case")] +enum TimeAnchorPrecision { + Time, + Day, + Week, + Month, + Year, +} + +#[derive(Debug, Deserialize, JsonSchema)] +struct TimeAnchorOutput { + insertions: Vec, +} + +#[derive(Debug, Clone, Deserialize, JsonSchema)] +struct TimeAnchorInsertion { + line_index: u32, + exact_text: String, + anchor_text: String, + precision: TimeAnchorPrecision, +} + +const EPISODE_TITLE_SYSTEM_PROMPT: &str = r" +You are naming one conversation segment for episodic memory retrieval. +Return only JSON with `title`. + +Requirements: +1. The title must be concise, descriptive, and easy to search. +2. Keep it within 10-20 words and name the main topic, activity, or event. +3. Preserve names, places, products, and distinctive wording when they help retrieval. +4. Do not invent facts or generalize away the concrete topic. +"; + +const EPISODE_TIME_ANCHOR_SYSTEM_PROMPT: &str = r" +You are adding grounded time anchors to existing conversation lines. +Return only JSON with `insertions`. 
+ +Each insertion must contain: +- `line_index`: the exact line index shown in the input +- `exact_text`: copy the original time phrase exactly as it appears in the line content +- `anchor_text`: only the grounded parenthetical text to append, without parentheses +- `precision`: one of `time`, `day`, `week`, `month`, or `year` + +Rules: +1. Do not rewrite, summarize, reorder, or delete any line. +2. Only add anchors for time expressions that already exist in the content. +3. Use `spoken_at` as the reference point for resolving relative time expressions. +4. If a line contains multiple time expressions that can be grounded with high confidence, return one insertion for each of them. +5. If a phrase cannot be resolved cleanly from `spoken_at`, omit the insertion entirely. +6. `exact_text` must copy the full original time phrase exactly as it appears in the line. +7. `anchor_text` must add grounded calendar information. Do not repeat, paraphrase, abstract, or generalize the original phrase. +8. The anchor precision must never be more specific than the original phrase supports. +9. Preserve the original phrase in the line and resolve it inline after that phrase. +10. Do not modify non-time text. +11. Do not include parentheses inside `anchor_text`. +"; pub async fn process_event_segmentation( job: EventSegmentationJob, - db: Data, - segmentation_storage: Data>, - episode_creation_storage: Data>, + db: Data, review_storage: Data>, + semantic_storage: Data>, ) -> Result<(), AppError> { let db = &*db; - let ctx = SegmentationContext::new( - db, - &*segmentation_storage, - &*episode_creation_storage, - &*review_storage, - ); - let claim = job.to_claim(); + let conversation_id = job.conversation_id; + let fence_count = usize::try_from(job.fence_count).unwrap_or(0); - let Some(eof_identified) = validate_claim_or_recover(&claim, &ctx).await? 
else { + let current_messages = MessageQueue::get(conversation_id, db).await?.messages; + if current_messages.len() < fence_count { + tracing::debug!( + conversation_id = %conversation_id, + fence_count, + actual = current_messages.len(), + "Stale event segmentation job; clearing fence" + ); + MessageQueue::finalize_job(conversation_id, None, db).await?; return Ok(()); - }; + } - let claimed_messages = get_claim_messages(&claim, ctx.db).await?; - let result = process_claimed_segment_range(&claim, eof_identified, &claimed_messages, &ctx).await; - - if let Err(err) = result { - if let Err(abort_err) = abort_segmentation_job(&claim, ctx.db).await { - tracing::error!( - conversation_id = %claim.conversation_id, - active_segment_start_seq = claim.active_segment_start_seq, - active_segment_end_seq = claim.active_segment_end_seq, - error = %abort_err, - "Failed to abort segmentation job after processing error" - ); - return Err(err); - } + let force_due_to_backlog = current_messages.len() >= FORCE_SINGLE_SEGMENT_QUEUE_LEN; + let should_force_single_segment = job.force_process || force_due_to_backlog; + let batch_messages = ¤t_messages[..fence_count]; + let segments = batch_segment(batch_messages).await?; - match try_claim_and_enqueue_segmentation_job( - claim.conversation_id, - "processing_error_recovery", - ctx.db, - ctx.segmentation_storage, - ) - .await - { - Ok(true) => { - tracing::warn!( - conversation_id = %claim.conversation_id, - active_segment_start_seq = claim.active_segment_start_seq, - active_segment_end_seq = claim.active_segment_end_seq, - error = %err, - "Segmentation job failed; aborted claim and enqueued a fresh retry" - ); - return Ok(()); - } - Ok(false) => { - tracing::warn!( - conversation_id = %claim.conversation_id, - active_segment_start_seq = claim.active_segment_start_seq, - active_segment_end_seq = claim.active_segment_end_seq, - error = %err, - "Segmentation job failed; aborted claim but no fresh retry was enqueued" - ); - } - 
Err(recovery_err) => { - tracing::error!( - conversation_id = %claim.conversation_id, - active_segment_start_seq = claim.active_segment_start_seq, - active_segment_end_seq = claim.active_segment_end_seq, - processing_error = %err, - recovery_error = %recovery_err, - "Segmentation job failed and recovery re-trigger also failed" - ); - } - } - return Err(err); + if segments.len() == 1 && !should_force_single_segment { + tracing::info!(conversation_id = %conversation_id, "No split detected; deferring for more messages"); + MessageQueue::clear_fence(conversation_id, db).await?; + return Ok(()); } - Ok(()) -} + let (drain_segments, new_prev_content): (&[BatchSegment], Option) = if segments.len() == 1 + { + tracing::info!( + conversation_id = %conversation_id, + messages = fence_count, + force_process = job.force_process, + force_due_to_backlog, + queue_len = current_messages.len(), + "Force processing as single episode" + ); + (&segments[..], None) + } else if job.keep_tail_segment { + let to_drain = &segments[..segments.len() - 1]; + let last_content = to_drain.last().map(|segment| segment.content.clone()); + tracing::info!( + conversation_id = %conversation_id, + total_segments = segments.len(), + draining = to_drain.len(), + keep_tail_segment = job.keep_tail_segment, + "Event segmentation complete" + ); + (to_drain, last_content) + } else { + tracing::info!( + conversation_id = %conversation_id, + total_segments = segments.len(), + draining = segments.len(), + keep_tail_segment = job.keep_tail_segment, + "Event segmentation complete" + ); + (&segments[..], None) + }; -// ────────────────────────────────────────────────── -// Main flow -// ────────────────────────────────────────────────── - -// Core recovers stale leases before creating a fresh claim. This worker-side -// check guards already-enqueued jobs against races, retries, and duplicate -// deliveries after the active lease has moved on. 
-async fn validate_claim_or_recover( - claim: &SegmentationJobClaim, - ctx: &SegmentationContext<'_>, -) -> Result, AppError> { - let state = get_segmentation_state(claim.conversation_id, ctx.db).await?; - match state.job_state { - SegmentJobState::Active { - active_segment_start_seq, - active_segment_end_seq, - .. - } if active_segment_start_seq == claim.active_segment_start_seq - && active_segment_end_seq == claim.active_segment_end_seq => - { - Ok(Some(state.eof_identified)) - } - _ => { - let re_enqueued = try_claim_and_enqueue_segmentation_job( - claim.conversation_id, - "stale_job_recovery", - ctx.db, - ctx.segmentation_storage, - ) - .await?; - tracing::debug!( - conversation_id = %claim.conversation_id, - active_segment_start_seq = claim.active_segment_start_seq, - active_segment_end_seq = claim.active_segment_end_seq, - re_enqueued, - "Skipping stale segmentation job" - ); - Ok(None) - } - } -} + let drain_count = drain_segments + .iter() + .map(|segment| segment.messages.len()) + .sum::(); -async fn process_claimed_segment_range( - claim: &SegmentationJobClaim, - eof_identified: bool, - claimed_messages: &[plastmem_core::ConversationMessage], - ctx: &SegmentationContext<'_>, -) -> Result<(), AppError> { - if claimed_messages.is_empty() { - return Err(AppError::new(anyhow::anyhow!( - "Segmentation claim has no messages" - ))); - } + enqueue_pending_reviews(conversation_id, batch_messages, db, &review_storage).await?; - let rule_output = temporal_rule_segmenter(claimed_messages) - .map_err(|reason| AppError::new(anyhow::anyhow!(reason)))?; - let (reviewed_segments, reviewed_boundaries) = - primitive_review_llm_segmenter(claimed_messages, &rule_output).await?; - let final_segments = temporal_boundary_review_llm_segmenter( - claimed_messages, - &reviewed_segments, - &reviewed_boundaries, + let prepared_episodes = prepare_episodes_batch(conversation_id, drain_segments).await?; + let episodes = persist_episodes_batch( + conversation_id, + drain_count, + 
new_prev_content, + &prepared_episodes, + db, ) .await?; - let commit_plan = build_commit_plan( - &final_segments, - eof_identified, - claim.active_segment_end_seq, - ) - .map_err(|reason| AppError::new(anyhow::anyhow!(reason)))?; + enqueue_predict_calibrate_jobs(conversation_id, &episodes, &semantic_storage).await?; - let created_at = Utc::now(); - let finalized_spans: Vec = commit_plan - .finalized_segments - .iter() - .map(|segment| EpisodeSpan { - conversation_id: claim.conversation_id, - start_seq: segment.start_seq, - end_seq: segment.end_seq, - classification: map_classification(segment.classification.clone()), - created_at, - }) - .collect(); + Ok(()) +} + +async fn batch_segment(messages: &[Message]) -> Result, AppError> { + let events = messages.iter().map(message_to_event).collect::>(); + let event_segments = EventSegmenter::segment(&events).await?; + let mut segments = event_segments_to_batch_segments(messages, &event_segments)?; - commit_segmentation_job( - claim, - &finalized_spans, - commit_plan.next_segment_start_seq, - ctx.db, + let generated_entries = try_join_all( + segments + .iter() + .map(|segment| generate_episode_artifacts(&segment.messages)), ) .await?; - enqueue_episode_creation_jobs(&finalized_spans, ctx.episode_creation_storage).await?; + for (segment, (title, content)) in segments.iter_mut().zip(generated_entries) { + segment.title = title; + segment.content = content; + } - if !finalized_spans.is_empty() { - enqueue_pending_reviews( - claim.conversation_id, - &extract_review_context(claimed_messages), - ctx.db, - ctx.review_storage, - ) - .await?; + Ok(segments) +} + +fn event_segments_to_batch_segments( + messages: &[Message], + event_segments: &[EventSegment], +) -> Result, AppError> { + let mut offset = 0usize; + let mut segments = Vec::with_capacity(event_segments.len()); + + for event_segment in event_segments { + let len = event_segment.events.len(); + if len == 0 { + continue; + } + if offset + len > messages.len() { + return 
Err(AppError::new(anyhow::anyhow!( + "Event segmenter returned segments longer than source messages" + ))); + } + + segments.push(BatchSegment { + messages: messages[offset..offset + len].to_vec(), + title: String::new(), + content: String::new(), + surprise_signal: surprise_signal(event_segment), + }); + offset += len; } - if commit_plan.next_segment_start_seq > claim.active_segment_start_seq { - try_claim_and_enqueue_segmentation_job( - claim.conversation_id, - "commit_follow_up", - ctx.db, - ctx.segmentation_storage, - ) - .await?; + if offset != messages.len() { + return Err(AppError::new(anyhow::anyhow!( + "Event segmenter did not cover all source messages" + ))); } - Ok(()) + Ok(segments) } -fn build_commit_plan( - segments: &[ReviewedSegment], - eof_identified: bool, - claimed_end_seq: i64, -) -> Result { - if segments.is_empty() { - return Err("Cannot build commit plan from empty segment list".to_owned()); - } +fn message_to_event(message: &Message) -> Event { + let role = match message.role.0.to_ascii_lowercase().as_str() { + "user" => MessageEventRole::User, + "assistant" => MessageEventRole::Assistant, + _ => MessageEventRole::Custom(message.role.0.clone()), + }; - if eof_identified { - return Ok(CommitPlan { - finalized_segments: segments.to_vec(), - next_segment_start_seq: claimed_end_seq + 1, - }); + Event::new( + EventData::Message(MessageEventData { + role, + content: message.content.clone(), + }), + message.timestamp, + None, + ) +} + +fn surprise_signal(segment: &EventSegment) -> f32 { + match segment.reason { + EventSegmentReason::InitialSegment => 0.2, + EventSegmentReason::HardTimeGap => 0.9, + _ => segment + .score + .max(segment.boundary_before_confidence) + .max(segment.boundary_after_confidence) + .clamp(0.5, 0.8), } +} - if segments.len() == 1 { - return Ok(CommitPlan { - finalized_segments: Vec::new(), - next_segment_start_seq: segments[0].start_seq, - }); +async fn prepare_episode( + conversation_id: Uuid, + messages: &[Message], + 
title: &str, + content: &str, + surprise_signal: f32, +) -> Result, AppError> { + if content.is_empty() { + tracing::warn!(conversation_id = %conversation_id, "Skipping episode creation: empty content"); + return Ok(None); } - let carry_over = segments - .last() - .ok_or_else(|| "Missing carry-over segment".to_owned())?; - Ok(CommitPlan { - finalized_segments: segments[..segments.len() - 1].to_vec(), - next_segment_start_seq: carry_over.start_seq, - }) + let surprise = surprise_signal.clamp(0.0, 1.0); + let embedding_input = if title.is_empty() { + content.to_owned() + } else { + format!("{title}. {content}") + }; + let embedding = embed(&embedding_input).await?; + + let fsrs = FSRS::new(Some(&DEFAULT_PARAMETERS))?; + let initial_states = fsrs.next_states(None, DESIRED_RETENTION, 0)?; + let initial_state = initial_states.good.memory; + let boosted_stability = initial_state.stability * (1.0 + surprise * SURPRISE_BOOST_FACTOR); + let now = Utc::now(); + let start_at = messages.first().map_or(now, |message| message.timestamp); + let end_at = messages.last().map_or(now, |message| message.timestamp); + + Ok(Some(PreparedEpisode { + memory: EpisodicMemory { + id: Uuid::now_v7(), + conversation_id, + messages: messages.to_vec(), + title: title.to_owned(), + content: content.to_owned(), + classification: None, + embedding, + stability: boosted_stability, + difficulty: initial_state.difficulty, + surprise, + start_at, + end_at, + created_at: now, + last_reviewed_at: now, + consolidated_at: None, + }, + surprise, + })) } -// ────────────────────────────────────────────────── -// Utilities -// ────────────────────────────────────────────────── +async fn prepare_episodes_batch( + conversation_id: Uuid, + segments: &[BatchSegment], +) -> Result, AppError> { + let futures = segments.iter().map(|segment| { + prepare_episode( + conversation_id, + &segment.messages, + &segment.title, + &segment.content, + segment.surprise_signal, + ) + }); -fn extract_review_context(messages: 
&[plastmem_core::ConversationMessage]) -> Vec { - messages - .iter() - .map(plastmem_core::ConversationMessage::to_message) - .collect() + Ok(try_join_all(futures).await?.into_iter().flatten().collect()) } -fn map_classification(classification: SegmentClassification) -> EpisodeClassification { - match classification { - SegmentClassification::LowInfo => EpisodeClassification::LowInfo, - SegmentClassification::Informative => EpisodeClassification::Informative, +async fn persist_episodes_batch( + conversation_id: Uuid, + drain_count: usize, + prev_episode_content: Option, + episodes: &[PreparedEpisode], + db: &DatabaseConnection, +) -> Result, AppError> { + let txn = db.begin().await?; + + let active_models = episodes + .iter() + .map(|episode| { + let model = episode.memory.to_model()?; + Ok::<_, AppError>(model.into()) + }) + .collect::, _>>()?; + + if !active_models.is_empty() { + episodic_memory::Entity::insert_many(active_models) + .exec(&txn) + .await?; } -} -// ────────────────────────────────────────────────── -// Side effects -// ────────────────────────────────────────────────── + MessageQueue::drain(conversation_id, drain_count, &txn).await?; + MessageQueue::finalize_job(conversation_id, prev_episode_content, &txn).await?; + txn.commit().await?; + + Ok( + episodes + .iter() + .map(|episode| { + tracing::info!( + episode_id = %episode.memory.id, + conversation_id = %conversation_id, + title = %episode.memory.title, + messages = episode.memory.messages.len(), + surprise = episode.surprise, + "Episode created" + ); + + CreatedEpisode { + id: episode.memory.id, + surprise: episode.surprise, + } + }) + .collect(), + ) +} async fn enqueue_pending_reviews( conversation_id: Uuid, context_messages: &[Message], - db: &sea_orm::DatabaseConnection, + db: &DatabaseConnection, review_storage: &PostgresStorage, ) -> Result<(), AppError> { if !APP_ENV.enable_fsrs_review { return Ok(()); } - if let Some(pending_reviews) = take_pending_review_items(conversation_id, 
db).await? { + if let Some(pending_reviews) = MessageQueue::take_pending_reviews(conversation_id, db).await? { let review_job = MemoryReviewJob { pending_reviews, context_messages: context_messages.to_vec(), @@ -345,55 +444,346 @@ async fn enqueue_pending_reviews( let mut storage = review_storage.clone(); storage.push(review_job).await?; } - Ok(()) } -// When the current worker attempt is about to end, ask core to re-check -// whether the conversation now exposes a fresh claimable segment range. -// If there is more unsegmented work after commit, stale-job recovery, or -// processing-error abort, enqueue one follow-up segmentation job to continue. -async fn try_claim_and_enqueue_segmentation_job( +async fn enqueue_predict_calibrate_jobs( conversation_id: Uuid, - trigger_context: &'static str, - db: &sea_orm::DatabaseConnection, - segmentation_storage: &PostgresStorage, -) -> Result { - if let Some(claim) = try_claim_segmentation_job(conversation_id, db).await? { - let active_segment_start_seq = claim.active_segment_start_seq; - let active_segment_end_seq = claim.active_segment_end_seq; - let mut storage = segmentation_storage.clone(); - storage - .push(EventSegmentationJob::from_claim(claim)) - .await?; - tracing::info!( - conversation_id = %conversation_id, - active_segment_start_seq, - active_segment_end_seq, - trigger_context, - "Enqueued segmentation job" + episodes: &[CreatedEpisode], + semantic_storage: &PostgresStorage, +) -> Result<(), AppError> { + if episodes.is_empty() { + return Ok(()); + } + + let futures = episodes.iter().map(|episode| { + let mut storage = semantic_storage.clone(); + let job = PredictCalibrateJob { + conversation_id, + episode_id: episode.id, + force: episode.surprise >= FLASHBULB_SURPRISE_THRESHOLD, + }; + async move { storage.push(job).await } + }); + + let results: Result, _> = join_all(futures).await.into_iter().collect(); + results?; + + tracing::info!( + conversation_id = %conversation_id, + created_jobs = episodes.len(), + 
"Enqueued predict-calibrate jobs for new episodes" + ); + + Ok(()) +} + +async fn generate_episode_artifacts(messages: &[Message]) -> Result<(String, String), AppError> { + let mut lines = render_episode_lines(messages); + try_anchor_episode_lines(&mut lines).await; + let content = render_episode_content(&lines); + let title = generate_episode_title(messages, &content).await?; + Ok((title, content)) +} + +async fn generate_episode_title(messages: &[Message], content: &str) -> Result { + let system = ChatCompletionRequestSystemMessage::from(EPISODE_TITLE_SYSTEM_PROMPT.trim()); + let user = ChatCompletionRequestUserMessage::from(format!( + "Episode content:\n{}\n\nSource messages:\n{}", + content, + format_messages(messages) + )); + + let output = generate_object::( + vec![ + ChatCompletionRequestMessage::System(system), + ChatCompletionRequestMessage::User(user), + ], + "episodic_title_generation".to_owned(), + Some("Generate an episodic memory title".to_owned()), + ) + .await?; + + let title = output.title.trim(); + Ok(if title.is_empty() { + "Conversation Segment".to_owned() + } else { + title.to_owned() + }) +} + +async fn try_anchor_episode_lines(lines: &mut [RenderedEpisodeLine]) { + let candidates = build_time_anchor_candidates(lines); + if candidates.is_empty() { + return; + } + + let output = match request_time_anchor_insertions(&candidates).await { + Ok(output) => output, + Err(err) => { + tracing::warn!(error = %err, "Episode time anchoring failed; using deterministic content"); + return; + } + }; + + for candidate in &candidates { + let Some(line) = lines.get_mut(candidate.line_index) else { + continue; + }; + let mut insertions = output + .insertions + .iter() + .filter(|insertion| usize::try_from(insertion.line_index).ok() == Some(candidate.line_index)) + .filter(|insertion| is_valid_time_anchor_insertion(insertion, candidate)) + .cloned() + .collect::>(); + insertions.sort_by(|left, right| right.exact_text.len().cmp(&left.exact_text.len())); + + for 
insertion in insertions { + let _ = apply_insertion( + &mut line.content, + &insertion.exact_text, + insertion.anchor_text.trim(), + ); + } + } +} + +async fn request_time_anchor_insertions( + candidates: &[TimeAnchorCandidateLine], +) -> Result { + let system = ChatCompletionRequestSystemMessage::from(EPISODE_TIME_ANCHOR_SYSTEM_PROMPT.trim()); + let user = ChatCompletionRequestUserMessage::from(build_time_anchor_user_content(candidates)); + + generate_object::( + vec![ + ChatCompletionRequestMessage::System(system), + ChatCompletionRequestMessage::User(user), + ], + "episodic_time_anchoring".to_owned(), + Some("Add grounded time anchors to existing conversation lines".to_owned()), + ) + .await +} + +fn render_episode_lines(messages: &[Message]) -> Vec { + messages + .iter() + .enumerate() + .map(|(line_index, message)| RenderedEpisodeLine { + line_index, + timestamp: message.timestamp, + role: message.role.to_string(), + content: collapse_inline_whitespace(&message.content), + }) + .collect() +} + +fn render_episode_content(lines: &[RenderedEpisodeLine]) -> String { + let mut out = String::new(); + let mut current_bucket: Option<(i32, u32, u32, u32)> = None; + + for line in lines { + let bucket = ( + line.timestamp.year(), + line.timestamp.month(), + line.timestamp.day(), + line.timestamp.hour(), ); - return Ok(true); + if current_bucket != Some(bucket) { + if !out.is_empty() { + out.push_str("\n\n"); + } + let _ = write!(out, "{}", format_at_header(line.timestamp)); + current_bucket = Some(bucket); + out.push('\n'); + } else { + out.push('\n'); + } + + let _ = write!(out, "{}: {}", line.role, line.content); } - Ok(false) + out.trim_end().to_owned() } -async fn enqueue_episode_creation_jobs( - finalized_spans: &[EpisodeSpan], - episode_creation_storage: &PostgresStorage, -) -> Result<(), AppError> { - if finalized_spans.is_empty() { - return Ok(()); +fn build_time_anchor_candidates(lines: &[RenderedEpisodeLine]) -> Vec { + lines + .iter() + .map(|line| 
TimeAnchorCandidateLine { + line_index: line.line_index, + timestamp: line.timestamp, + role: line.role.clone(), + content: line.content.clone(), + }) + .collect() +} + +fn build_time_anchor_user_content(candidates: &[TimeAnchorCandidateLine]) -> String { + let mut out = String::from( + "Candidate lines for optional time anchoring.\nUse the provided `spoken_at` timestamp as reference when resolving relative time phrases.\n", + ); + + for candidate in candidates { + let _ = writeln!(out, "\nline_index={}", candidate.line_index); + let _ = writeln!( + out, + "spoken_at={}", + candidate.timestamp.format("%Y-%m-%dT%H:%M:%SZ") + ); + let _ = writeln!(out, "role={}", candidate.role); + let _ = writeln!(out, "content={}", candidate.content); } - let jobs = finalized_spans + out +} + +fn format_messages(messages: &[Message]) -> String { + messages .iter() - .map(EpisodeCreationJob::from_span) - .collect::>(); + .enumerate() + .map(|(index, message)| { + format!( + "Message {} [{}] {}: {}", + index + 1, + message.timestamp.format("%Y-%m-%dT%H:%M:%SZ"), + message.role, + message.content + ) + }) + .collect::>() + .join("\n") +} - let mut storage = episode_creation_storage.clone(); - storage.push_bulk(jobs).await?; +fn collapse_inline_whitespace(text: &str) -> String { + text.split_whitespace().collect::>().join(" ") +} - Ok(()) +fn month_abbrev(month: u32) -> &'static str { + match month { + 1 => "Jan", + 2 => "Feb", + 3 => "Mar", + 4 => "Apr", + 5 => "May", + 6 => "Jun", + 7 => "Jul", + 8 => "Aug", + 9 => "Sep", + 10 => "Oct", + 11 => "Nov", + 12 => "Dec", + _ => "Unknown", + } +} + +fn format_at_header(timestamp: DateTime) -> String { + let hour = timestamp.hour(); + let hour_12 = match hour % 12 { + 0 => 12, + value => value, + }; + let meridiem = if hour < 12 { "AM" } else { "PM" }; + format!( + "Spoken At: {} {}, {} {} {}", + month_abbrev(timestamp.month()), + timestamp.day(), + timestamp.year(), + hour_12, + meridiem + ) +} + +fn insertion_already_applied(content: 
&str, exact_text: &str) -> bool { + content.contains(&format!("{exact_text} (")) +} + +fn normalize_anchor_text(text: &str) -> String { + text + .split_whitespace() + .collect::>() + .join(" ") + .to_ascii_lowercase() +} + +fn looks_like_grounded_calendar_info(text: &str) -> bool { + text.chars().any(|c| c.is_ascii_digit()) +} + +fn apply_insertion(content: &mut String, exact_text: &str, anchor_text: &str) -> bool { + if insertion_already_applied(content, exact_text) { + return false; + } + + let matches = content.match_indices(exact_text).collect::>(); + if matches.len() != 1 { + return false; + } + + let insert_at = matches[0].0 + exact_text.len(); + content.insert_str(insert_at, &format!(" ({anchor_text})")); + true +} + +fn is_valid_time_anchor_insertion( + insertion: &TimeAnchorInsertion, + candidate: &TimeAnchorCandidateLine, +) -> bool { + let _ = insertion.precision; + if insertion.exact_text.trim().is_empty() || insertion.anchor_text.trim().is_empty() { + return false; + } + if insertion.anchor_text.contains('(') || insertion.anchor_text.contains(')') { + return false; + } + if !candidate.content.contains(&insertion.exact_text) { + return false; + } + if normalize_anchor_text(&insertion.exact_text) == normalize_anchor_text(&insertion.anchor_text) { + return false; + } + if !looks_like_grounded_calendar_info(&insertion.anchor_text) { + return false; + } + true +} + +#[cfg(test)] +mod tests { + use chrono::TimeZone; + use plastmem_shared::MessageRole; + + use super::*; + + fn make_messages(count: usize) -> Vec { + (0..count) + .map(|index| Message { + role: MessageRole::from("user"), + content: format!("message {index}"), + timestamp: Utc.timestamp_opt(index as i64, 0).unwrap(), + }) + .collect() + } + + #[test] + fn maps_event_segments_to_contiguous_messages() { + let messages = make_messages(5); + let events = messages.iter().map(message_to_event).collect::>(); + let segments = vec![ + EventSegment::new(events[..2].to_vec(), 
EventSegmentReason::InitialSegment), + EventSegment::new(events[2..].to_vec(), EventSegmentReason::TopicShift), + ]; + + let batch_segments = event_segments_to_batch_segments(&messages, &segments).unwrap(); + + assert_eq!(batch_segments.len(), 2); + assert_eq!(batch_segments[0].messages.len(), 2); + assert_eq!(batch_segments[1].messages.len(), 3); + } + + #[test] + fn hard_time_gap_maps_to_high_surprise() { + let segment = EventSegment::new(Vec::new(), EventSegmentReason::HardTimeGap); + assert_eq!(surprise_signal(&segment), 0.9); + } } diff --git a/crates/worker/src/jobs/mod.rs b/crates/worker/src/jobs/mod.rs index 0b973fa..127786e 100644 --- a/crates/worker/src/jobs/mod.rs +++ b/crates/worker/src/jobs/mod.rs @@ -1,9 +1,6 @@ mod event_segmentation; pub use event_segmentation::*; -mod episode_creation; -pub use episode_creation::*; - mod memory_review; pub use memory_review::*; diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 987007f..3fc103e 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -10,19 +10,16 @@ use plastmem_shared::AppError; use sea_orm::DatabaseConnection; pub mod jobs; -pub use jobs::EpisodeCreationJob; pub use jobs::EventSegmentationJob; pub use jobs::MemoryReviewJob; pub use jobs::PredictCalibrateJob; use jobs::{ - WorkerError, process_episode_creation, process_event_segmentation, process_memory_review, - process_predict_calibrate, + WorkerError, process_event_segmentation, process_memory_review, process_predict_calibrate, }; pub async fn worker( db: &DatabaseConnection, segmentation_backend: PostgresStorage, - episode_creation_backend: PostgresStorage, review_backend: PostgresStorage, semantic_backend: PostgresStorage, ) -> Result<(), AppError> { @@ -32,49 +29,25 @@ pub async fn worker( .register({ let db = db.clone(); let segmentation_backend = segmentation_backend.clone(); - let episode_creation_backend = episode_creation_backend.clone(); let review_backend = review_backend.clone(); + let 
semantic_backend = semantic_backend.clone(); move |_run_id| { WorkerBuilder::new("event-segmentation") .backend(segmentation_backend.clone()) .concurrency(1) .enable_tracing() .data(db.clone()) - .data(segmentation_backend.clone()) - .data(episode_creation_backend.clone()) .data(review_backend.clone()) + .data(semantic_backend.clone()) .build( - move |job, data, segmentation_storage, episode_creation_storage, review_storage| async move { - process_event_segmentation( - job, - data, - segmentation_storage, - episode_creation_storage, - review_storage, - ) - .await - .map_err(WorkerError::from) + move |job, data, review_storage, semantic_storage| async move { + process_event_segmentation(job, data, review_storage, semantic_storage) + .await + .map_err(WorkerError::from) }, ) } }) - .register({ - let db = db.clone(); - let semantic_backend = semantic_backend.clone(); - move |_run_id| { - WorkerBuilder::new("episode-creation") - .backend(episode_creation_backend.clone()) - .concurrency(APP_ENV.predict_calibrate_concurrency) - .enable_tracing() - .data(db.clone()) - .data(semantic_backend.clone()) - .build(move |job, data, predict_storage| async move { - process_episode_creation(job, data, predict_storage) - .await - .map_err(WorkerError::from) - }) - } - }) .register({ let db = db.clone(); move |_run_id| { diff --git a/eslint.config.js b/eslint.config.js index bf92fd5..c7451cc 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -5,6 +5,7 @@ export default defineConfig({ react: true, }).append({ rules: { + 'sonarjs/publicly-writable-directories': 'off', 'toml/padding-line-between-pairs': 'off', }, }).append({ diff --git a/skills/locomo-segmentation-debugger/SKILL.md b/skills/locomo-segmentation-debugger/SKILL.md new file mode 100644 index 0000000..09a2ca5 --- /dev/null +++ b/skills/locomo-segmentation-debugger/SKILL.md @@ -0,0 +1,48 @@ +--- +name: locomo-segmentation-debugger +description: Run and inspect the LoCoMo event segmentation debugger in 
`crates/event_segmentation/examples/locomo_segmenter.rs`. Use when debugging `EventSegmenter`, checking segment boundaries on LoCoMo samples, inspecting the emitted event-segment JSON array, slicing or rewriting derived JSON through shell pipes or a REPL, focusing on `conv-47`, or sweeping every sample to judge overall behavior instead of a single conversation. +--- + +# LoCoMo Segmentation Debugger + +Use `crates/event_segmentation/examples/locomo_segmenter.rs` as the primary debugger for segmentation work. It emits exactly one `Vec` JSON document on `stdout`; warnings and errors go to `stderr`. + +## Environment Note + +`locomo_segmenter` calls the embedding backend through `plastmem_ai::embed_many`. In Codex's sandbox, requests to the local provider (for example `http://localhost:11434`) may fail even when the service is healthy on the host. If the run errors at the embedding request step, re-run with escalated permissions instead of assuming the segmenter itself is broken. + +## Workflow + +1. Start with a targeted run on `conv-47` when iterating on segmentation logic. It is a strong long-context sample and is usually worth checking first. +2. If the question is about event flattening rather than boundary decisions, add `--print-events` and inspect `stderr`. +3. If the question is about JSON shape or a suspicious segment, save `stdout` to a file and inspect or transform it with the commands in [references/commands.md](references/commands.md). +4. If a change looks good on `conv-47`, run every sample before concluding anything about quality. Single-sample wins are not enough. + +## Rules + +- Treat `stdout` as machine-readable output only. Do not parse diagnostics from it. +- Treat `stderr` as human diagnostics only. Expect sample metadata, flattened events, warnings, and errors there. +- If embedding requests fail inside the sandbox, request escalation and retry before judging segmentation quality. +- Prefer `conv-47` for focused debugging. 
+- Prefer all samples for regressions, distribution shifts, or “is this actually better?” questions. +- Use pipe or REPL edits only on derived JSON files. The debugger itself reads LoCoMo input and produces fresh segment JSON; it does not consume edited segment JSON back in. + +## Quick Use + +Use these common entry points: + +```bash +cargo run -q -p plastmem_event_segmentation --example locomo_segmenter -- --sample-id conv-47 > /tmp/conv-47.segments.json +``` + +```bash +cargo run -q -p plastmem_event_segmentation --example locomo_segmenter -- --sample-id conv-47 --print-events > /tmp/conv-47.segments.json +``` + +Then load [references/commands.md](references/commands.md) for: + +- single-sample commands +- all-sample sweeps +- `node` pipe filters for partial JSON inspection +- `node` REPL snippets for ad hoc mutation or extraction +- pretty-print helpers when `jq` is unavailable diff --git a/skills/locomo-segmentation-debugger/agents/openai.yaml b/skills/locomo-segmentation-debugger/agents/openai.yaml new file mode 100644 index 0000000..180c39d --- /dev/null +++ b/skills/locomo-segmentation-debugger/agents/openai.yaml @@ -0,0 +1,4 @@ +interface: + display_name: LoCoMo Segmentation Debugger + short_description: Use the locomo_segmenter debugger and inspect segment JSON. + default_prompt: 'Use the LoCoMo event segmentation debugger, inspect JSON output, and compare conv-47 against broader sample runs when needed.' diff --git a/skills/locomo-segmentation-debugger/references/commands.md b/skills/locomo-segmentation-debugger/references/commands.md new file mode 100644 index 0000000..b893b6a --- /dev/null +++ b/skills/locomo-segmentation-debugger/references/commands.md @@ -0,0 +1,107 @@ +# Commands + +This project environment has `node` and `python3`. Do not assume `jq` is installed. + +`locomo_segmenter` is not a pure local parser. 
It calls the configured embedding API, and in Codex's sandbox that request may fail against a host-local provider such as `http://localhost:11434`. If you see embedding request errors, re-run the `cargo run ... locomo_segmenter ...` command with escalated permissions before diagnosing segmentation behavior. + +## Targeted Run + +Use `conv-47` first for segmentation iteration: + +```bash +cargo run -q -p plastmem_event_segmentation --example locomo_segmenter -- --sample-id conv-47 > /tmp/conv-47.segments.json +``` + +If you need to inspect flattened event construction as well: + +```bash +cargo run -q -p plastmem_event_segmentation --example locomo_segmenter -- --sample-id conv-47 --print-events > /tmp/conv-47.segments.json +``` + +`stdout` becomes `/tmp/conv-47.segments.json`. `stderr` still shows sample metadata and warnings. + +## Pretty Print + +Use Python for a quick pretty-print: + +```bash +python3 -m json.tool /tmp/conv-47.segments.json +``` + +## Pipe Inspection With Node + +Print the segment count: + +```bash +cat /tmp/conv-47.segments.json | node -e 'let s="";process.stdin.on("data",d=>s+=d);process.stdin.on("end",()=>{const x=JSON.parse(s);console.log(x.length)})' +``` + +Print the first segment: + +```bash +cat /tmp/conv-47.segments.json | node -e 'let s="";process.stdin.on("data",d=>s+=d);process.stdin.on("end",()=>{const x=JSON.parse(s);console.log(JSON.stringify(x[0],null,2))})' +``` + +Print only boundary reasons and event counts: + +```bash +cat /tmp/conv-47.segments.json | node -e 'let s="";process.stdin.on("data",d=>s+=d);process.stdin.on("end",()=>{const x=JSON.parse(s).map((seg,i)=>({index:i,reasons:seg.reasons,events:seg.events.length,score:seg.score}));console.log(JSON.stringify(x,null,2))})' +``` + +Print one event window from one segment: + +```bash +cat /tmp/conv-47.segments.json | node -e 'let s="";process.stdin.on("data",d=>s+=d);process.stdin.on("end",()=>{const 
x=JSON.parse(s);console.log(JSON.stringify(x[3].events.slice(0,3),null,2))})'
+```
+
+## REPL Inspection Or Mutation
+
+Use a `node` REPL when you want to poke at the JSON interactively:
+
+```bash
+node
+```
+
+Then:
+
+```js
+const fs = require('node:fs')
+
+const segments = JSON.parse(fs.readFileSync('/tmp/conv-47.segments.json', 'utf8'))
+
+segments.length
+segments[0].reasons
+segments[3].events.slice(0, 2)
+```
+
+Write a derived file after trimming or rewriting fields:
+
+```js
+const trimmed = segments.map(({ events, reasons, score }) => ({ events: events.slice(0, 2), reasons, score }))
+fs.writeFileSync('/tmp/conv-47.trimmed.json', JSON.stringify(trimmed, null, 2))
+```
+
+Use this for analysis artifacts only. Do not treat edited JSON as input to the debugger.
+
+## All-Sample Sweep
+
+If the question is about overall quality or regressions, run every sample instead of only `conv-47`:
+
+```bash
+mkdir -p /tmp/locomo-segments
+node -e "const fs=require('node:fs');const data=JSON.parse(fs.readFileSync('benchmarks/locomo/data/locomo10.json','utf8'));for(const sample of data){console.log(sample.sample_id)}" | while read -r sample_id; do cargo run -q -p plastmem_event_segmentation --example locomo_segmenter -- --sample-id "$sample_id" > "/tmp/locomo-segments/$sample_id.json"; done
+```
+
+After that, inspect counts across all samples:
+
+```bash
+node -e "const fs=require('node:fs');const path='\/tmp\/locomo-segments';for(const name of fs.readdirSync(path).filter(x=>x.endsWith('.json')).sort()){const segments=JSON.parse(fs.readFileSync(path+'/'+name,'utf8'));console.log(name.replace(/\\.json$/,''), segments.length)}"
+```
+
+## When To Choose What
+
+- Use `conv-47` for rapid iteration.
+- Use `--print-events` when the problem may be in sample flattening rather than segmentation.
+- Use saved JSON plus `node` pipe commands when you need one-off slices.
+- Use the `node` REPL when the question is exploratory and you do not yet know which fields matter.
+- Use the all-sample sweep when judging broad quality, regressions, or distribution changes. diff --git a/src/main.rs b/src/main.rs index b048eb0..2b29e3a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,9 +4,7 @@ use apalis_postgres::PostgresStorage; use plastmem_migration::{Migrator, MigratorTrait}; use plastmem_server::server; use plastmem_shared::{APP_ENV, AppError}; -use plastmem_worker::{ - EpisodeCreationJob, EventSegmentationJob, MemoryReviewJob, PredictCalibrateJob, worker, -}; +use plastmem_worker::{EventSegmentationJob, MemoryReviewJob, PredictCalibrateJob, worker}; use sea_orm::Database; use tracing_error::ErrorLayer; #[cfg(debug_assertions)] @@ -46,7 +44,6 @@ async fn main() -> Result<(), AppError> { let pool = db.get_postgres_connection_pool(); PostgresStorage::setup(pool).await?; let segment_job_storage = PostgresStorage::::new(pool); - let episode_creation_job_storage = PostgresStorage::::new(pool); let review_job_storage = PostgresStorage::::new(pool); let semantic_job_storage = PostgresStorage::::new(pool); @@ -54,14 +51,12 @@ async fn main() -> Result<(), AppError> { worker( &db, segment_job_storage.clone(), - episode_creation_job_storage.clone(), review_job_storage.clone(), semantic_job_storage.clone() ), server( db.clone(), segment_job_storage, - episode_creation_job_storage, review_job_storage, semantic_job_storage, #[cfg(debug_assertions)]