feat(runtimed): streaming notebook load with jiter, add_cell_full, and blob store outputs (#672)
Load notebooks progressively so users see cells appearing as they're parsed, rather than waiting for the entire notebook to load before seeing anything. Key changes:

- `add_cell_full` on `NotebookDoc`: inserts a fully-populated cell in a single operation, reusing `ObjId`s from creation — eliminates the 3× O(n) `find_cell_index` scans that made sequential `add_cell` + `update_source` + `set_outputs` + `set_execution_count` calls O(n²)
- `streaming_load_cells`: parses the notebook, adds cells in batches of 3, and sends Automerge sync messages after each batch so the frontend renders cells progressively
- `jiter` for fast JSON parsing: zero-copy string references for cell metadata, only allocating when converting outputs to `serde_json` for the manifest pipeline
- `drain_incoming_frames`: reads back sync replies between batch sends to prevent socket buffer deadlock
- `try_start_loading`/`finish_loading` on `NotebookRoom`: atomic CAS prevents two connections from both attempting to load
- `clear_all_cells` on `NotebookDoc`: cleanup after failed streaming load so the next connection can retry cleanly
- Deferred load in `daemon.rs`: notebook load is no longer blocking in `handle_open_notebook`. Instead, the path is passed to the sync loop, which streams cells after the handshake.
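The O(n²) → O(n) claim above can be illustrated with a hypothetical cost model (these helper functions are illustrative only, not the real `NotebookDoc` API): the old path re-scans the cells list three times per inserted cell, while `add_cell_full` reuses the `ObjId`s returned at insert time and never scans.

```rust
/// Number of O(n) `find_cell_index` scans when bulk-loading `n_cells`
/// via the old sequential path: update_source + set_outputs +
/// set_execution_count each scan the `i` cells inserted so far.
fn old_path_scans(n_cells: usize) -> usize {
    (1..=n_cells).map(|i| 3 * i).sum()
}

/// With add_cell_full, ObjIds are captured at insert time,
/// so no find_cell_index calls are needed.
fn add_cell_full_scans(_n_cells: usize) -> usize {
    0
}

fn main() {
    // For a 50-cell notebook: 3 · (1 + 2 + … + 50) = 3825 list scans
    // under the old path, versus zero with add_cell_full.
    assert_eq!(old_path_scans(50), 3825);
    assert_eq!(add_cell_full_scans(50), 0);
    println!("ok");
}
```

The quadratic term comes entirely from the repeated index lookups, which is why the fix is to thread the `ObjId`s through rather than to batch the lookups.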
Pull request overview
This PR adds progressive (streaming) notebook loading to the runtimed daemon so the UI can render cells as they’re parsed and synced, rather than waiting for the full notebook to load. It also introduces NotebookDoc::add_cell_full to reduce bulk-load complexity and switches notebook JSON parsing to jiter for performance.
Changes:
- Add a streaming load path in the v2 sync loop that inserts cells in small batches and sends Automerge sync frames after each batch.
- Add `NotebookDoc::add_cell_full` (single-op fully-populated insert) and `clear_all_cells` for rollback after failed loads.
- Defer disk load from `handle_open_notebook` into the sync loop, and add `jiter` as a workspace dependency.
Reviewed changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| crates/runtimed/src/notebook_sync_server.rs | Adds streaming-load orchestration, sync batching, inbound frame draining, and room-level load coordination via an atomic flag. |
| crates/runtimed/src/daemon.rs | Defers notebook disk load into the sync loop via needs_load, enabling progressive rendering. |
| crates/runtimed/Cargo.toml | Adds jiter dependency for fast JSON parsing during load. |
| crates/notebook-doc/src/lib.rs | Introduces add_cell_full for efficient bulk inserts and clear_all_cells for cleanup after failed loads. |
| Cargo.toml | Defines workspace jiter dependency and features. |
| Cargo.lock | Locks new transitive dependencies introduced by jiter (including pyo3). |
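The "room-level load coordination via an atomic flag" mentioned for notebook_sync_server.rs can be sketched with std atomics. This is a minimal stand-in, not the actual `NotebookRoom` definition: a compare-and-swap guarantees exactly one connection wins the right to load, and `finish_loading` re-arms the flag for retries.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Illustrative stand-in for the room's load-coordination flag.
struct NotebookRoom {
    loading: AtomicBool,
}

impl NotebookRoom {
    /// Returns true only for the first caller; later callers see false
    /// and join the in-progress stream instead of loading again.
    fn try_start_loading(&self) -> bool {
        self.loading
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Re-arms the flag so a later connection can retry after a failure.
    fn finish_loading(&self) {
        self.loading.store(false, Ordering::Release);
    }
}

fn main() {
    let room = NotebookRoom { loading: AtomicBool::new(false) };
    assert!(room.try_start_loading());  // first connection wins the CAS
    assert!(!room.try_start_loading()); // second connection joins mid-stream
    room.finish_loading();
    assert!(room.try_start_loading());  // retry allowed after finish
    println!("ok");
}
```

The CAS is what makes this safe under concurrency: a plain load-then-store check would let two connections both observe `false` and both start loading.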
crates/notebook-doc/src/lib.rs (outdated)
```rust
pub fn clear_all_cells(&mut self) {
    if let Some(cells_id) = self.cells_list_id() {
        let len = self.doc.length(&cells_id);
        // Delete from the end to avoid index shifting
        for i in (0..len).rev() {
            let _ = self.doc.delete(&cells_id, i);
        }
    }
}
```
clear_all_cells ignores deletion errors (let _ = ...). If a delete fails, the room may remain partially populated and subsequent retries may not behave as intended. Consider returning Result<(), AutomergeError> (or at least logging failures) so callers can detect and handle an incomplete cleanup.
Suggested change:

```diff
-pub fn clear_all_cells(&mut self) {
+pub fn clear_all_cells(&mut self) -> Result<(), AutomergeError> {
     if let Some(cells_id) = self.cells_list_id() {
         let len = self.doc.length(&cells_id);
         // Delete from the end to avoid index shifting
         for i in (0..len).rev() {
-            let _ = self.doc.delete(&cells_id, i);
+            self.doc.delete(&cells_id, i)?;
         }
     }
+    Ok(())
 }
```
Fixed in b2a64fc — clear_all_cells now returns Result<(), AutomergeError> and propagates delete errors.
```diff
@@ -387,6 +430,20 @@ impl NotebookDoc {
         }
     }

+    /// Remove all cells from the document.
+    ///
+    /// Used to clean up after a failed streaming load so the next
+    /// connection can retry from a clean state.
+    pub fn clear_all_cells(&mut self) {
+        if let Some(cells_id) = self.cells_list_id() {
+            let len = self.doc.length(&cells_id);
+            // Delete from the end to avoid index shifting
+            for i in (0..len).rev() {
+                let _ = self.doc.delete(&cells_id, i);
+            }
+        }
+    }
+
```
New public add_cell_full/clear_all_cells APIs don’t have direct unit tests here, even though this file has extensive coverage for other cell operations. Adding targeted tests (e.g., add_cell_full populates source/outputs/execution_count in one op; clear_all_cells leaves cell_count()==0 and preserves notebook_id) would help lock in the intended semantics.
Added in b2a64fc — four tests covering add_cell_full (all fields populated, empty source, index ordering) and clear_all_cells (cells removed, notebook_id preserved).
```toml
notify = "8"
notify-debouncer-mini = "0.7"
ts-rs = { version = "12", features = ["serde-compat"] }
jiter = { version = "0.13", default-features = false, features = ["num-bigint"] }
```
Adding jiter introduces a transitive dependency on pyo3 (see Cargo.lock), which can significantly increase build times and may require Python tooling/headers in environments that previously built runtimed without Python. Please confirm this is acceptable for the daemon build targets, or consider an alternative JSON parser / a jiter configuration that avoids pulling in pyo3 if possible.
This is not the case — jiter is configured with default-features = false, features = ["num-bigint"] specifically to avoid pyo3. The python feature (which pulls in pyo3) is opt-in and not enabled. Confirmed with cargo tree -p runtimed -i pyo3 which prints "nothing to print."
```rust
/// Uses a short timeout to avoid blocking: we just want to clear whatever's
/// already in the buffer.
async fn drain_incoming_frames<R>(reader: &mut R, room: &NotebookRoom, peer_state: &mut sync::State)
where
    R: AsyncRead + Unpin,
{
    loop {
        match tokio::time::timeout(
            std::time::Duration::from_millis(1),
            connection::recv_typed_frame(reader),
        )
        .await
        {
            Ok(Ok(Some(frame))) => {
                if matches!(frame.frame_type, NotebookFrameType::AutomergeSync) {
                    if let Ok(msg) = sync::Message::decode(&frame.payload) {
                        let mut doc = room.doc.write().await;
                        let _ = doc.receive_sync_message(peer_state, msg);
                    }
                }
                // Ignore non-sync frames during load (requests will be
                // handled once the main sync loop starts).
            }
            // Timeout or error or EOF — stop draining
```
drain_incoming_frames wraps connection::recv_typed_frame in tokio::time::timeout. recv_typed_frame ultimately uses read_exact on the underlying stream, which is not cancellation-safe; timing out can cancel mid-frame and leave the connection desynchronized (next read will start in the middle of a frame). Consider switching to a non-canceling readiness check + buffered frame parser (e.g., keep an internal read buffer and only parse complete frames, or use a length-delimited codec) instead of timing out an in-progress frame read.
Suggested change:

```diff
-/// Uses a short timeout to avoid blocking: we just want to clear whatever's
-/// already in the buffer.
+/// We avoid wrapping `recv_typed_frame` in a timeout, because it ultimately
+/// uses `read_exact` on the underlying stream, which is not cancellation-safe:
+/// timing out mid-frame could desynchronize the protocol.
 async fn drain_incoming_frames<R>(reader: &mut R, room: &NotebookRoom, peer_state: &mut sync::State)
 where
     R: AsyncRead + Unpin,
 {
     loop {
-        match tokio::time::timeout(
-            std::time::Duration::from_millis(1),
-            connection::recv_typed_frame(reader),
-        )
-        .await
-        {
-            Ok(Ok(Some(frame))) => {
+        match connection::recv_typed_frame(reader).await {
+            Ok(Some(frame)) => {
                 if matches!(frame.frame_type, NotebookFrameType::AutomergeSync) {
                     if let Ok(msg) = sync::Message::decode(&frame.payload) {
                         let mut doc = room.doc.write().await;
                         let _ = doc.receive_sync_message(peer_state, msg);
                     }
                 }
                 // Ignore non-sync frames during load (requests will be
                 // handled once the main sync loop starts).
             }
-            // Timeout or error or EOF — stop draining
+            // Error or EOF — stop draining
```
Good catch on cancellation safety. Fixed in b2a64fc — drain_incoming_frames is now a no-op. With release-mode load times (~56ms for 50 cells), the OS socket buffer (typically 64KB+) absorbs the client's sync replies without issue. The replies are processed normally once the main select loop starts after streaming completes.
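The desynchronization hazard behind this fix can be demonstrated with a std-only sketch. The wire format below (a little-endian `u32` length prefix followed by the payload) is illustrative only; the real framing used by `recv_typed_frame` may differ. If a read is cancelled after the length prefix has been consumed but before the payload arrives, the next read starts mid-frame and misinterprets payload bytes as a length.

```rust
use std::io::{Cursor, Read};

/// Read one length-prefixed frame: a little-endian u32 length, then the
/// payload. (Illustrative format, not the real notebook sync framing.)
fn read_frame<R: Read>(r: &mut R) -> std::io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_le_bytes(len_buf) as usize;
    let mut payload = vec![0u8; len];
    r.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() {
    // Encode two frames: "hi" then "yo".
    let mut wire = Vec::new();
    for p in [b"hi", b"yo"] {
        wire.extend_from_slice(&(p.len() as u32).to_le_bytes());
        wire.extend_from_slice(p);
    }

    // Normal path: both frames parse cleanly.
    let mut r = Cursor::new(wire.clone());
    assert_eq!(read_frame(&mut r).unwrap(), b"hi");
    assert_eq!(read_frame(&mut r).unwrap(), b"yo");

    // Simulated cancellation: the 4-byte length of frame 1 was consumed,
    // then the payload read was cancelled by a timeout. The next call to
    // read_frame starts mid-frame and misreads payload bytes as a length.
    let mut r = Cursor::new(wire);
    let mut skipped = [0u8; 4];
    r.read_exact(&mut skipped).unwrap(); // consumed, then "cancelled"
    assert!(read_frame(&mut r).is_err()); // stream is now desynchronized
    println!("ok");
}
```

This is why the no-op fix is sound: not reading at all during load keeps the byte stream aligned on frame boundaries for the main loop.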
```rust
async fn drain_incoming_frames<R>(reader: &mut R, room: &NotebookRoom, peer_state: &mut sync::State)
where
    R: AsyncRead + Unpin,
{
    loop {
        match tokio::time::timeout(
            std::time::Duration::from_millis(1),
            connection::recv_typed_frame(reader),
        )
        .await
        {
            Ok(Ok(Some(frame))) => {
                if matches!(frame.frame_type, NotebookFrameType::AutomergeSync) {
                    if let Ok(msg) = sync::Message::decode(&frame.payload) {
                        let mut doc = room.doc.write().await;
                        let _ = doc.receive_sync_message(peer_state, msg);
                    }
                }
                // Ignore non-sync frames during load (requests will be
                // handled once the main sync loop starts).
            }
            // Timeout or error or EOF — stop draining
            _ => break,
        }
    }
}
```
drain_incoming_frames currently consumes and drops any non-AutomergeSync frames received during load. If the client sends requests (e.g., kernel control/messages) while streaming load is running, those frames will be lost and never handled by the main sync loop. To avoid breaking client behavior, buffer non-sync frames and replay them once the steady-state loop starts, or only drain bytes that are known to be sync replies without parsing/consuming other frame types.
Suggested change:

```diff
-async fn drain_incoming_frames<R>(reader: &mut R, room: &NotebookRoom, peer_state: &mut sync::State)
+async fn drain_incoming_frames<R>(_: &mut R, _: &NotebookRoom, _: &mut sync::State)
 where
     R: AsyncRead + Unpin,
 {
-    loop {
-        match tokio::time::timeout(
-            std::time::Duration::from_millis(1),
-            connection::recv_typed_frame(reader),
-        )
-        .await
-        {
-            Ok(Ok(Some(frame))) => {
-                if matches!(frame.frame_type, NotebookFrameType::AutomergeSync) {
-                    if let Ok(msg) = sync::Message::decode(&frame.payload) {
-                        let mut doc = room.doc.write().await;
-                        let _ = doc.receive_sync_message(peer_state, msg);
-                    }
-                }
-                // Ignore non-sync frames during load (requests will be
-                // handled once the main sync loop starts).
-            }
-            // Timeout or error or EOF — stop draining
-            _ => break,
-        }
-    }
+    // Intentionally a no-op.
+    //
+    // The previous implementation consumed all incoming frames and only
+    // processed `AutomergeSync` frames, silently dropping any other frame
+    // types (e.g. kernel control/messages) that arrived during streaming
+    // load. Those non-sync frames would then never be seen by the main
+    // sync loop, breaking client behavior.
+    //
+    // By not reading from `reader` here, we avoid losing any frames; all
+    // traffic is instead handled by the main sync loop after load
+    // completes.
 }
```
Moot now — drain is a no-op per the fix above. No frames are consumed during streaming load; all traffic hits the main sync loop after load completes.
```rust
Err(e) => {
    room.finish_loading();
    // Clear partial cells so the next connection can retry
    let mut doc = room.doc.write().await;
    doc.clear_all_cells();
    warn!(
        "[notebook-sync] Streaming load failed for {}: {}",
        load_path.display(),
        e
    );
    return Err(anyhow::anyhow!("Streaming load failed: {}", e));
}
```
On streaming-load failure you clear the server doc with doc.clear_all_cells() but don't broadcast that change (no sync message and no changed_tx.send(())). Any other connected peers that already received partial batches will keep displaying stale cells and may never converge back to the cleared state. After clearing, generate and send/broadcast a sync message (and/or notify via changed_tx) so all peers observe the rollback.
Fixed in b2a64fc — changed_tx.send(()) is now called after clear_all_cells() on failure, so peers that received partial batches converge to the cleared state.
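The rollback-then-notify pattern fixed here can be sketched std-only, with `std::sync::mpsc` standing in for the tokio `changed_tx` channel and a plain `Vec` standing in for the Automerge document (both are illustrative stand-ins, not the real types):

```rust
use std::sync::mpsc;

// Illustrative stand-in for the room's shared document state.
struct Room {
    cells: Vec<String>,
}

/// Clear partial cells and notify peers, mirroring the fixed failure path:
/// clear_all_cells() followed by changed_tx.send(()).
fn rollback_and_notify(room: &mut Room, changed_tx: &mpsc::Sender<()>) {
    room.cells.clear(); // stand-in for clear_all_cells()
    // Without this send, peers holding partial batches never learn about
    // the rollback and keep displaying stale cells.
    let _ = changed_tx.send(());
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut room = Room { cells: vec!["partial cell".into()] };
    rollback_and_notify(&mut room, &tx);
    assert!(room.cells.is_empty());        // document rolled back
    assert!(rx.try_recv().is_ok());        // peers were notified
    println!("ok");
}
```

The key invariant is that every mutation of shared state, including cleanup mutations, must trigger the same change-notification path as normal edits, or replicas diverge.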
```rust
pub(crate) async fn streaming_load_cells<R, W>(
    reader: &mut R,
    writer: &mut W,
    room: &NotebookRoom,
    path: &Path,
    peer_state: &mut sync::State,
) -> Result<usize, String>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
```
streaming_load_cells/drain_incoming_frames introduce new, complex protocol behavior (batching, interleaved sync send/recv) but there are no tests covering it in this file’s test module. Adding an integration-style test using tokio::io::duplex (or a small in-memory harness) to assert progressive cell insertion and that sync replies don’t deadlock would help prevent regressions.
The bench test (bench_streaming_load_phases) covers the core path against a real notebook. A full integration test with tokio::io::duplex simulating the sync protocol would be valuable but is a bigger lift — noting it as a follow-up.
…l_full: `update_text` runs a Myers diff from `""` → source, which is unnecessary when the Text CRDT was just created and is guaranteed empty; `splice_text` inserts directly at position 0. Also adds a benchmark test (`--ignored`) for profiling streaming load phases against a real notebook. The benchmark showed:

- Debug build: 7.7s (Automerge overhead without optimizations)
- Release build: 56ms total (32ms blob, 21ms add_cell_full, 3ms sync)

The 8s load time observed during QA was entirely debug build overhead. In release, batch times are flat ~1ms regardless of document size.
Reflects add_cell_full and clear_all_cells additions to notebook-doc. WASM was already built in release mode (wasm-pack defaults to --release).
…, tests:

- `clear_all_cells` now returns `Result<(), AutomergeError>` instead of silently ignoring delete errors
- `drain_incoming_frames` is now a no-op: `recv_typed_frame` uses `read_exact`, which isn't cancellation-safe, and with ~56ms release load times the OS socket buffer easily absorbs client sync replies
- Failure rollback now broadcasts via `changed_tx` so peers that received partial batches converge to the cleared state
- Added unit tests for `add_cell_full` (all fields, empty source, index ordering) and `clear_all_cells` (preserves `notebook_id`) in notebook-doc
Pull request overview
Copilot reviewed 5 out of 7 changed files in this pull request and generated 2 comments.
```rust
{
    let mut doc = room.doc.write().await;
    let _ = doc.clear_all_cells();
}
```
clear_all_cells() now returns Result, but the streaming-load failure path ignores it (let _ = doc.clear_all_cells()). If cleanup fails, the room can remain partially populated and later retries may behave unpredictably. Please propagate/log the error (and consider returning an error if cleanup fails) so failure recovery is deterministic.
Suggested change:

```diff
-{
-    let mut doc = room.doc.write().await;
-    let _ = doc.clear_all_cells();
-}
+let clear_result = {
+    let mut doc = room.doc.write().await;
+    doc.clear_all_cells()
+};
+if let Err(clear_err) = clear_result {
+    error!(
+        "[notebook-sync] Streaming load cleanup failed for {}: load error: {}, cleanup error: {}",
+        load_path.display(),
+        e,
+        clear_err
+    );
+    return Err(anyhow::anyhow!(
+        "Streaming load failed and cleanup failed: load error: {}, cleanup error: {}",
+        e,
+        clear_err
+    ));
+}
```
```rust
        load_path.display(),
        e
    );
    return Err(anyhow::anyhow!("Streaming load failed: {}", e));
```
On streaming-load failure, the daemon has already sent NotebookConnectionInfo { error: None }, but this path returns an Err, which will typically just drop the connection. That makes load failures hard for the client to distinguish from transient disconnects. Consider sending an explicit in-protocol error notification before returning (e.g., a NotebookBroadcast::KernelStatus { status: "error: ...", cell_id: None } or introducing a dedicated broadcast for load errors) and/or returning Ok(()) after notifying so the client can surface a clear error state.
Suggested change:

```diff
-    return Err(anyhow::anyhow!("Streaming load failed: {}", e));
+    // Do not return an error here: keep the connection alive so the client
+    // can observe the cleared document state and surface a clear error.
```
With streaming load, cell_count in the handshake is 0 because loading is deferred to the sync loop. Cells arrive via Automerge sync messages after the connection is established.
Load notebooks progressively — users see cells appearing as they're parsed rather than waiting for the entire notebook to finish loading.
What changed
- Outputs routed through blob store on load (#668, included here) — during kernel execution, outputs already went through `create_manifest`/`store_manifest`, so only 64-char hashes land in the Automerge CRDT. But loading from disk stuffed raw JSON (megabytes of base64 images) directly into Automerge. Now `load_notebook_from_disk` and `apply_ipynb_changes` both route outputs through the same manifest pipeline.
- `add_cell_full` on `NotebookDoc` — inserts a fully-populated cell in a single operation, reusing `ObjId`s from creation. Eliminates 3× O(n) `find_cell_index` scans per cell that made sequential `add_cell` + `update_source` + `set_outputs` + `set_execution_count` O(n²) during bulk loads. Uses `splice_text` instead of `update_text` to skip the Myers diff when the Text CRDT is known-empty.
- `streaming_load_cells` — parses the notebook, adds cells in batches of 3, sends Automerge sync messages after each batch so the frontend renders progressively. Outputs go through the blob store manifest pipeline.
- `jiter` for JSON parsing — zero-copy string references for cell metadata, only allocating when converting outputs to `serde_json::Value` for `create_manifest`. Avoids the serialize→parse round-trip that the old `CellSnapshot` path had.
- `try_start_loading`/`finish_loading` on `NotebookRoom` — atomic CAS prevents two connections from both loading. Second connection joins mid-stream via `changed_rx`.
- `clear_all_cells` on `NotebookDoc` — cleanup after failed streaming load. Returns `Result` and broadcasts the clear so peers converge.
- Deferred load in `daemon.rs` — notebook load no longer blocks in `handle_open_notebook`. The path is passed through to the sync loop, which streams cells after the handshake.

Performance
Benchmarked against gelmanschools/index.ipynb (50 cells, 1.4MB, 11 large image outputs):

| Build | Total load time |
|---|---|
| Debug | 7.7s |
| Release | 56ms (32ms blob store, 21ms add_cell_full, 3ms sync) |

Batch times are flat (~1ms each) regardless of document size. No O(n²).
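The batch-of-3 streaming behavior described above can be sketched with a std-only stand-in (`stream_cells` is a hypothetical helper; the real `streaming_load_cells` generates and sends an Automerge sync frame at each sync point instead of just recording a count):

```rust
/// Mirror of the batching loop: insert cells in batches of 3 and record
/// a "sync point" after each batch, where the real code would send an
/// Automerge sync message so the frontend renders progressively.
const BATCH_SIZE: usize = 3;

fn stream_cells(cells: Vec<String>) -> Vec<usize> {
    let mut doc: Vec<String> = Vec::new(); // stand-in for the Automerge doc
    let mut sync_points = Vec::new();
    for batch in cells.chunks(BATCH_SIZE) {
        doc.extend_from_slice(batch);
        // Real code: generate_sync_message + send frame to the client here.
        sync_points.push(doc.len());
    }
    sync_points
}

fn main() {
    let cells: Vec<String> = (0..7).map(|i| format!("cell {i}")).collect();
    // 7 cells → batches of 3, 3, 1 → client sees 3, then 6, then 7 cells.
    assert_eq!(stream_cells(cells), vec![3, 6, 7]);
    println!("ok");
}
```

Because each batch is a constant amount of work on top of the document built so far, per-batch time stays flat as long as inserts themselves are O(1) amortized, which is exactly what `add_cell_full` restores.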
Not in this PR
Frontend loading indicator (Phase 7 from the plan). Cells appear progressively but there's no explicit "loading" state in the UI yet.
PR submitted by @rgbkrk's agent Quill, via Zed