-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathybasedoc.py
More file actions
77 lines (59 loc) · 1.7 KB
/
ybasedoc.py
File metadata and controls
77 lines (59 loc) · 1.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Optional
from pycrdt import Doc, Map
class YBaseDoc(ABC):
def __init__(self, ydoc: Optional[Doc] = None):
if ydoc is None:
self._ydoc = Doc()
else:
self._ydoc = ydoc
self._ystate = Map()
self._ydoc["state"] = self._ystate
self._subscriptions: Dict[Any, str] = {}
@property
@abstractmethod
def version(self) -> str:
...
@property
def ystate(self) -> Map:
return self._ystate
@property
def ydoc(self) -> Doc:
return self._ydoc
@property
def source(self) -> Any:
return self.get()
@source.setter
def source(self, value: Any):
return self.set(value)
@property
def dirty(self) -> Optional[bool]:
return self._ystate.get("dirty")
@dirty.setter
def dirty(self, value: bool) -> None:
self._ystate["dirty"] = value
@property
def path(self) -> Optional[str]:
return self._ystate.get("path")
@path.setter
def path(self, value: str) -> None:
self._ystate["path"] = value
@property
def file_id(self) -> Optional[str]:
return self._ystate.get("file_id")
@file_id.setter
def file_id(self, value: str) -> None:
self._ystate["file_id"] = value
@abstractmethod
def get(self) -> Any:
...
@abstractmethod
def set(self, value: Any) -> None:
...
@abstractmethod
def observe(self, callback: Callable[[str, Any], None]) -> None:
...
def unobserve(self) -> None:
for k, v in self._subscriptions.items():
k.unobserve(v)
self._subscriptions = {}