15
15
# limitations under the License.
16
16
17
17
import logging
18
- from typing import List , Tuple
18
+ from typing import Dict , List , Tuple
19
19
20
20
from canonicaljson import json
21
21
22
- from twisted .internet import defer
23
-
24
22
from synapse .storage ._base import db_to_json
25
23
from synapse .storage .databases .main .account_data import AccountDataWorkerStore
24
+ from synapse .types import JsonDict
26
25
from synapse .util .caches .descriptors import cached
27
26
28
27
logger = logging .getLogger (__name__ )
29
28
30
29
31
30
class TagsWorkerStore (AccountDataWorkerStore ):
32
31
@cached ()
33
- def get_tags_for_user (self , user_id ) :
32
+ async def get_tags_for_user (self , user_id : str ) -> Dict [ str , Dict [ str , JsonDict ]] :
34
33
"""Get all the tags for a user.
35
34
36
35
37
36
Args:
38
- user_id(str) : The user to get the tags for.
37
+ user_id: The user to get the tags for.
39
38
Returns:
40
- A deferred dict mapping from room_id strings to dicts mapping from
41
- tag strings to tag content.
39
+ A mapping from room_id strings to dicts mapping from tag strings to
40
+ tag content.
42
41
"""
43
42
44
- deferred = self .db_pool .simple_select_list (
43
+ rows = await self .db_pool .simple_select_list (
45
44
"room_tags" , {"user_id" : user_id }, ["room_id" , "tag" , "content" ]
46
45
)
47
46
48
- @deferred .addCallback
49
- def tags_by_room (rows ):
50
- tags_by_room = {}
51
- for row in rows :
52
- room_tags = tags_by_room .setdefault (row ["room_id" ], {})
53
- room_tags [row ["tag" ]] = db_to_json (row ["content" ])
54
- return tags_by_room
55
-
56
- return deferred
47
+ tags_by_room = {}
48
+ for row in rows :
49
+ room_tags = tags_by_room .setdefault (row ["room_id" ], {})
50
+ room_tags [row ["tag" ]] = db_to_json (row ["content" ])
51
+ return tags_by_room
57
52
58
53
async def get_all_updated_tags (
59
54
self , instance_name : str , last_id : int , current_id : int , limit : int
@@ -127,17 +122,19 @@ def get_tag_content(txn, tag_ids):
127
122
128
123
return results , upto_token , limited
129
124
130
- @defer .inlineCallbacks
131
- def get_updated_tags (self , user_id , stream_id ):
125
+ async def get_updated_tags (
126
+ self , user_id : str , stream_id : int
127
+ ) -> Dict [str , List [str ]]:
132
128
"""Get all the tags for the rooms where the tags have changed since the
133
129
given version
134
130
135
131
Args:
136
132
user_id(str): The user to get the tags for.
137
133
stream_id(int): The earliest update to get for the user.
134
+
138
135
Returns:
139
- A deferred dict mapping from room_id strings to lists of tag
140
- strings for all the rooms that changed since the stream_id token.
136
+ A mapping from room_id strings to lists of tag strings for all the
137
+ rooms that changed since the stream_id token.
141
138
"""
142
139
143
140
def get_updated_tags_txn (txn ):
@@ -155,47 +152,53 @@ def get_updated_tags_txn(txn):
155
152
if not changed :
156
153
return {}
157
154
158
- room_ids = yield self .db_pool .runInteraction (
155
+ room_ids = await self .db_pool .runInteraction (
159
156
"get_updated_tags" , get_updated_tags_txn
160
157
)
161
158
162
159
results = {}
163
160
if room_ids :
164
- tags_by_room = yield self .get_tags_for_user (user_id )
161
+ tags_by_room = await self .get_tags_for_user (user_id )
165
162
for room_id in room_ids :
166
163
results [room_id ] = tags_by_room .get (room_id , {})
167
164
168
165
return results
169
166
170
- def get_tags_for_room (self , user_id , room_id ):
167
+ async def get_tags_for_room (
168
+ self , user_id : str , room_id : str
169
+ ) -> Dict [str , JsonDict ]:
171
170
"""Get all the tags for the given room
171
+
172
172
Args:
173
- user_id(str): The user to get tags for
174
- room_id(str): The room to get tags for
173
+ user_id: The user to get tags for
174
+ room_id: The room to get tags for
175
+
175
176
Returns:
176
- A deferred list of string tags.
177
+ A mapping of tags to tag content .
177
178
"""
178
- return self .db_pool .simple_select_list (
179
+ rows = await self .db_pool .simple_select_list (
179
180
table = "room_tags" ,
180
181
keyvalues = {"user_id" : user_id , "room_id" : room_id },
181
182
retcols = ("tag" , "content" ),
182
183
desc = "get_tags_for_room" ,
183
- ).addCallback (
184
- lambda rows : {row ["tag" ]: db_to_json (row ["content" ]) for row in rows }
185
184
)
185
+ return {row ["tag" ]: db_to_json (row ["content" ]) for row in rows }
186
186
187
187
188
188
class TagsStore (TagsWorkerStore ):
189
- @defer .inlineCallbacks
190
- def add_tag_to_room (self , user_id , room_id , tag , content ):
189
+ async def add_tag_to_room (
190
+ self , user_id : str , room_id : str , tag : str , content : JsonDict
191
+ ) -> int :
191
192
"""Add a tag to a room for a user.
193
+
192
194
Args:
193
- user_id(str): The user to add a tag for.
194
- room_id(str): The room to add a tag for.
195
- tag(str): The tag name to add.
196
- content(dict): A json object to associate with the tag.
195
+ user_id: The user to add a tag for.
196
+ room_id: The room to add a tag for.
197
+ tag: The tag name to add.
198
+ content: A json object to associate with the tag.
199
+
197
200
Returns:
198
- A deferred that completes once the tag has been added .
201
+ The next account data ID .
199
202
"""
200
203
content_json = json .dumps (content )
201
204
@@ -209,18 +212,17 @@ def add_tag_txn(txn, next_id):
209
212
self ._update_revision_txn (txn , user_id , room_id , next_id )
210
213
211
214
with self ._account_data_id_gen .get_next () as next_id :
212
- yield self .db_pool .runInteraction ("add_tag" , add_tag_txn , next_id )
215
+ await self .db_pool .runInteraction ("add_tag" , add_tag_txn , next_id )
213
216
214
217
self .get_tags_for_user .invalidate ((user_id ,))
215
218
216
- result = self ._account_data_id_gen .get_current_token ()
217
- return result
219
+ return self ._account_data_id_gen .get_current_token ()
218
220
219
- @defer .inlineCallbacks
220
- def remove_tag_from_room (self , user_id , room_id , tag ):
221
+ async def remove_tag_from_room (self , user_id : str , room_id : str , tag : str ) -> int :
221
222
"""Remove a tag from a room for a user.
223
+
222
224
Returns:
223
- A deferred that completes once the tag has been removed
225
+ The next account data ID.
224
226
"""
225
227
226
228
def remove_tag_txn (txn , next_id ):
@@ -232,21 +234,22 @@ def remove_tag_txn(txn, next_id):
232
234
self ._update_revision_txn (txn , user_id , room_id , next_id )
233
235
234
236
with self ._account_data_id_gen .get_next () as next_id :
235
- yield self .db_pool .runInteraction ("remove_tag" , remove_tag_txn , next_id )
237
+ await self .db_pool .runInteraction ("remove_tag" , remove_tag_txn , next_id )
236
238
237
239
self .get_tags_for_user .invalidate ((user_id ,))
238
240
239
- result = self ._account_data_id_gen .get_current_token ()
240
- return result
241
+ return self ._account_data_id_gen .get_current_token ()
241
242
242
- def _update_revision_txn (self , txn , user_id , room_id , next_id ):
243
+ def _update_revision_txn (
244
+ self , txn , user_id : str , room_id : str , next_id : int
245
+ ) -> None :
243
246
"""Update the latest revision of the tags for the given user and room.
244
247
245
248
Args:
246
249
txn: The database cursor
247
- user_id(str) : The ID of the user.
248
- room_id(str) : The ID of the room.
249
- next_id(int) : The the revision to advance to.
250
+ user_id: The ID of the user.
251
+ room_id: The ID of the room.
252
+ next_id: The the revision to advance to.
250
253
"""
251
254
252
255
txn .call_after (
0 commit comments