Skip to content

Commit a524fe0

Browse files
committed
Backport PR jupyter-widgets#3335: Use control comm target in LabManager
1 parent 9636b75 commit a524fe0

File tree

3 files changed

+243
-137
lines changed

3 files changed

+243
-137
lines changed

jupyterlab_widgets/src/manager.ts

Lines changed: 2 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,13 @@ import * as Backbone from 'backbone';
66

77
import {
88
ManagerBase, shims, IClassicComm, IWidgetRegistryData, ExportMap,
9-
ExportData, WidgetModel, WidgetView, put_buffers, serialize_state, IStateOptions
9+
ExportData, WidgetModel, WidgetView, serialize_state, IStateOptions
1010
} from '@jupyter-widgets/base';
1111

1212
import {
1313
IDisposable
1414
} from '@lumino/disposable';
1515

16-
import {
17-
PromiseDelegate
18-
} from '@lumino/coreutils';
19-
2016
import {
2117
Widget
2218
} from '@lumino/widgets';
@@ -225,69 +221,8 @@ class WidgetManager extends ManagerBase<Widget> implements IDisposable {
225221
if (this.context.sessionContext.session?.kernel.handleComms === false) {
226222
return;
227223
}
228-
const comm_ids = await this._get_comm_info();
229-
230-
// For each comm id that we do not know about, create the comm, and request the state.
231-
const widgets_info = await Promise.all(Object.keys(comm_ids).map(async (comm_id) => {
232-
try {
233-
await this.get_model(comm_id);
234-
// If we successfully get the model, do no more.
235-
return;
236-
} catch (e) {
237-
// If we have the widget model not found error, then we can create the
238-
// widget. Otherwise, rethrow the error. We have to check the error
239-
// message text explicitly because the get_model function in this
240-
// class throws a generic error with this specific text.
241-
if (e.message !== 'widget model not found') {
242-
throw e;
243-
}
244-
const comm = await this._create_comm(this.comm_target_name, comm_id);
245-
246-
let msg_id: string;
247-
const info = new PromiseDelegate<Private.ICommUpdateData>();
248-
comm.on_msg((msg: KernelMessage.ICommMsgMsg) => {
249-
if ((msg.parent_header as any).msg_id === msg_id
250-
&& msg.header.msg_type === 'comm_msg'
251-
&& msg.content.data.method === 'update') {
252-
let data = (msg.content.data as any);
253-
let buffer_paths = data.buffer_paths || [];
254-
// Make sure the buffers are DataViews
255-
let buffers = (msg.buffers || []).map(b => {
256-
if (b instanceof DataView) {
257-
return b;
258-
} else {
259-
return new DataView(b instanceof ArrayBuffer ? b : b.buffer);
260-
}
261-
});
262-
put_buffers(data.state, buffer_paths, buffers);
263-
info.resolve({comm, msg});
264-
}
265-
});
266-
msg_id = comm.send({
267-
method: 'request_state'
268-
}, this.callbacks(undefined));
269224

270-
return info.promise;
271-
}
272-
}));
273-
274-
// We put in a synchronization barrier here so that we don't have to
275-
// topologically sort the restored widgets. `new_model` synchronously
276-
// registers the widget ids before reconstructing their state
277-
// asynchronously, so promises to every widget reference should be available
278-
// by the time they are used.
279-
await Promise.all(widgets_info.map(async widget_info => {
280-
if (!widget_info) {
281-
return;
282-
}
283-
const content = widget_info.msg.content as any;
284-
await this.new_model({
285-
model_name: content.data.state._model_name,
286-
model_module: content.data.state._model_module,
287-
model_module_version: content.data.state._model_module_version,
288-
comm: widget_info.comm,
289-
}, content.data.state);
290-
}));
225+
return super._loadFromKernel();
291226
}
292227

293228

@@ -538,16 +473,3 @@ namespace WidgetManager {
538473
saveState: boolean
539474
};
540475
}
541-
542-
543-
namespace Private {
544-
545-
/**
546-
* Data promised when a comm info request resolves.
547-
*/
548-
export
549-
interface ICommUpdateData {
550-
comm: IClassicComm;
551-
msg: KernelMessage.ICommMsgMsg;
552-
}
553-
}

packages/base/src/manager-base.ts

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
import * as utils from './utils';
55
import * as services from '@jupyterlab/services';
66

7+
import {
8+
PromiseDelegate,
9+
} from '@lumino/coreutils';
10+
711
import {
812
DOMWidgetView, WidgetModel, WidgetView, DOMWidgetModel
913
} from './widget';
@@ -18,6 +22,21 @@ import {
1822

1923
const PROTOCOL_MAJOR_VERSION = PROTOCOL_VERSION.split('.', 1)[0];
2024

25+
/**
26+
* The control comm target name.
27+
*/
28+
export const CONTROL_COMM_TARGET = 'jupyter.widget.control';
29+
30+
/**
31+
* The supported version for the control comm channel.
32+
*/
33+
export const CONTROL_COMM_PROTOCOL_VERSION = '1.0.0';
34+
35+
/**
36+
* Time (in ms) after which we consider the control comm target not responding.
37+
*/
38+
export const CONTROL_COMM_TIMEOUT = 4000;
39+
2140
/**
2241
* The options for a model.
2342
*
@@ -361,7 +380,201 @@ abstract class ManagerBase<T> {
361380
widget_model.name = options.model_name;
362381
widget_model.module = options.model_module;
363382
return widget_model;
383+
}
364384

385+
/**
386+
* Fetch all widgets states from the kernel using the control comm channel
387+
* If this fails (control comm handler not implemented kernel side),
388+
* it will fallback to `_loadFromKernelSlow`.
389+
*
390+
* This is a utility function that can be used in subclasses.
391+
*/
392+
protected async _loadFromKernel(): Promise<void> {
393+
// Try fetching all widget states through the control comm
394+
let data: any;
395+
let buffers: any;
396+
try {
397+
const initComm = await this._create_comm(
398+
CONTROL_COMM_TARGET,
399+
utils.uuid(),
400+
{},
401+
{ version: CONTROL_COMM_PROTOCOL_VERSION }
402+
);
403+
404+
await new Promise((resolve, reject) => {
405+
initComm.on_msg((msg: any) => {
406+
data = msg['content']['data'];
407+
408+
if (data.method !== 'update_states') {
409+
console.warn(`
410+
Unknown ${data.method} message on the Control channel
411+
`);
412+
return;
413+
}
414+
415+
buffers = (msg.buffers || []).map((b: any) => {
416+
if (b instanceof DataView) {
417+
return b;
418+
} else {
419+
return new DataView(b instanceof ArrayBuffer ? b : b.buffer);
420+
}
421+
});
422+
423+
resolve(null);
424+
});
425+
426+
initComm.on_close(() => reject('Control comm was closed too early'));
427+
428+
// Send a states request msg
429+
initComm.send({ method: 'request_states' }, {});
430+
431+
// Reject if we didn't get a response in time
432+
setTimeout(
433+
() => reject('Control comm did not respond in time'),
434+
CONTROL_COMM_TIMEOUT
435+
);
436+
});
437+
438+
initComm.close();
439+
} catch (error) {
440+
console.warn(
441+
'Failed to fetch widgets through the "jupyter.widget.control" comm channel, fallback to slow fetching of widgets. Reason:',
442+
error
443+
);
444+
// Fallback to the old implementation for old ipywidgets backend versions (<=7.6)
445+
return this._loadFromKernelSlow();
446+
}
447+
448+
const states: any = data.states;
449+
450+
// Extract buffer paths
451+
const bufferPaths: any = {};
452+
for (const bufferPath of data.buffer_paths) {
453+
if (!bufferPaths[bufferPath[0]]) {
454+
bufferPaths[bufferPath[0]] = [];
455+
}
456+
bufferPaths[bufferPath[0]].push(bufferPath.slice(1));
457+
}
458+
459+
// Start creating all widgets
460+
await Promise.all(
461+
Object.keys(states).map(async (widget_id) => {
462+
try {
463+
const state = states[widget_id];
464+
const comm = await this._create_comm('jupyter.widget', widget_id);
465+
466+
// Put binary buffers
467+
if (widget_id in bufferPaths) {
468+
const nBuffers = bufferPaths[widget_id].length;
469+
utils.put_buffers(
470+
state,
471+
bufferPaths[widget_id],
472+
buffers.splice(0, nBuffers)
473+
);
474+
}
475+
476+
await this.new_model(
477+
{
478+
model_name: state.model_name,
479+
model_module: state.model_module,
480+
model_module_version: state.model_module_version,
481+
model_id: widget_id,
482+
comm: comm,
483+
},
484+
state.state
485+
);
486+
} catch (error) {
487+
// Failed to create a widget model, we continue creating other models so that
488+
// other widgets can render
489+
console.error(error);
490+
}
491+
})
492+
);
493+
}
494+
495+
/**
496+
* Old implementation of fetching widgets one by one using
497+
* the request_state message on each comm.
498+
*
499+
* This is a utility function that can be used in subclasses.
500+
*/
501+
protected async _loadFromKernelSlow(): Promise<void> {
502+
const comm_ids = await this._get_comm_info();
503+
504+
// For each comm id that we do not know about, create the comm, and request the state.
505+
const widgets_info = await Promise.all(
506+
Object.keys(comm_ids).map(async (comm_id) => {
507+
try {
508+
const model = this.get_model(comm_id);
509+
// TODO Have the same this.get_model implementation for
510+
// the widgetsnbextension and labextension, the one that
511+
// throws an error if the model is not found instead of
512+
// returning undefined
513+
if (model === undefined) {
514+
throw new Error('widget model not found');
515+
}
516+
await model;
517+
// If we successfully get the model, do no more.
518+
return;
519+
} catch (e) {
520+
// If we have the widget model not found error, then we can create the
521+
// widget. Otherwise, rethrow the error. We have to check the error
522+
// message text explicitly because the get_model function in this
523+
// class throws a generic error with this specific text.
524+
if (e.message !== 'widget model not found') {
525+
throw e;
526+
}
527+
const comm = await this._create_comm(this.comm_target_name, comm_id);
528+
529+
let msg_id = '';
530+
const info = new PromiseDelegate<Private.ICommUpdateData>();
531+
comm.on_msg((msg) => {
532+
if (
533+
(msg.parent_header as any).msg_id === msg_id &&
534+
msg.header.msg_type === 'comm_msg' &&
535+
msg.content.data.method === 'update'
536+
) {
537+
const data = msg.content.data as any;
538+
const buffer_paths = data.buffer_paths || [];
539+
const buffers = msg.buffers || [];
540+
utils.put_buffers(data.state, buffer_paths, buffers);
541+
info.resolve({ comm, msg });
542+
}
543+
});
544+
msg_id = comm.send(
545+
{
546+
method: 'request_state',
547+
},
548+
this.callbacks(undefined)
549+
);
550+
551+
return info.promise;
552+
}
553+
})
554+
);
555+
556+
// We put in a synchronization barrier here so that we don't have to
557+
// topologically sort the restored widgets. `new_model` synchronously
558+
// registers the widget ids before reconstructing their state
559+
// asynchronously, so promises to every widget reference should be available
560+
// by the time they are used.
561+
await Promise.all(
562+
widgets_info.map(async (widget_info) => {
563+
if (!widget_info) {
564+
return;
565+
}
566+
const content = widget_info.msg.content as any;
567+
await this.new_model(
568+
{
569+
model_name: content.data.state._model_name,
570+
model_module: content.data.state._model_module,
571+
model_module_version: content.data.state._model_module_version,
572+
comm: widget_info.comm,
573+
},
574+
content.data.state
575+
);
576+
})
577+
);
365578
}
366579

367580
/**
@@ -586,3 +799,13 @@ function serialize_state(models: WidgetModel[], options: IStateOptions = {}) {
586799
});
587800
return {version_major: 2, version_minor: 0, state: state};
588801
}
802+
803+
namespace Private {
804+
/**
805+
* Data promised when a comm info request resolves.
806+
*/
807+
export interface ICommUpdateData {
808+
comm: IClassicComm;
809+
msg: services.KernelMessage.ICommMsgMsg;
810+
}
811+
}

0 commit comments

Comments
 (0)