refactor(worker): use event segmenter instead of queue #67
Conversation
Code Review
This pull request introduces a significant refactor of the message processing and event segmentation pipeline. It consolidates message queuing, pending review management, and episode creation into a new MessageQueue and a revised EventSegmentationJob worker, removing the older, more granular job types. The changes update API endpoints and worker configurations to align with the new architecture. Key feedback includes a high-severity concern about potential data loss for pending reviews if enqueuing fails, and medium-severity issues regarding the loss or underutilization of prev_episode_content during segmentation, which could impact context continuity. A compatibility warning was also noted for a PostgreSQL 14-specific jsonpath syntax.
    "Segmentation claim has no messages"
)));
}
enqueue_pending_reviews(conversation_id, batch_messages, db, &review_storage).await?;
enqueue_pending_reviews calls take_pending_reviews, which deletes the reviews from the database in its own transaction. If the subsequent review_storage.push fails, those reviews are gone from the database and were never enqueued in Apalis, so they are lost entirely. Consider clearing the reviews from the database only after they have been successfully enqueued, or, ideally, within the same transaction as the episode persistence.
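A minimal sketch of the suggested ordering, using in-memory stand-ins (`Db`, `ReviewQueue` and their fields are illustrative, not the PR's actual SeaORM/Apalis types): read the pending reviews first, push them, and delete them only once every push has succeeded.

```rust
use std::collections::VecDeque;

// Stand-in for the database table holding pending reviews.
struct Db {
    pending_reviews: Vec<String>,
}

// Stand-in for the Apalis review storage; `fail_next` simulates an outage.
struct ReviewQueue {
    jobs: VecDeque<String>,
    fail_next: bool,
}

impl ReviewQueue {
    fn push(&mut self, review: String) -> Result<(), String> {
        if self.fail_next {
            return Err("queue unavailable".into());
        }
        self.jobs.push_back(review);
        Ok(())
    }
}

/// Clear-after-enqueue: if a push fails, the reviews are still in the DB
/// and a retry can pick them up, instead of being silently dropped.
fn enqueue_pending_reviews(db: &mut Db, queue: &mut ReviewQueue) -> Result<(), String> {
    // Read, but do not delete, the pending reviews.
    let reviews = db.pending_reviews.clone();
    for r in &reviews {
        queue.push(r.clone())?; // on error, DB rows remain intact
    }
    // Only after every push succeeded is it safe to clear them.
    db.pending_reviews.clear();
    Ok(())
}
```

The same guarantee can be made stronger by doing the delete inside the episode-persistence transaction, as the comment suggests, so a crash between push and delete cannot duplicate or lose work.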
    actual = current_messages.len(),
    "Stale event segmentation job; clearing fence"
);
MessageQueue::finalize_job(conversation_id, None, db).await?;
Using finalize_job with None here will overwrite any existing prev_episode_content in the database. Since this is a stale job recovery case where no new content was generated, it's better to use clear_fence to preserve the existing context.
Suggested change:
- MessageQueue::finalize_job(conversation_id, None, db).await?;
+ MessageQueue::clear_fence(conversation_id, db).await?;
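The difference between the two calls can be sketched with an in-memory stand-in for the message_queue row (the function names mirror the PR's MessageQueue API, but the bodies here are illustrative): finalize_job writes whatever prev content it is given, so passing None destroys existing context, while clear_fence only releases the fence.

```rust
// Stand-in for a message_queue row.
struct QueueRow {
    fence: Option<u64>,
    prev_episode_content: Option<String>,
}

// Writes the new prev content unconditionally: None overwrites any
// existing context stored on the row.
fn finalize_job(row: &mut QueueRow, new_prev: Option<String>) {
    row.fence = None;
    row.prev_episode_content = new_prev;
}

// Releases the fence only; prev_episode_content is left untouched,
// which is what stale-job recovery wants.
fn clear_fence(row: &mut QueueRow) {
    row.fence = None;
}
```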
    queue_len = current_messages.len(),
    "Force processing as single episode"
);
(&segments[..], None)
When forcing a single segment, new_prev_content is set to None, which causes the context for the next segmentation batch to be lost. It should be set to the content of the segment just processed to maintain continuity.
Suggested change:
- (&segments[..], None)
+ (&segments[..], segments.last().map(|s| s.content.clone()))
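The behavior the suggestion restores can be isolated as a small helper (the Segment type is a stand-in for the PR's BatchSegment): the content of the last processed segment becomes the prev content for the next segmentation batch, instead of being dropped.

```rust
// Stand-in for the PR's BatchSegment.
struct Segment {
    content: String,
}

// Carries the last processed segment's content forward so the next
// batch still has context; returns None only when nothing was processed.
fn next_prev_content(segments: &[Segment]) -> Option<String> {
    segments.last().map(|s| s.content.clone())
}
```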
    keep_tail_segment = job.keep_tail_segment,
    "Event segmentation complete"
);
(&segments[..], None)
async fn batch_segment(messages: &[Message]) -> Result<Vec<BatchSegment>, AppError> {
    let events = messages.iter().map(message_to_event).collect::<Vec<_>>();
    let event_segments = EventSegmenter::segment(&events).await?;
C: ConnectionTrait,
{
    let sql = format!(
        "UPDATE message_queue SET messages = jsonb_path_query_array(messages, '$[{count} to last]'::jsonpath) WHERE id = $1"
The jsonpath subscript-range syntax used here ('$[{count} to last]') was flagged in the review summary as PostgreSQL 14-specific. Verify that it works on the minimum PostgreSQL version the project supports, or use a more portable construction.
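If broader PostgreSQL compatibility turns out to be needed, one illustrative alternative (an assumption, not taken from the PR) rebuilds the array from element N onward with jsonb_array_elements WITH ORDINALITY, which predates the flagged syntax. Here $1 is the row id and $2 the number of leading messages to drop, both passed as bind parameters rather than interpolated with format!:

```sql
UPDATE message_queue
SET messages = COALESCE(
    (SELECT jsonb_agg(elem ORDER BY ord)
     FROM jsonb_array_elements(messages) WITH ORDINALITY AS t(elem, ord)
     WHERE ord > $2),
    '[]'::jsonb)
WHERE id = $1;
```

The COALESCE keeps the column a valid empty array when every message is consumed, since jsonb_agg over zero rows yields NULL.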
[Benchmark comparison widget: Before/After results for plast-mem and full-context (Overall, Samples, Categories, Overall Delta, Category Delta); figures not captured in this export.]