
Set notebook progressively#393

Open
davidbrochart wants to merge 8 commits into jupyter-server:main from davidbrochart:aset-transaction

@davidbrochart
Collaborator

@davidbrochart davidbrochart marked this pull request as draft March 13, 2026 13:51
@davidbrochart davidbrochart added the enhancement New feature or request label Mar 13, 2026
        await lowlevel.checkpoint()
        gen = self._set(value)
        while True:
            async with self._ydoc.transaction():
Collaborator Author

The async context manager makes it possible to register async document observer callbacks, so that back-pressure can be applied e.g. to send updates over the wire.
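
A minimal sketch of the back-pressure idea described above. It assumes a hypothetical `ToyDoc` class with an async `transaction()` context manager and is not the pycrdt API; the bounded queue stands in for a slow wire. Because the observers are awaited when the transaction exits, the writer cannot start the next transaction until the previous update has been accepted.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyDoc:
    """Hypothetical stand-in for a shared document; not the pycrdt API."""

    def __init__(self):
        self.changes = []     # changes buffered in the open transaction
        self.observers = []   # async callbacks, awaited on commit

    @asynccontextmanager
    async def transaction(self):
        self.changes = []
        yield self
        update = list(self.changes)
        # Awaiting each observer here is what applies back-pressure:
        # the writer cannot open the next transaction until they return.
        for callback in self.observers:
            await callback(update)


async def main():
    doc = ToyDoc()
    wire = asyncio.Queue(maxsize=1)   # a slow "wire" that holds one update
    received = []

    async def send_over_wire(update):
        await wire.put(update)        # blocks while the queue is full

    async def consumer():
        for _ in range(3):
            await asyncio.sleep(0.01)  # simulate a slow network
            received.append(await wire.get())

    doc.observers.append(send_over_wire)
    task = asyncio.create_task(consumer())
    for i in range(3):
        async with doc.transaction() as txn:
            txn.changes.append(f"cell-{i}")
    await task
    return received


result = asyncio.run(main())
```

With a synchronous context manager the callbacks could not be awaited, so the writer would have no way to slow down for the consumer.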

@davidbrochart davidbrochart marked this pull request as ready for review March 13, 2026 15:08
async def test_modify_single_cell(modifications, expected_events, doc_action):
    do, is_async = doc_action
    if is_async:
        pytest.skip(reason="FIXME: check progressive updates")
Collaborator Author

It's not easy to test events, since they are now more granular.

Collaborator Author

This is gone in f537967, and we update the notebook in a single transaction in this test.

        return val

-    async def aset(self, value: dict) -> None:
+    async def aset(self, value: dict, progressive: bool = False) -> None:
Collaborator Author

Since the progressive creation of the notebook is behind a flag, this PR keeps the previous behavior by default.
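
A rough sketch of how a flag like this can keep the old behavior by default. `ToyNotebook` and its transaction counter are hypothetical and only mirror the names used in this PR; the real `_set` and transaction handling in jupyter_ydoc may differ.

```python
import asyncio


class ToyNotebook:
    """Hypothetical model of the pattern; this is not jupyter_ydoc."""

    def __init__(self):
        self.cells = []
        self.transactions = 0   # counts committed "transactions"

    def _set(self, value):
        # Generator: apply one cell per step, yielding between steps.
        self.cells.clear()
        for cell in value["cells"]:
            self.cells.append(cell)
            yield

    async def aset(self, value, progressive=False):
        gen = self._set(value)
        if progressive:
            # One transaction per generator step.
            for _ in gen:
                self.transactions += 1
                await asyncio.sleep(0)   # let other tasks run between steps
        else:
            # Previous behavior: the whole notebook in a single transaction.
            for _ in gen:
                pass
            self.transactions += 1


model = {"cells": ["a", "b", "c"]}
single = ToyNotebook()
asyncio.run(single.aset(model))
stepped = ToyNotebook()
asyncio.run(stepped.aset(model, progressive=True))
```

Both notebooks end up with the same cells; only the number of transactions (and therefore the number of emitted updates) differs.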

    nb.observe(record_changes)
    await do(nb, "set", model)
    if is_async:
        await do(nb, "set", model, progressive=True)
Collaborator

Would it be better to parametrize this test with progressive True/False? I guess it's a bit confusing when async=False but we could skip (async=False, progressive=True) as not implemented.
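
One way to picture the suggested test matrix, as a sketch using only the standard library. The helper name `set_cases` is hypothetical, and the assumption that progressive mode exists only on the async path comes from the comment above.

```python
from itertools import product


def set_cases():
    """Return (is_async, progressive, skip_reason) for every combination."""
    cases = []
    for is_async, progressive in product([False, True], repeat=2):
        # Assumption from the review comment: progressive mode exists only
        # on the async path, so the sync + progressive pair is skipped.
        skip = "not implemented" if (progressive and not is_async) else None
        cases.append((is_async, progressive, skip))
    return cases


cases = set_cases()
```

In a pytest suite, each tuple could become a `pytest.param(...)`, with the skipped pair carrying `marks=pytest.mark.skip(reason=...)`, and the list passed to `pytest.mark.parametrize`.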

        if progressive:
            gen = self._set(value)
            while True:
                async with self._ydoc.transaction():
Collaborator Author

I'm not sure about enforcing async transactions. They currently require global collaboration in the code base. On the other hand it's the only way to support back-pressure.

@davidbrochart
Collaborator Author

I don't know if we should set all cell inputs first, then cell outputs. That might complicate the logic in _set?

@krassowski
Collaborator

> I don't know if we should set all cell inputs first, then cell outputs. That might complicate the logic in _set?

Maybe we could contain the delayed output insertion to create_ycell and use tasks to delay the change? Though if we just used fire-and-forget tasks, then await aset() would not actually wait for completion, so I think we would want to modify _set to await these extra tasks.
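
A toy sketch of that concern, with hypothetical names and a plain list standing in for the shared notebook: inputs are inserted immediately, outputs are inserted by tasks, and the function gathers those tasks before returning so that awaiting it still means "everything is in place".

```python
import asyncio


async def aset(cells, delay=0.0):
    """Toy sketch: insert inputs first, outputs in tasks, then wait for all."""
    doc = []       # stand-in for the shared notebook
    pending = []   # tasks that insert outputs later

    async def add_outputs(index, outputs):
        await asyncio.sleep(delay)   # simulate deferring the heavy part
        doc[index]["outputs"] = outputs

    for index, cell in enumerate(cells):
        doc.append({"source": cell["source"], "outputs": []})
        pending.append(asyncio.create_task(add_outputs(index, cell["outputs"])))

    # Without this gather the tasks would be fire-and-forget and
    # `await aset(...)` would return before the outputs are in place.
    await asyncio.gather(*pending)
    return doc


cells = [
    {"source": "1 + 1", "outputs": ["2"]},
    {"source": "print('hi')", "outputs": ["hi"]},
]
doc = asyncio.run(aset(cells))
```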

def create_ycell(self, value: dict[str, Any]) -> Map:
    """
    Creates YMap with the content of the cell.
    :param value: A cell.
    :type value: Dict[str, Any]
    :return: A new cell.
    :rtype: :class:`pycrdt.Map`
    """
    cell = copy.deepcopy(value)
    if "id" not in cell:
        cell["id"] = str(uuid4())
    cell_type = cell["cell_type"]
    cell_source = cell["source"]
    cell_source = "".join(cell_source) if isinstance(cell_source, list) else cell_source
    cell["source"] = Text(cell_source)
    cell["metadata"] = Map(cell.get("metadata", {}))
    if cell_type in ("raw", "markdown"):
        if "attachments" in cell and not cell["attachments"]:
            del cell["attachments"]
    elif cell_type == "code":
        outputs = cell.get("outputs", [])
        for idx, output in enumerate(outputs):
            if output.get("output_type") == "stream":
                text = output.get("text", "")
                if isinstance(text, str):
                    ytext = Text(text)
                else:
                    ytext = Text("".join(text))
                output["text"] = ytext
            outputs[idx] = Map(output)
        cell["outputs"] = Array(outputs)
        cell["execution_state"] = "idle"
    return Map(cell)

I initially suggested aset(model["content"], delay_outputs_above_mb=100). This could be passed down to create_ycell(value, delay_outputs_above_mb=delay_outputs_above_mb), which would then check the size of the outputs in Python memory (e.g. with a recursive getsizeof). Or maybe delay_outputs_above_mb could be a class attribute, property, or initialization option of YNotebook.
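
A sketch of what a recursive getsizeof check might look like. The helper names `deep_getsizeof` and `should_delay_outputs` are hypothetical, and the result is only an approximation of the in-memory size of plain dict/list/str structures.

```python
import sys


def deep_getsizeof(obj, seen=None):
    """Approximate in-memory size of nested containers, in bytes."""
    seen = set() if seen is None else seen
    if id(obj) in seen:       # do not count shared objects twice
        return 0
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(
            deep_getsizeof(k, seen) + deep_getsizeof(v, seen)
            for k, v in obj.items()
        )
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(deep_getsizeof(item, seen) for item in obj)
    return size


def should_delay_outputs(cell, delay_outputs_above_mb):
    """True when the cell's outputs exceed the threshold (in MiB)."""
    size_mb = deep_getsizeof(cell.get("outputs", [])) / (1024 * 1024)
    return size_mb > delay_outputs_above_mb


small = {"cell_type": "code", "outputs": [{"output_type": "stream", "text": "hi"}]}
big = {"cell_type": "code", "outputs": [{"output_type": "stream", "text": "x" * (2 * 1024 * 1024)}]}
```

Here `should_delay_outputs(small, 1)` is false and `should_delay_outputs(big, 1)` is true, since the second cell carries a 2 MiB string.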

Regardless of the way we implement it, I feel like this could be a separate PR.

@davidbrochart
Collaborator Author

I also think this could be done in another PR, this one already allows us to experiment. I started doing that in Jupyverse and it seems to work fine.

Collaborator

@krassowski krassowski left a comment

I think this is useful, but not sure if final. I would recommend merging after flagging progressive as experimental.

> I started doing that in Jupyverse and it seems to work fine.

Did you manage to get the cells to show up as websocket streams? So far I can only confirm that streaming works in the sense that, instead of one big blob over the websocket, we get many small blobs, with the count scaling with the number of cells (good). I wonder if we will also need to emit sync step messages in between updates so that the frontend knows when to process a batch?

@krassowski
Collaborator

krassowski commented Mar 17, 2026

I had some success putting the awaitable returned by aset() into a background task. Interestingly, the updates on the websocket seem to get batched in a way that I don't fully follow. Nope, I forgot to remove the YStore.

Should we also add progressive: bool to other document types? I guess we could chunk large text files too?
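
For text files, chunking could be as simple as the sketch below. The function name and chunk size are hypothetical; each chunk would then be appended to the shared text in its own transaction.

```python
def iter_text_chunks(text, chunk_size):
    """Yield consecutive slices of `text`, each at most `chunk_size` long."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(text), chunk_size):
        yield text[start:start + chunk_size]


source = "abcdefghij"
chunks = list(iter_text_chunks(source, 4))
```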

@davidbrochart
Collaborator Author

I think the sync handshake should happen first (with an empty document), and then we start populating the document. This raises the question of how a frontend will react to an empty document. For instance, I don't think the frontend will like an empty notebook (not only no cell, but also no metadata), as it will think it is corrupted (it doesn't conform to nbformat).
If the sync handshake happens after the document is populated, its content will be sent in one WebSocket message. It means that only the very first client will receive the content in multiple WebSocket messages, because for clients connecting after, the document will already be populated.
Correct me if what I said is wrong. Otherwise, I'm starting to doubt it's worth going in that direction.
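
The asymmetry described above can be pictured with a toy model. This is only an illustration with hypothetical names: a list of updates stands in for the CRDT state, and the "handshake" is reduced to handing over everything known so far in a single message. It is not the Yjs sync protocol.

```python
class ToyRoom:
    """Toy model only; a list of updates stands in for CRDT state."""

    def __init__(self):
        self.updates = []
        self.clients = []

    def connect(self):
        # "Handshake": the newcomer gets everything known so far
        # in ONE message (nothing if the document is still empty).
        inbox = []
        if self.updates:
            inbox.append(list(self.updates))
        self.clients.append(inbox)
        return inbox

    def apply(self, update):
        # Each later change is broadcast as its own message.
        self.updates.append(update)
        for inbox in self.clients:
            inbox.append([update])


room = ToyRoom()
first = room.connect()            # connects while the document is empty
for i in range(3):
    room.apply(f"cell-{i}")       # progressive population
late = room.connect()             # connects after population
```

The first client receives three small messages, while the late client receives one message containing all three cells.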

@krassowski
Collaborator

After some more attempts I had reasonable success with this patch:

diff --git a/packages/docprovider/src/yprovider.ts b/packages/docprovider/src/yprovider.ts
index bbbd10a..116a971 100644
--- a/packages/docprovider/src/yprovider.ts
+++ b/packages/docprovider/src/yprovider.ts
@@ -135,6 +135,7 @@ export class WebSocketProvider implements IDocumentProvider, IForkProvider {
       }
     );
 
+    this._yWebsocketProvider.on('status', this._onStatus);
     this._yWebsocketProvider.on('sync', this._onSync);
     this._yWebsocketProvider.on('connection-close', this._onConnectionClosed);
   }
@@ -165,6 +166,7 @@ export class WebSocketProvider implements IDocumentProvider, IForkProvider {
   private _disconnect(): void {
     this._yWebsocketProvider?.off('connection-close', this._onConnectionClosed);
     this._yWebsocketProvider?.off('sync', this._onSync);
+    this._yWebsocketProvider?.off('status', this._onStatus);
     this._yWebsocketProvider?.destroy();
     this._yWebsocketProvider = null;
   }
@@ -208,6 +210,14 @@ export class WebSocketProvider implements IDocumentProvider, IForkProvider {
         const state = this._sharedModel.ydoc.getMap('state');
         state.set('document_id', this._yWebsocketProvider.roomname);
       }
+      // TODO replace it with synced? Or use a new promise for connected
+      // and use that instead of `provider.ready` in ydrive?
+      this._ready.resolve();
+    }
+  };
+
+  private _onStatus = (status: {status: 'connecting' | 'connected' | 'disconnected'}) => {
+    if (status.status === 'connected') {
       this._ready.resolve();
     }
   };
diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py
index 9cc4543..13731da 100644
--- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py
+++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py
@@ -160,10 +160,16 @@ class DocumentRoom(YRoom):
                     self._room_id,
                     self._file.path,
                 )
-                await self._document.aset(model["content"])
+                awaitable = self._document.aset(model["content"], progressive=True)  # TODO check if progressive supported
+                self.ready = True # as in ready to start streaming, not ready as all content is shown
+                self._emit(LogLevel.INFO, "initialize", "Room initialized")
+                self.create_task(awaitable)
+                self._document.dirty = False
+                return
 
               if self.ystore:
                   await self.ystore.encode_state_as_update(self.ydoc)
 
             self._document.dirty = False
             self.ready = True

I did not see any issue on the frontend. The only issue so far was an error in the saving routine:

    
    Traceback (most recent call last):
      File "/jupyter_collaboration/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py", line 324, in _maybe_save_document
        saved_model = await self._file.maybe_save_content(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/jupyter_collaboration/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py", line 183, in maybe_save_content
        saved_model = await asyncio.shield(task)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/jupyter_collaboration/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py", line 197, in _save_content
        m = await ensure_async(self._contents_manager.save(model, self.path))
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/jupyter_core/utils/__init__.py", line 214, in ensure_async
        result = await obj
                 ^^^^^^^^^
      File "/jupyter_server/services/contents/largefilemanager.py", line 133, in save
        return await super().save(model, path)

This is because the _update_lock is not sufficient while we load the notebook; we should have another lock that we only release once loading has completed.
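
A sketch of one way to gate saving on load completion. It swaps in an `asyncio.Event` for the extra lock mentioned above, and `LoadingRoom` with its methods is hypothetical rather than the actual `DocumentRoom` code.

```python
import asyncio


class LoadingRoom:
    """Hypothetical room: saving waits until loading has finished."""

    def __init__(self):
        self._loaded = asyncio.Event()   # set once loading completes
        self.content = []
        self.saved = None

    async def load(self, cells):
        for cell in cells:
            self.content.append(cell)
            await asyncio.sleep(0)       # yield, as a progressive load would
        self._loaded.set()

    async def save(self):
        # Block here so a partially loaded document is never written out.
        await self._loaded.wait()
        self.saved = list(self.content)


async def main():
    room = LoadingRoom()
    save_task = asyncio.create_task(room.save())   # save requested early
    await room.load(["a", "b", "c"])
    await save_task
    return room.saved


saved = asyncio.run(main())
```

Even though the save is requested before loading starts, it only captures the document after all cells are present.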

@krassowski
Collaborator

> If the sync handshake happens after the document is populated, its content will be sent in one WebSocket message. It means that only the very first client will receive the content in multiple WebSocket messages, because for clients connecting after, the document will already be populated.

Yes. Unless we find a way to replay the transactions by following a similar aset(progressive=True) routine on each subscription, but that's a harder problem to solve. I think we can try addressing it in the future, but that should not hold up this PR.

> For instance, I don't think the frontend will like an empty notebook (not only no cell, but also no metadata)

"no cell" is no longer a problem. "no metadata" is kind of a problem because it will show the "Choose kernel" dialog, but we could enforce sending metadata first and delay the "ready" signal (or whatever we will call it) until the metadata has been received.

@davidbrochart
Collaborator Author

> Unless we find a way to replay the transactions by following a similar aset(progressive=True) routine on each subscription, but that's a harder problem to solve. I think we can try addressing it in the future, but that should not hold up this PR.

The problem is that it's taken care of in the handshake mechanism, so not something we directly control. I don't think we can just replay changes, because they depend on the differences between the peers.
Sure we can merge this PR because it's currently harmless, but we might never use it.

@krassowski
Collaborator

> The problem is that it's taken care of in the handshake mechanism, so not something we directly control

I will open an issue on yjs to see if that was explored/could be supported in the future.

> Sure we can merge this PR because it's currently harmless, but we might never use it.

Well, it would already be great for the use case where YStore is disabled (no-op implementation).

@davidbrochart
Collaborator Author

> I will open an issue on yjs to see if that was explored/could be supported in the future.

I opened y-crdt/y-crdt#602.

> Well, it would already be great for the use case where YStore is disabled (no-op implementation).

I don't understand what you mean, I don't see how it can work except for the very first client.
