Skip to content

Commit 81b73cf

Browse files
authored
feat: unified frame pipe — single channel, WASM demux, shared frame type constants (#721)
* feat: unified frame pipe — single channel, single event, single command Replaces the multi-channel relay architecture with one pipe: Before: 2 channels (sync_tx, broadcast_tx), 2 relay tasks, 2 Tauri events After: 1 channel (frame_tx), 1 relay task, 1 event (daemon:frame) PipeChannel { frame_tx } replaces the old raw_sync_tx parameter. All frame types (AutomergeSync, Broadcast, Presence) flow through one channel preserving daemon-sent order. Response frames are still consumed by the request/response cycle. New send_frame Tauri command handles outgoing frames by type byte. Old send_automerge_sync kept for backward compat during migration. Frame type constants moved to notebook-doc::frame_types so all consumers (daemon, WASM, Tauri, Python) share one source of truth. WASM gains receive_frame() method that demuxes by frame type byte, applies sync internally, and returns typed FrameEvent JSON for the frontend — all protocol knowledge lives in Rust. Pending frame buffering during request/response now prepends the type byte so frames pipe correctly after the response arrives. * feat(frontend): unified daemon:frame listener with WASM demux Replaces the automerge:from-daemon listener with daemon:frame which receives all frame types through the unified pipe. The WASM handle.receive_frame() demuxes by frame type byte, applies sync internally, and returns typed FrameEvent JSON. Broadcasts are re-emitted as daemon:broadcast for backward compat with useDaemonKernel and useEnvProgress. Presence events re-emitted as daemon:presence for usePresence. Sync replies from receive_frame (SyncReply events) are sent back to the daemon via send_frame with the type byte prepended. frame-types.ts provides shared constants matching notebook_doc::frame_types in Rust. WASM artifacts rebuilt with receive_frame() and presence exports. * fix: address Copilot review — buffer all frame types, rename methods, update docs - Buffer Broadcast and Presence frames during wait_for_response_with_broadcast as typed frame bytes (type byte + payload) in pending_sync_frames, so they reach the frontend via the unified pipe after the response arrives. Previously only AutomergeSync was buffered; Broadcast went through a separate broadcast_tx and Presence was dropped. - Rename connect_split_with_raw_sync → connect_split_with_pipe (Unix + Windows) - Update setup_sync_receivers doc comment to describe unified relay - Update receive_frame doc comment to clarify SyncReply wire format - Remove unnecessary braces around process_incoming_frame block * fix: double type prefix bug + rename pending_sync_frames → pending_pipe_frames pending_sync_frames now stores fully-typed frames (type byte + payload) for AutomergeSync, Broadcast, and Presence. The drain loop was prepending another AutomergeSync type byte, corrupting frames and mislabeling non-sync frames. Fix: drain loop forwards as-is. Rename to pending_pipe_frames to reflect that it buffers all frame types, not just sync. * build: rebuild WASM artifacts with receive_frame and presence exports * feat(presence): add usePresence hook, remove send_automerge_sync - Create usePresence.ts: listens for daemon:presence events, maintains remote peer state (cursors/selections), exposes setCursor/setSelection for outgoing presence via send_frame - Migrate remaining send_automerge_sync calls to send_frame in save() and notebook-metadata syncToRelay() - Remove dead send_automerge_sync Tauri command (frontend fully migrated) - Update doc comments to reference send_frame * refactor: rename daemon:frame/broadcast/presence to notebook: prefix Room-scoped events belong to the notebook: namespace. The daemon: prefix is reserved for connection lifecycle events (daemon:ready, daemon:disconnected). - daemon:frame → notebook:frame - daemon:broadcast → notebook:broadcast - daemon:presence → notebook:presence * fix: address copilot review — catch on unlisten, stale comment - Add .catch(() => {}) to unlisten cleanup calls per repo pattern - Update stale automerge:from-daemon reference to notebook:frame - WASM artifacts still have daemon:frame in docs — will fix on next wasm-pack rebuild * build: rebuild WASM artifacts with notebook:frame doc comments
1 parent 08dfcf5 commit 81b73cf

File tree

16 files changed

+970
-403
lines changed

16 files changed

+970
-403
lines changed

apps/notebook/src/hooks/useAutomergeNotebook.ts

Lines changed: 86 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
save as saveDialog,
66
} from "@tauri-apps/plugin-dialog";
77
import { useCallback, useEffect, useRef, useState } from "react";
8+
import { frame_types } from "../lib/frame-types";
89
import { logger } from "../lib/logger";
910
import {
1011
type CellSnapshot,
@@ -100,8 +101,12 @@ export function useAutomergeNotebook() {
100101
const syncToRelay = useCallback((handle: NotebookHandle) => {
101102
const msg = handle.generate_sync_message();
102103
if (msg) {
103-
invoke("send_automerge_sync", {
104-
syncMessage: Array.from(msg),
104+
// Prepend frame type byte for the unified send_frame command
105+
const frameData = new Uint8Array(1 + msg.length);
106+
frameData[0] = frame_types.AUTOMERGE_SYNC;
107+
frameData.set(msg, 1);
108+
invoke("send_frame", {
109+
frameData: Array.from(frameData),
105110
}).catch((e: unknown) =>
106111
logger.warn("[automerge-notebook] sync to relay failed:", e),
107112
);
@@ -137,7 +142,7 @@ export function useAutomergeNotebook() {
137142
* Any IPC failures are logged and do not cause `bootstrap()` to reject.
138143
*
139144
* Loading state is set to `true` here and is cleared when the first
140-
* `automerge:from-daemon` message is received, regardless of its
145+
* `notebook:frame` sync message is received, regardless of its
141146
* `changed` flag.
142147
*/
143148
const bootstrap = useCallback(async () => {
@@ -205,36 +210,82 @@ export function useAutomergeNotebook() {
205210
},
206211
);
207212

208-
// ── Incoming Automerge sync from daemon (via Tauri relay) ────────
209-
const unlistenSync = webview.listen<number[]>(
210-
"automerge:from-daemon",
213+
// ── Incoming frames from daemon (unified pipe) ──────────────────
214+
//
215+
// All frame types (AutomergeSync, Broadcast, Presence) arrive through
216+
// one event. The WASM handle.receive_frame() demuxes by the first byte,
217+
// applies sync internally, and returns typed FrameEvent JSON.
218+
//
219+
// Broadcasts are re-emitted as "notebook:broadcast" for backward compat
220+
// with useDaemonKernel and useEnvProgress (they listen independently).
221+
const unlistenFrame = webview.listen<number[]>(
222+
"notebook:frame",
211223
async (event) => {
212224
if (cancelled) return;
213225
const handle = handleRef.current;
214226
if (!handle) return;
215227
try {
216228
const bytes = new Uint8Array(event.payload);
217-
const changed = handle.receive_sync_message(bytes);
218-
if (awaitingInitialSyncRef.current) {
219-
awaitingInitialSyncRef.current = false;
220-
setIsLoading(false);
221-
}
222-
if (changed) {
223-
await materializeCells(handle);
224-
// Notify metadata subscribers (useSyncExternalStore) that the
225-
// doc changed. This covers metadata updates from the daemon
226-
// (e.g. trust re-signing, dependency sync from other windows).
227-
// Note: local cell mutations (add/delete/source) don't call
228-
// notifyMetadataChanged() because they only touch cells, not
229-
// the metadata key. If a future mutation affects metadata,
230-
// add a notify call there.
231-
notifyMetadataChanged();
229+
const resultJson = handle.receive_frame(bytes);
230+
if (!resultJson) return;
231+
232+
const events: Array<{
233+
type: string;
234+
changed?: boolean;
235+
reply?: number[];
236+
payload?: unknown;
237+
}> = JSON.parse(resultJson);
238+
239+
for (const frameEvent of events) {
240+
switch (frameEvent.type) {
241+
case "sync_applied": {
242+
if (awaitingInitialSyncRef.current) {
243+
awaitingInitialSyncRef.current = false;
244+
setIsLoading(false);
245+
}
246+
if (frameEvent.changed) {
247+
await materializeCells(handle);
248+
notifyMetadataChanged();
249+
}
250+
break;
251+
}
252+
case "sync_reply": {
253+
// WASM generated a sync response — send it back to the daemon
254+
if (frameEvent.reply) {
255+
const replyData = new Uint8Array(1 + frameEvent.reply.length);
256+
replyData[0] = frame_types.AUTOMERGE_SYNC;
257+
replyData.set(new Uint8Array(frameEvent.reply), 1);
258+
invoke("send_frame", {
259+
frameData: Array.from(replyData),
260+
}).catch((e: unknown) =>
261+
logger.warn("[automerge-notebook] sync reply failed:", e),
262+
);
263+
}
264+
break;
265+
}
266+
case "broadcast": {
267+
// Re-emit as "notebook:broadcast" for useDaemonKernel/useEnvProgress
268+
// backward compat. They listen independently and expect JSON payloads.
269+
if (frameEvent.payload) {
270+
webview
271+
.emit("notebook:broadcast", frameEvent.payload)
272+
.catch(() => {});
273+
}
274+
break;
275+
}
276+
case "presence": {
277+
// Re-emit for usePresence hook
278+
if (frameEvent.payload) {
279+
webview
280+
.emit("notebook:presence", frameEvent.payload)
281+
.catch(() => {});
282+
}
283+
break;
284+
}
285+
}
232286
}
233-
// The sync protocol may need multiple roundtrips — always
234-
// check whether we have something to send back.
235-
syncToRelay(handle);
236287
} catch (e) {
237-
logger.warn("[automerge-notebook] receive sync failed:", e);
288+
logger.warn("[automerge-notebook] receive frame failed:", e);
238289
}
239290
},
240291
);
@@ -257,17 +308,17 @@ export function useAutomergeNotebook() {
257308

258309
return () => {
259310
cancelled = true;
260-
unlistenReady.then((fn) => fn());
261-
unlistenFileOpened.then((fn) => fn());
262-
unlistenSync.then((fn) => fn());
263-
unlistenClearOutputs.then((fn) => fn());
311+
unlistenReady.then((fn) => fn()).catch(() => {});
312+
unlistenFileOpened.then((fn) => fn()).catch(() => {});
313+
unlistenFrame.then((fn) => fn()).catch(() => {});
314+
unlistenClearOutputs.then((fn) => fn()).catch(() => {});
264315
// Free WASM handle.
265316
resetNotebookCells();
266317
setNotebookHandle(null);
267318
handleRef.current?.free();
268319
handleRef.current = null;
269320
};
270-
}, [bootstrap, materializeCells, syncToRelay, refreshBlobPort]);
321+
}, [bootstrap, materializeCells, refreshBlobPort]);
271322

272323
// ── Cell mutations ─────────────────────────────────────────────────
273324

@@ -412,8 +463,11 @@ export function useAutomergeNotebook() {
412463
if (handle) {
413464
const msg = handle.generate_sync_message();
414465
if (msg) {
415-
await invoke("send_automerge_sync", {
416-
syncMessage: Array.from(msg),
466+
const frameData = new Uint8Array(1 + msg.length);
467+
frameData[0] = frame_types.AUTOMERGE_SYNC;
468+
frameData.set(msg, 1);
469+
await invoke("send_frame", {
470+
frameData: Array.from(frameData),
417471
});
418472
}
419473
}

apps/notebook/src/hooks/useDaemonKernel.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ export function useDaemonKernel({
146146
refreshBlobPort();
147147

148148
const unlistenBroadcast = webview.listen<DaemonBroadcast>(
149-
"daemon:broadcast",
149+
"notebook:broadcast",
150150
(event) => {
151151
if (cancelled) return;
152152

@@ -377,7 +377,7 @@ export function useDaemonKernel({
377377
}
378378

379379
case "env_progress":
380-
// Handled by useEnvProgress hook's own daemon:broadcast listener
380+
// Handled by useEnvProgress hook's own notebook:broadcast listener
381381
break;
382382

383383
case "env_sync_state": {
@@ -407,7 +407,7 @@ export function useDaemonKernel({
407407

408408
case "file_changed": {
409409
// External file changes detected and merged into Automerge doc.
410-
// The actual cell data comes through `automerge:from-daemon` (Automerge sync relay).
410+
// The actual cell data comes through `notebook:frame` (Automerge sync relay).
411411
// This broadcast is for notification purposes.
412412
const fileBroadcast = broadcast as {
413413
cells: unknown[];

apps/notebook/src/hooks/useEnvProgress.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,10 +212,10 @@ export function useEnvProgress() {
212212
processEvent(event.payload);
213213
});
214214

215-
// Also listen for daemon:broadcast events with env_progress
215+
// Also listen for notebook:broadcast events with env_progress
216216
// (from daemon-managed environment preparation during kernel launch)
217217
const unlistenBroadcast = listen<DaemonBroadcast>(
218-
"daemon:broadcast",
218+
"notebook:broadcast",
219219
(event) => {
220220
const broadcast = event.payload;
221221
if (broadcast.event === "env_progress") {

0 commit comments

Comments
 (0)