Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions crates/runtimed-py/src/async_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,11 @@ impl AsyncSession {
let blob_base_url = state_guard.blob_base_url.clone();
let blob_store_path = state_guard.blob_store_path.clone();

// Confirm the daemon has merged our latest changes before executing.
// The daemon reads cell source from its own Automerge doc, so it must
// have the cell before we can reference it by ID.
handle.confirm_sync().await.map_err(to_py_err)?;

// Execute cell (daemon reads source from automerge doc)
let response = handle
.send_request(NotebookRequest::ExecuteCell {
Expand Down Expand Up @@ -1281,6 +1286,9 @@ impl AsyncSession {
.as_ref()
.ok_or_else(|| to_py_err("Not connected"))?;

// Confirm the daemon has merged our latest changes before executing.
handle.confirm_sync().await.map_err(to_py_err)?;

// Queue the cell for execution
let response = handle
.send_request(NotebookRequest::ExecuteCell {
Expand Down Expand Up @@ -1436,6 +1444,9 @@ impl AsyncSession {
.as_ref()
.ok_or_else(|| to_py_err("Not connected"))?;

// Confirm the daemon has merged our latest changes before executing.
handle.confirm_sync().await.map_err(to_py_err)?;

let response = handle
.send_request(NotebookRequest::ExecuteCell {
cell_id: cell_id.clone(),
Expand Down Expand Up @@ -1502,6 +1513,9 @@ impl AsyncSession {
.as_ref()
.ok_or_else(|| to_py_err("Not connected"))?;

// Confirm the daemon has merged our latest changes before executing.
handle.confirm_sync().await.map_err(to_py_err)?;

// Queue cell execution (daemon reads source from automerge doc)
let response = handle
.send_request(NotebookRequest::ExecuteCell { cell_id })
Expand Down
11 changes: 11 additions & 0 deletions crates/runtimed-py/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,11 @@ impl Session {
let blob_base_url = state.blob_base_url.clone();
let blob_store_path = state.blob_store_path.clone();

// Confirm the daemon has merged our latest changes before executing.
// The daemon reads cell source from its own Automerge doc, so it must
// have the cell before we can reference it by ID.
handle.confirm_sync().await.map_err(to_py_err)?;

// Execute cell (daemon reads source from automerge doc)
let response = handle
.send_request(NotebookRequest::ExecuteCell {
Expand Down Expand Up @@ -1046,6 +1051,9 @@ impl Session {
.as_ref()
.ok_or_else(|| to_py_err("Not connected"))?;

// Confirm the daemon has merged our latest changes before executing.
handle.confirm_sync().await.map_err(to_py_err)?;

// Queue the cell for execution
let response = handle
.send_request(NotebookRequest::ExecuteCell {
Expand Down Expand Up @@ -1180,6 +1188,9 @@ impl Session {
.as_ref()
.ok_or_else(|| to_py_err("Not connected"))?;

// Confirm the daemon has merged our latest changes before executing.
handle.confirm_sync().await.map_err(to_py_err)?;

// Queue cell execution (daemon reads source from automerge doc)
let response = handle
.send_request(NotebookRequest::ExecuteCell {
Expand Down
85 changes: 85 additions & 0 deletions crates/runtimed/src/notebook_sync_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ enum SyncCommand {
message: Vec<u8>,
reply: oneshot::Sender<Result<(), NotebookSyncError>>,
},
/// Confirm that the daemon has merged all our local changes by checking
/// that `peer_state.shared_heads` includes our local heads.
ConfirmSync {
reply: oneshot::Sender<Result<(), NotebookSyncError>>,
},
}

/// Handle for sending commands to the notebook sync task.
Expand Down Expand Up @@ -465,6 +470,30 @@ impl NotebookSyncHandle {
.map_err(|_| NotebookSyncError::ChannelClosed)?
}

/// Best-effort confirmation that the daemon has merged our local changes.
///
/// Used by full-peer programmatic clients (`runtimed-py`), where a
/// `create_cell` → `execute_cell` sequence can fire within microseconds.
/// The Tauri pipe path does not need this — the WASM frontend keeps its
/// own doc locally, and human interaction provides natural sync latency.
///
/// Forwards a `ConfirmSync` command to the sync task, which performs up
/// to 5 sync rounds while checking `peer_state.shared_heads` after each.
/// If the heads still haven't fully converged after that (e.g. concurrent
/// edits from another peer), the task degrades gracefully and reports
/// `Ok(())`, because failing execution is worse than the residual race —
/// after 5 rounds the changes are almost certainly applied.
pub async fn confirm_sync(&self) -> Result<(), NotebookSyncError> {
    let (reply, response) = oneshot::channel();
    // If the sync task is gone, the command channel is closed.
    if self
        .tx
        .send(SyncCommand::ConfirmSync { reply })
        .await
        .is_err()
    {
        return Err(NotebookSyncError::ChannelClosed);
    }
    // A dropped reply sender likewise means the task has shut down.
    match response.await {
        Ok(outcome) => outcome,
        Err(_) => Err(NotebookSyncError::ChannelClosed),
    }
}

/// Send a request to the daemon and wait for a response.
pub async fn send_request(
&self,
Expand Down Expand Up @@ -2098,6 +2127,58 @@ where
Ok(())
}

/// Best-effort confirmation that the daemon has merged our local
/// changes, by checking `peer_state.shared_heads` after sync.
///
/// **Scope:** Full-peer clients (`runtimed-py`) only. The Tauri pipe
/// relay keeps an empty local doc and forwards raw bytes — it has no
/// meaningful heads to confirm. The WASM frontend doesn't need this
/// because human interaction provides natural sync latency.
///
/// **Contract:** Attempts confirmation for a bounded number of rounds
/// (currently 5). If confirmation does not arrive, degrades gracefully
/// and returns `Ok(())`. Failing execution is worse than the residual
/// race — after multiple sync round-trips the changes are almost
/// certainly applied, the heads just haven't converged yet (e.g.
/// concurrent edits from another peer, or slow daemon under load).
///
/// **Why it exists:** In the document-first architecture, the daemon
/// reads cell source from its own Automerge doc when executing. If
/// `create_cell` → `execute_cell` fires faster than the sync
/// round-trip, the daemon won't find the cell. This method closes
/// that gap for programmatic callers.
async fn sync_to_daemon_confirmed(&mut self) -> Result<(), NotebookSyncError> {
// Do the initial sync round
self.sync_to_daemon().await?;

let our_heads = self.doc.get_heads();
if our_heads.is_empty() {
return Ok(()); // Empty doc, nothing to confirm
}
Comment on lines +2150 to +2157
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new confirmation behavior is core to correctness (prevents create_cellexecute_cell races), but it isn’t covered by tests in this file. Adding a unit/integration test that simulates multiple sync rounds and asserts the method blocks until the expected heads are present in peer_state.shared_heads would help prevent regressions.

Copilot uses AI. Check for mistakes.

// The sync protocol may need multiple rounds. Bound the retries
// so we don't loop forever if something is wrong.
for _ in 0..5 {
let shared = &self.peer_state.shared_heads;
Comment on lines +2159 to +2162
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The confirmation retry count (0..5) is a magic number here. Consider extracting it (and possibly a total confirmation timeout) into a named constant/config so it’s easier to reason about and tune alongside sync_to_daemon’s internal 500ms deadline.

Copilot uses AI. Check for mistakes.
if our_heads.iter().all(|h| shared.contains(h)) {
return Ok(()); // Daemon has confirmed all our changes
}
// Not yet confirmed — do another sync round
self.sync_to_daemon().await?;
}

Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retry loop checks shared_heads before doing the next sync round. If confirmation only becomes true on the last sync_to_daemon() call inside the loop, it won’t be re-checked and the function will fall through to the "not fully confirmed" path. Consider restructuring so each retry performs a sync and then re-checks (or add a final check after the loop).

Suggested change
// Final check after the last sync attempt in case confirmation
// only became true on the final round.
let shared = &self.peer_state.shared_heads;
if our_heads.iter().all(|h| shared.contains(h)) {
return Ok(()); // Daemon has confirmed all our changes
}

Copilot uses AI. Check for mistakes.
// Best-effort: even if not fully confirmed after retries, the
// changes are very likely applied. Log and continue rather than
// failing the mutation — a hard error here would be worse than
// the original race.
log::debug!(
"[notebook-sync-client] sync_to_daemon_confirmed: heads not fully confirmed after retries (our_heads={}, shared_heads={})",
our_heads.len(),
self.peer_state.shared_heads.len()
);
Ok(())
Comment on lines +2170 to +2179
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sync_to_daemon_confirmed returns Ok(()) even when the daemon never confirms our heads (after retries it only logs). That undermines the method’s stated goal and can still allow the create_cellexecute_cell race when confirmation doesn’t arrive in time. Consider returning an error (e.g., NotebookSyncError::Timeout/SyncError) or changing the API to return a boolean so callers can decide whether to proceed with execute requests when confirmation fails.

Suggested change
// Best-effort: even if not fully confirmed after retries, the
// changes are very likely applied. Log and continue rather than
// failing the mutation — a hard error here would be worse than
// the original race.
log::debug!(
"[notebook-sync-client] sync_to_daemon_confirmed: heads not fully confirmed after retries (our_heads={}, shared_heads={})",
our_heads.len(),
self.peer_state.shared_heads.len()
);
Ok(())
// We failed to observe the daemon confirming all of our heads within
// the bounded number of sync rounds. This means we cannot uphold the
// contract that this method only returns `Ok(())` once our changes are
// confirmed, so surface this as an error to the caller.
log::debug!(
"[notebook-sync-client] sync_to_daemon_confirmed: heads not fully confirmed after retries (our_heads={}, shared_heads={})",
our_heads.len(),
self.peer_state.shared_heads.len()
);
Err(NotebookSyncError::SyncError(
"sync_to_daemon_confirmed: heads not fully confirmed after retries".into(),
))

Copilot uses AI. Check for mistakes.
}

// ── Request/Response ───────────────────────────────────────────────

/// Send a request to the daemon and wait for the response.
Expand Down Expand Up @@ -2681,6 +2762,10 @@ async fn run_sync_task<S>(
publish_snapshot(&client, &snapshot_tx);
let _ = reply.send(result);
}
SyncCommand::ConfirmSync { reply } => {
let result = client.sync_to_daemon_confirmed().await;
let _ = reply.send(result);
}
SyncCommand::ReceiveFrontendSyncMessage { message, reply } => {
let result = if raw_sync_tx.is_some() {
// Pipe mode (Tauri): queue the sync bytes to be flushed
Expand Down
Loading