1
1
# -*- coding: utf-8 -*-
2
- # Copyright 2015 OpenMarket Ltd
3
- # Copyright 2018 New Vector Ltd
2
+ # Copyright 2015-2020 The Matrix.org Foundation C.I.C.
4
3
#
5
4
# Licensed under the Apache License, Version 2.0 (the "License");
6
5
# you may not use this file except in compliance with the License.
15
14
# limitations under the License.
16
15
17
16
import logging
17
+ from typing import Dict , Tuple
18
18
19
+ import attr
19
20
from canonicaljson import json
20
21
21
22
from twisted .internet import defer
36
37
]
37
38
38
39
40
+ @attr .s
41
+ class EventPushSummary :
42
+ """Summary of pending event push actions for a given user in a given room."""
43
+
44
+ unread_count = attr .ib (type = int )
45
+ stream_ordering = attr .ib (type = int )
46
+ old_user_id = attr .ib (type = str )
47
+ notif_count = attr .ib (type = int )
48
+
49
+
39
50
def _serialize_action (actions , is_highlight ):
40
51
"""Custom serializer for actions. This allows us to "compress" common actions.
41
52
@@ -122,32 +133,50 @@ def _get_unread_counts_by_receipt_txn(
122
133
123
134
def _get_unread_counts_by_pos_txn (self , txn , room_id , user_id , stream_ordering ):
124
135
125
- # First get number of notifications.
126
- # We don't need to put a notif=1 clause as all rows always have
127
- # notif=1
136
+ # First get number of actions, grouped on whether the action notifies.
128
137
sql = (
129
- "SELECT count(*)"
138
+ "SELECT count(*), notif "
130
139
" FROM event_push_actions ea"
131
140
" WHERE"
132
141
" user_id = ?"
133
142
" AND room_id = ?"
134
143
" AND stream_ordering > ?"
144
+ " GROUP BY notif"
135
145
)
136
-
137
146
txn .execute (sql , (user_id , room_id , stream_ordering ))
138
- row = txn .fetchone ()
139
- notify_count = row [0 ] if row else 0
147
+ rows = txn .fetchall ()
148
+
149
+ # We should get a maximum number of two rows: one for notif = 0, which is the
150
+ # number of actions that contribute to the unread_count but not to the
151
+ # notify_count, and one for notif = 1, which is the number of actions that
152
+ # contribute to both counters. If one or both rows don't appear, then the
153
+ # value for the matching counter should be 0.
154
+ unread_count = 0
155
+ notify_count = 0
156
+ for row in rows :
157
+ # We always increment unread_count because actions that notify also
158
+ # contribute to it.
159
+ unread_count += row [0 ]
160
+ if row [1 ] == 1 :
161
+ notify_count = row [0 ]
162
+ elif row [1 ] != 0 :
163
+ logger .warning (
164
+ "Unexpected value %d for column 'notif' in table"
165
+ " 'event_push_actions'" ,
166
+ row [1 ],
167
+ )
140
168
141
169
txn .execute (
142
170
"""
143
- SELECT notif_count FROM event_push_summary
171
+ SELECT notif_count, unread_count FROM event_push_summary
144
172
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
145
173
""" ,
146
174
(room_id , user_id , stream_ordering ),
147
175
)
148
176
rows = txn .fetchall ()
149
177
if rows :
150
178
notify_count += rows [0 ][0 ]
179
+ unread_count += rows [0 ][1 ]
151
180
152
181
# Now get the number of highlights
153
182
sql = (
@@ -164,7 +193,11 @@ def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
164
193
row = txn .fetchone ()
165
194
highlight_count = row [0 ] if row else 0
166
195
167
- return {"notify_count" : notify_count , "highlight_count" : highlight_count }
196
+ return {
197
+ "unread_count" : unread_count ,
198
+ "notify_count" : notify_count ,
199
+ "highlight_count" : highlight_count ,
200
+ }
168
201
169
202
@defer .inlineCallbacks
170
203
def get_push_action_users_in_range (self , min_stream_ordering , max_stream_ordering ):
@@ -222,6 +255,7 @@ def get_after_receipt(txn):
222
255
" AND ep.user_id = ?"
223
256
" AND ep.stream_ordering > ?"
224
257
" AND ep.stream_ordering <= ?"
258
+ " AND ep.notif = 1"
225
259
" ORDER BY ep.stream_ordering ASC LIMIT ?"
226
260
)
227
261
args = [user_id , user_id , min_stream_ordering , max_stream_ordering , limit ]
@@ -250,6 +284,7 @@ def get_no_receipt(txn):
250
284
" AND ep.user_id = ?"
251
285
" AND ep.stream_ordering > ?"
252
286
" AND ep.stream_ordering <= ?"
287
+ " AND ep.notif = 1"
253
288
" ORDER BY ep.stream_ordering ASC LIMIT ?"
254
289
)
255
290
args = [user_id , user_id , min_stream_ordering , max_stream_ordering , limit ]
@@ -322,6 +357,7 @@ def get_after_receipt(txn):
322
357
" AND ep.user_id = ?"
323
358
" AND ep.stream_ordering > ?"
324
359
" AND ep.stream_ordering <= ?"
360
+ " AND ep.notif = 1"
325
361
" ORDER BY ep.stream_ordering DESC LIMIT ?"
326
362
)
327
363
args = [user_id , user_id , min_stream_ordering , max_stream_ordering , limit ]
@@ -350,6 +386,7 @@ def get_no_receipt(txn):
350
386
" AND ep.user_id = ?"
351
387
" AND ep.stream_ordering > ?"
352
388
" AND ep.stream_ordering <= ?"
389
+ " AND ep.notif = 1"
353
390
" ORDER BY ep.stream_ordering DESC LIMIT ?"
354
391
)
355
392
args = [user_id , user_id , min_stream_ordering , max_stream_ordering , limit ]
@@ -399,7 +436,7 @@ def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
399
436
def _get_if_maybe_push_in_range_for_user_txn (txn ):
400
437
sql = """
401
438
SELECT 1 FROM event_push_actions
402
- WHERE user_id = ? AND stream_ordering > ?
439
+ WHERE user_id = ? AND stream_ordering > ? AND notif = 1
403
440
LIMIT 1
404
441
"""
405
442
@@ -428,14 +465,15 @@ def add_push_actions_to_staging(self, event_id, user_id_actions):
428
465
return
429
466
430
467
# This is a helper function for generating the necessary tuple that
431
- # can be used to inert into the `event_push_actions_staging` table.
468
+ # can be used to insert into the `event_push_actions_staging` table.
432
469
def _gen_entry (user_id , actions ):
433
470
is_highlight = 1 if _action_has_highlight (actions ) else 0
471
+ notif = 0 if "org.matrix.msc2625.mark_unread" in actions else 1
434
472
return (
435
473
event_id , # event_id column
436
474
user_id , # user_id column
437
475
_serialize_action (actions , is_highlight ), # actions column
438
- 1 , # notif column
476
+ notif , # notif column
439
477
is_highlight , # highlight column
440
478
)
441
479
@@ -817,24 +855,51 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
817
855
# Calculate the new counts that should be upserted into event_push_summary
818
856
sql = """
819
857
SELECT user_id, room_id,
820
- coalesce(old.notif_count , 0) + upd.notif_count ,
858
+ coalesce(old.%s , 0) + upd.cnt ,
821
859
upd.stream_ordering,
822
860
old.user_id
823
861
FROM (
824
- SELECT user_id, room_id, count(*) as notif_count ,
862
+ SELECT user_id, room_id, count(*) as cnt ,
825
863
max(stream_ordering) as stream_ordering
826
864
FROM event_push_actions
827
865
WHERE ? <= stream_ordering AND stream_ordering < ?
828
866
AND highlight = 0
867
+ %s
829
868
GROUP BY user_id, room_id
830
869
) AS upd
831
870
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
832
871
"""
833
872
834
- txn .execute (sql , (old_rotate_stream_ordering , rotate_to_stream_ordering ))
835
- rows = txn .fetchall ()
873
+ # First get the count of unread messages.
874
+ txn .execute (
875
+ sql % ("unread_count" , "" ),
876
+ (old_rotate_stream_ordering , rotate_to_stream_ordering ),
877
+ )
836
878
837
- logger .info ("Rotating notifications, handling %d rows" , len (rows ))
879
+ # We need to merge both lists into a single object because we might not have the
880
+ # same amount of rows in each of them. In this case we use a dict indexed on the
881
+ # user ID and room ID to make it easier to populate.
882
+ summaries = {} # type: Dict[Tuple[str, str], EventPushSummary]
883
+ for row in txn :
884
+ summaries [(row [0 ], row [1 ])] = EventPushSummary (
885
+ unread_count = row [2 ],
886
+ stream_ordering = row [3 ],
887
+ old_user_id = row [4 ],
888
+ notif_count = 0 ,
889
+ )
890
+
891
+ # Then get the count of notifications.
892
+ txn .execute (
893
+ sql % ("notif_count" , "AND notif = 1" ),
894
+ (old_rotate_stream_ordering , rotate_to_stream_ordering ),
895
+ )
896
+
897
+ # notif_rows is populated based on a subset of the query used to populate
898
+ # unread_rows, so we can be sure that there will be no KeyError here.
899
+ for row in txn :
900
+ summaries [(row [0 ], row [1 ])].notif_count = row [2 ]
901
+
902
+ logger .info ("Rotating notifications, handling %d rows" , len (summaries ))
838
903
839
904
# If the `old.user_id` above is NULL then we know there isn't already an
840
905
# entry in the table, so we simply insert it. Otherwise we update the
@@ -844,22 +909,34 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
844
909
table = "event_push_summary" ,
845
910
values = [
846
911
{
847
- "user_id" : row [0 ],
848
- "room_id" : row [1 ],
849
- "notif_count" : row [2 ],
850
- "stream_ordering" : row [3 ],
912
+ "user_id" : user_id ,
913
+ "room_id" : room_id ,
914
+ "notif_count" : summary .notif_count ,
915
+ "unread_count" : summary .unread_count ,
916
+ "stream_ordering" : summary .stream_ordering ,
851
917
}
852
- for row in rows
853
- if row [ 4 ] is None
918
+ for (( user_id , room_id ), summary ) in summaries . items ()
919
+ if summary . old_user_id is None
854
920
],
855
921
)
856
922
857
923
txn .executemany (
858
924
"""
859
- UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
925
+ UPDATE event_push_summary
926
+ SET notif_count = ?, unread_count = ?, stream_ordering = ?
860
927
WHERE user_id = ? AND room_id = ?
861
928
""" ,
862
- ((row [2 ], row [3 ], row [0 ], row [1 ]) for row in rows if row [4 ] is not None ),
929
+ (
930
+ (
931
+ summary .notif_count ,
932
+ summary .unread_count ,
933
+ summary .stream_ordering ,
934
+ user_id ,
935
+ room_id ,
936
+ )
937
+ for ((user_id , room_id ), summary ) in summaries .items ()
938
+ if summary .old_user_id is not None
939
+ ),
863
940
)
864
941
865
942
txn .execute (
0 commit comments