8
8
import sqlite3
9
9
from collections import defaultdict
10
10
11
+
11
12
class SignCache (CacheBase ):
12
13
# increment this every time there are changes to table schema or data model
13
14
VERSION : int = 1
15
+
14
16
def __init__ (self , store_path : Path , org_name : str ) -> None :
15
17
sqlite3 .register_adapter (DetailedUserInfo , adapt_user )
16
18
sqlite3 .register_converter ("detailed_user_info" , convert_user )
@@ -33,15 +35,15 @@ def __init__(self, store_path: Path, org_name: str) -> None:
33
35
self .init_meta ()
34
36
self .should_refresh = True
35
37
super ().__init__ ()
36
-
38
+
37
39
def rebuild_tables (self ):
38
40
self .db_conn .execute ("drop table users" )
39
41
self .db_conn .execute ("drop table groups" )
40
42
self .db_conn .execute ("drop table user_groups" )
41
43
for s in [sign_users_schema , sign_groups_schema , sign_user_groups_schema ]:
42
44
self .db_conn .execute (s )
43
45
self .db_conn .commit ()
44
-
46
+
45
47
def clear_all (self ):
46
48
self .db_conn .execute ("delete from users" )
47
49
self .db_conn .execute ("delete from groups" )
@@ -55,7 +57,7 @@ def cache_user(self, user: DetailedUserInfo):
55
57
def update_user (self , user : DetailedUserInfo ):
56
58
self .db_conn .execute ("update users set user = ? where id = ?" , (user , user .id ))
57
59
self .db_conn .commit ()
58
-
60
+
59
61
def get_users (self ) -> list [DetailedUserInfo ]:
60
62
cur = self .db_conn .cursor ()
61
63
cur .execute ("select user from users" )
@@ -82,7 +84,7 @@ def cache_group(self, group: GroupInfo):
82
84
def delete_group (self , group : GroupInfo ):
83
85
self .db_conn .execute ("delete from groups where id = ?" , (group .groupId , ))
84
86
self .db_conn .commit ()
85
-
87
+
86
88
def get_groups (self ) -> list [GroupInfo ]:
87
89
cur = self .db_conn .cursor ()
88
90
cur .execute ("select group_info from groups" )
@@ -91,7 +93,7 @@ def get_groups(self) -> list[GroupInfo]:
91
93
def cache_user_group (self , user_id : str , user_group : UserGroupInfo ):
92
94
self .db_conn .execute ("insert into user_groups(user_id, user_group) values (?,?)" , (user_id , user_group ))
93
95
self .db_conn .commit ()
94
-
96
+
95
97
def get_user_groups (self ) -> list [tuple [str , list [UserGroupInfo ]]]:
96
98
groups_by_user = defaultdict (list )
97
99
cur = self .db_conn .cursor ()
@@ -106,20 +108,26 @@ def update_user_groups(self, user_id: str, user_groups: list[UserGroupInfo]):
106
108
for user_group in user_groups :
107
109
self .cache_user_group (user_id , user_group )
108
110
111
+
109
112
def adapt_user (user : DetailedUserInfo ) -> str :
110
113
return json .dumps (user .__dict__ , cls = JSONEncoder ).encode ('ascii' )
111
114
115
+
112
116
def convert_user (s : str ) -> DetailedUserInfo :
113
117
return DetailedUserInfo .from_dict (json .loads (s ))
114
118
119
+
115
120
def adapt_group (group : GroupInfo ) -> str :
116
121
return json .dumps (group .__dict__ , cls = JSONEncoder ).encode ('ascii' )
117
122
123
+
118
124
def convert_group (s : str ) -> GroupInfo :
119
125
return GroupInfo .from_dict (json .loads (s ))
120
126
127
+
121
128
def adapt_user_group (user_group : UserGroupInfo ) -> str :
122
129
return json .dumps (user_group .__dict__ , cls = JSONEncoder ).encode ('ascii' )
123
130
131
+
124
132
def convert_user_group (s : str ) -> UserGroupInfo :
125
133
return UserGroupInfo .from_dict (json .loads (s ))
0 commit comments