Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Back out MSC2625 implementation #7761

Merged
merged 3 commits into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion changelog.d/7673.feature

This file was deleted.

1 change: 0 additions & 1 deletion changelog.d/7716.feature

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/7761.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add unread messages count to sync responses.
3 changes: 0 additions & 3 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1893,9 +1893,6 @@ async def _generate_room_entry(
if notifs is not None:
unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]
unread_notifications["org.matrix.msc2625.unread_count"] = notifs[
"unread_count"
]

sync_result_builder.joined.append(room_sync)

Expand Down
7 changes: 2 additions & 5 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,8 @@ def action_for_event_by_user(self, event, context):
)
if matches:
actions = [x for x in rule["actions"] if x != "dont_notify"]
if (
"notify" in actions
or "org.matrix.msc2625.mark_unread" in actions
):
# Push rules say we should act on this event.
if actions and "notify" in actions:
# Push rules say we should notify the user of this event
actions_by_user[uid] = actions
break

Expand Down
5 changes: 1 addition & 4 deletions synapse/push/push_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ def get_badge_count(store, user_id):
)
# return one badge count per conversation, as count per
# message is so noisy as to be almost useless
# We're populating this badge using the unread_count (instead of the
# notify_count) as this badge is the number of missed messages, not the
# number of missed notifications.
badge += 1 if notifs.get("unread_count") else 0
badge += 1 if notifs["notify_count"] else 0
return badge


Expand Down
4 changes: 2 additions & 2 deletions synapse/rest/client/v1/push_rule.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2020 The Matrix.org Foundation C.I.C.
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -267,7 +267,7 @@ def _check_actions(actions):
raise InvalidRuleException("No actions found")

for a in actions:
if a in ["notify", "dont_notify", "coalesce", "org.matrix.msc2625.mark_unread"]:
if a in ["notify", "dont_notify", "coalesce"]:
pass
elif isinstance(a, dict) and "set_tweak" in a:
pass
Expand Down
133 changes: 28 additions & 105 deletions synapse/storage/data_stores/main/event_push_actions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015-2020 The Matrix.org Foundation C.I.C.
# Copyright 2015 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,9 +15,7 @@
# limitations under the License.

import logging
from typing import Dict, Tuple

import attr
from canonicaljson import json

from twisted.internet import defer
Expand All @@ -37,16 +36,6 @@
]


@attr.s
class EventPushSummary:
"""Summary of pending event push actions for a given user in a given room."""

unread_count = attr.ib(type=int)
stream_ordering = attr.ib(type=int)
old_user_id = attr.ib(type=str)
notif_count = attr.ib(type=int)


def _serialize_action(actions, is_highlight):
"""Custom serializer for actions. This allows us to "compress" common actions.

Expand Down Expand Up @@ -123,7 +112,7 @@ def _get_unread_counts_by_receipt_txn(
txn.execute(sql, (room_id, last_read_event_id))
results = txn.fetchall()
if len(results) == 0:
return {"notify_count": 0, "highlight_count": 0, "unread_count": 0}
return {"notify_count": 0, "highlight_count": 0}

stream_ordering = results[0][0]

Expand All @@ -133,50 +122,32 @@ def _get_unread_counts_by_receipt_txn(

def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):

# First get number of actions, grouped on whether the action notifies.
# First get number of notifications.
# We don't need to put a notif=1 clause as all rows always have
# notif=1
sql = (
"SELECT count(*), notif"
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" user_id = ?"
" AND room_id = ?"
" AND stream_ordering > ?"
" GROUP BY notif"
)
txn.execute(sql, (user_id, room_id, stream_ordering))
rows = txn.fetchall()

# We should get a maximum number of two rows: one for notif = 0, which is the
# number of actions that contribute to the unread_count but not to the
# notify_count, and one for notif = 1, which is the number of actions that
# contribute to both counters. If one or both rows don't appear, then the
# value for the matching counter should be 0.
unread_count = 0
notify_count = 0
for row in rows:
# We always increment unread_count because actions that notify also
# contribute to it.
unread_count += row[0]
if row[1] == 1:
notify_count = row[0]
elif row[1] != 0:
logger.warning(
"Unexpected value %d for column 'notif' in table"
" 'event_push_actions'",
row[1],
)
txn.execute(sql, (user_id, room_id, stream_ordering))
row = txn.fetchone()
notify_count = row[0] if row else 0

txn.execute(
"""
SELECT notif_count, unread_count FROM event_push_summary
SELECT notif_count FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
""",
(room_id, user_id, stream_ordering),
)
rows = txn.fetchall()
if rows:
notify_count += rows[0][0]
unread_count += rows[0][1]

# Now get the number of highlights
sql = (
Expand All @@ -193,11 +164,7 @@ def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
row = txn.fetchone()
highlight_count = row[0] if row else 0

return {
"unread_count": unread_count,
"notify_count": notify_count,
"highlight_count": highlight_count,
}
return {"notify_count": notify_count, "highlight_count": highlight_count}

@defer.inlineCallbacks
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
Expand Down Expand Up @@ -255,7 +222,6 @@ def get_after_receipt(txn):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering ASC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
Expand Down Expand Up @@ -284,7 +250,6 @@ def get_no_receipt(txn):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering ASC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
Expand Down Expand Up @@ -357,7 +322,6 @@ def get_after_receipt(txn):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering DESC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
Expand Down Expand Up @@ -386,7 +350,6 @@ def get_no_receipt(txn):
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering DESC LIMIT ?"
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
Expand Down Expand Up @@ -436,7 +399,7 @@ def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
def _get_if_maybe_push_in_range_for_user_txn(txn):
sql = """
SELECT 1 FROM event_push_actions
WHERE user_id = ? AND stream_ordering > ? AND notif = 1
WHERE user_id = ? AND stream_ordering > ?
LIMIT 1
"""

Expand Down Expand Up @@ -465,15 +428,14 @@ def add_push_actions_to_staging(self, event_id, user_id_actions):
return

# This is a helper function for generating the necessary tuple that
# can be used to insert into the `event_push_actions_staging` table.
# can be used to inert into the `event_push_actions_staging` table.
def _gen_entry(user_id, actions):
is_highlight = 1 if _action_has_highlight(actions) else 0
notif = 0 if "org.matrix.msc2625.mark_unread" in actions else 1
return (
event_id, # event_id column
user_id, # user_id column
_serialize_action(actions, is_highlight), # actions column
notif, # notif column
1, # notif column
is_highlight, # highlight column
)

Expand Down Expand Up @@ -855,51 +817,24 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
# Calculate the new counts that should be upserted into event_push_summary
sql = """
SELECT user_id, room_id,
coalesce(old.%s, 0) + upd.cnt,
coalesce(old.notif_count, 0) + upd.notif_count,
upd.stream_ordering,
old.user_id
FROM (
SELECT user_id, room_id, count(*) as cnt,
SELECT user_id, room_id, count(*) as notif_count,
max(stream_ordering) as stream_ordering
FROM event_push_actions
WHERE ? <= stream_ordering AND stream_ordering < ?
AND highlight = 0
%s
GROUP BY user_id, room_id
) AS upd
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
"""

# First get the count of unread messages.
txn.execute(
sql % ("unread_count", ""),
(old_rotate_stream_ordering, rotate_to_stream_ordering),
)

# We need to merge both lists into a single object because we might not have the
# same amount of rows in each of them. In this case we use a dict indexed on the
# user ID and room ID to make it easier to populate.
summaries = {} # type: Dict[Tuple[str, str], EventPushSummary]
for row in txn:
summaries[(row[0], row[1])] = EventPushSummary(
unread_count=row[2],
stream_ordering=row[3],
old_user_id=row[4],
notif_count=0,
)

# Then get the count of notifications.
txn.execute(
sql % ("notif_count", "AND notif = 1"),
(old_rotate_stream_ordering, rotate_to_stream_ordering),
)

# notif_rows is populated based on a subset of the query used to populate
# unread_rows, so we can be sure that there will be no KeyError here.
for row in txn:
summaries[(row[0], row[1])].notif_count = row[2]
txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering))
rows = txn.fetchall()

logger.info("Rotating notifications, handling %d rows", len(summaries))
logger.info("Rotating notifications, handling %d rows", len(rows))

# If the `old.user_id` above is NULL then we know there isn't already an
# entry in the table, so we simply insert it. Otherwise we update the
Expand All @@ -909,34 +844,22 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
table="event_push_summary",
values=[
{
"user_id": user_id,
"room_id": room_id,
"notif_count": summary.notif_count,
"unread_count": summary.unread_count,
"stream_ordering": summary.stream_ordering,
"user_id": row[0],
"room_id": row[1],
"notif_count": row[2],
"stream_ordering": row[3],
}
for ((user_id, room_id), summary) in summaries.items()
if summary.old_user_id is None
for row in rows
if row[4] is None
],
)

txn.executemany(
"""
UPDATE event_push_summary
SET notif_count = ?, unread_count = ?, stream_ordering = ?
UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
WHERE user_id = ? AND room_id = ?
""",
(
(
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
user_id,
room_id,
)
for ((user_id, room_id), summary) in summaries.items()
if summary.old_user_id is not None
),
((row[2], row[3], row[0], row[1]) for row in rows if row[4] is not None),
)

txn.execute(
Expand Down

This file was deleted.

19 changes: 3 additions & 16 deletions tests/replication/slave/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def test_push_actions_for_user(self):
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 0, "notify_count": 0, "unread_count": 0},
{"highlight_count": 0, "notify_count": 0},
)

self.persist(
Expand All @@ -173,7 +173,7 @@ def test_push_actions_for_user(self):
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 0, "notify_count": 1, "unread_count": 1},
{"highlight_count": 0, "notify_count": 1},
)

self.persist(
Expand All @@ -188,20 +188,7 @@ def test_push_actions_for_user(self):
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 1, "notify_count": 2, "unread_count": 2},
)

self.persist(
type="m.room.message",
msgtype="m.text",
body="world",
push_actions=[(USER_ID_2, ["org.matrix.msc2625.mark_unread"])],
)
self.replicate()
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 1, "notify_count": 2, "unread_count": 3},
{"highlight_count": 1, "notify_count": 2},
)

def test_get_rooms_for_user_with_stream_ordering(self):
Expand Down
Loading