Add `stream_ordering` sort to Sliding Sync `/sync` #17293
Merged: MadLittleMods merged 34 commits into develop from madlittlemods/msc3575-sliding-sync-sort on Jun 17, 2024
Commits
ce45cc1 Sliding sync sort stub (MadLittleMods)
75b701f Add changelog (MadLittleMods)
c8a240f Prefer `? < a AND a <= ?` (MadLittleMods)
2a82ac0 Fix `get_last_event_in_room_before_stream_ordering(...)` not finding … (MadLittleMods)
2e1d142 Add actual guranteed order for UNION (MadLittleMods)
b1af992 Some clean-up (MadLittleMods)
901ce62 Try to better explain why (MadLittleMods)
87ad458 Fix `get_last_event_in_room_before_stream_ordering(...)` not finding … (MadLittleMods)
431b31e Add actual guranteed order for UNION (MadLittleMods)
d7f40ae Try to better explain why (MadLittleMods)
a8056ae Add changelog (MadLittleMods)
3f317a9 We're actually using sub-query syntax so we can ORDER each query (MadLittleMods)
54bdc0c Fix invalid syntax (MadLittleMods)
4d585b6 Merge branch 'develop' into madlittlemods/fix-and-tests-for-get_last_… (MadLittleMods)
42f24de Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort (MadLittleMods)
03547b0 Merge branch 'madlittlemods/fix-and-tests-for-get_last_event_in_room_… (MadLittleMods)
c94550d Add `event.internal_metadata.instance_name` and event position to `ge… (MadLittleMods)
afb6627 Add rust changes for `event.internal_metadata.instance_name` (MadLittleMods)
af60f7b First pass on `sort_rooms` and refactor to include room membership al… (MadLittleMods)
bd49c34 Add some tests (MadLittleMods)
5060588 Fix newly_left not being added back if we returned early (when `membe… (MadLittleMods)
5243a30 Fix ban test case (MadLittleMods)
d5929f1 Adjust wording (MadLittleMods)
8935c6c Fix lints (MadLittleMods)
93aa4ff Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort (MadLittleMods)
185e0b5 Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort (MadLittleMods)
8244b25 Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort (MadLittleMods)
35808b3 Fix filtering (MadLittleMods)
a917eda No more sort option (MadLittleMods)
84eaeea Add rest test (MadLittleMods)
99ed012 Update changelog (MadLittleMods)
7d80418 Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-sort (MadLittleMods)
ef92f3c Stable sort with just `stream_ordering` (MadLittleMods)
63ff8f9 Rename to `get_last_event_pos_in_room_before_stream_ordering(...)` (MadLittleMods)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add `stream_ordering` sort to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,13 +18,20 @@ | |
# | ||
# | ||
import logging | ||
from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional | ||
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple | ||
|
||
from immutabledict import immutabledict | ||
|
||
from synapse.api.constants import AccountDataTypes, Membership | ||
from synapse.events import EventBase | ||
from synapse.types import Requester, RoomStreamToken, StreamToken, UserID | ||
from synapse.storage.roommember import RoomsForUser | ||
from synapse.types import ( | ||
PersistedEventPosition, | ||
Requester, | ||
RoomStreamToken, | ||
StreamToken, | ||
UserID, | ||
) | ||
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult | ||
|
||
if TYPE_CHECKING: | ||
|
@@ -33,6 +40,27 @@ | |
logger = logging.getLogger(__name__) | ||
|
||
|
||
def convert_event_to_rooms_for_user(event: EventBase) -> RoomsForUser: | ||
""" | ||
Quick helper to convert an event to a `RoomsForUser` object. | ||
""" | ||
# These fields should be present for all persisted events | ||
assert event.internal_metadata.stream_ordering is not None | ||
assert event.internal_metadata.instance_name is not None | ||
|
||
return RoomsForUser( | ||
room_id=event.room_id, | ||
sender=event.sender, | ||
membership=event.membership, | ||
event_id=event.event_id, | ||
event_pos=PersistedEventPosition( | ||
event.internal_metadata.instance_name, | ||
event.internal_metadata.stream_ordering, | ||
), | ||
room_version_id=event.room_version.identifier, | ||
) | ||
|
||
|
||
def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool: | ||
""" | ||
Returns True if the membership event should be included in the sync response, | ||
|
@@ -169,26 +197,28 @@ async def current_sync_for_user( | |
# See https://github.com/matrix-org/matrix-doc/issues/1144 | ||
raise NotImplementedError() | ||
|
||
# Get all of the room IDs that the user should be able to see in the sync | ||
# response | ||
room_id_set = await self.get_sync_room_ids_for_user( | ||
sync_config.user, | ||
from_token=from_token, | ||
to_token=to_token, | ||
) | ||
|
||
# Assemble sliding window lists | ||
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} | ||
if sync_config.lists: | ||
# Get all of the room IDs that the user should be able to see in the sync | ||
# response | ||
sync_room_map = await self.get_sync_room_ids_for_user( | ||
sync_config.user, | ||
from_token=from_token, | ||
to_token=to_token, | ||
) | ||
|
||
for list_key, list_config in sync_config.lists.items(): | ||
# Apply filters | ||
filtered_room_ids = room_id_set | ||
filtered_sync_room_map = sync_room_map | ||
if list_config.filters is not None: | ||
filtered_room_ids = await self.filter_rooms( | ||
sync_config.user, room_id_set, list_config.filters, to_token | ||
filtered_sync_room_map = await self.filter_rooms( | ||
sync_config.user, sync_room_map, list_config.filters, to_token | ||
) | ||
# TODO: Apply sorts | ||
sorted_room_ids = sorted(filtered_room_ids) | ||
|
||
sorted_room_info = await self.sort_rooms( | ||
filtered_sync_room_map, to_token | ||
) | ||
|
||
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] | ||
if list_config.ranges: | ||
|
@@ -197,12 +227,17 @@ async def current_sync_for_user( | |
SlidingSyncResult.SlidingWindowList.Operation( | ||
op=OperationType.SYNC, | ||
range=range, | ||
room_ids=sorted_room_ids[range[0] : range[1]], | ||
room_ids=[ | ||
room_id | ||
for room_id, _ in sorted_room_info[ | ||
range[0] : range[1] | ||
] | ||
], | ||
) | ||
) | ||
|
||
lists[list_key] = SlidingSyncResult.SlidingWindowList( | ||
count=len(sorted_room_ids), | ||
count=len(sorted_room_info), | ||
ops=ops, | ||
) | ||
|
||
|
@@ -219,7 +254,7 @@ async def get_sync_room_ids_for_user( | |
user: UserID, | ||
to_token: StreamToken, | ||
from_token: Optional[StreamToken] = None, | ||
) -> AbstractSet[str]: | ||
) -> Dict[str, RoomsForUser]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated things so instead of just a list of room ID's, we now also return the corresponding membership information for the user in the room. This way we can share the membership info in the filtering/sorting, and soon to be room data. |
||
""" | ||
Fetch room IDs that should be listed for this user in the sync response (the | ||
full room list that will be filtered, sorted, and sliced). | ||
|
@@ -237,11 +272,14 @@ async def get_sync_room_ids_for_user( | |
to tell when a room was forgotten at the moment so we can't factor it into the | ||
from/to range. | ||
|
||
|
||
Args: | ||
user: User to fetch rooms for | ||
to_token: The token to fetch rooms up to. | ||
from_token: The point in the stream to sync from. | ||
|
||
Returns: | ||
A dictionary of room IDs that should be listed in the sync response along | ||
with membership information in that room at the time of `to_token`. | ||
""" | ||
user_id = user.to_string() | ||
|
||
|
@@ -261,11 +299,11 @@ async def get_sync_room_ids_for_user( | |
|
||
# If the user has never joined any rooms before, we can just return an empty list | ||
if not room_for_user_list: | ||
return set() | ||
return {} | ||
|
||
# Our working list of rooms that can show up in the sync response | ||
sync_room_id_set = { | ||
room_for_user.room_id | ||
room_for_user.room_id: room_for_user | ||
for room_for_user in room_for_user_list | ||
if filter_membership_for_sync( | ||
membership=room_for_user.membership, | ||
|
@@ -415,7 +453,9 @@ async def get_sync_room_ids_for_user( | |
not was_last_membership_already_included | ||
and should_prev_membership_be_included | ||
): | ||
sync_room_id_set.add(room_id) | ||
sync_room_id_set[room_id] = convert_event_to_rooms_for_user( | ||
last_membership_change_after_to_token | ||
) | ||
# 1b) Remove rooms that the user joined (hasn't left) after the `to_token` | ||
# | ||
# For example, if the last membership event after the `to_token` is a "join" | ||
|
@@ -426,7 +466,7 @@ async def get_sync_room_ids_for_user( | |
was_last_membership_already_included | ||
and not should_prev_membership_be_included | ||
): | ||
sync_room_id_set.discard(room_id) | ||
del sync_room_id_set[room_id] | ||
|
||
# 2) ----------------------------------------------------- | ||
# We fix-up newly_left rooms after the first fixup because it may have removed | ||
|
@@ -461,25 +501,32 @@ async def get_sync_room_ids_for_user( | |
# include newly_left rooms because the last event that the user should see | ||
# is their own leave event | ||
if last_membership_change_in_from_to_range.membership == Membership.LEAVE: | ||
sync_room_id_set.add(room_id) | ||
sync_room_id_set[room_id] = convert_event_to_rooms_for_user( | ||
last_membership_change_in_from_to_range | ||
) | ||
|
||
return sync_room_id_set | ||
|
||
async def filter_rooms( | ||
self, | ||
user: UserID, | ||
room_id_set: AbstractSet[str], | ||
sync_room_map: Dict[str, RoomsForUser], | ||
filters: SlidingSyncConfig.SlidingSyncList.Filters, | ||
to_token: StreamToken, | ||
) -> AbstractSet[str]: | ||
) -> Dict[str, RoomsForUser]: | ||
""" | ||
Filter rooms based on the sync request. | ||
|
||
Args: | ||
user: User to filter rooms for | ||
room_id_set: Set of room IDs to filter down | ||
sync_room_map: Dictionary of room IDs to filter along with membership | ||
information in the room at the time of `to_token`. | ||
filters: Filters to apply | ||
to_token: We filter based on the state of the room at this token | ||
|
||
Returns: | ||
A filtered dictionary of room IDs along with membership information in the | ||
room at the time of `to_token`. | ||
""" | ||
user_id = user.to_string() | ||
|
||
|
@@ -488,7 +535,7 @@ async def filter_rooms( | |
# TODO: Exclude partially stated rooms unless the `required_state` has | ||
# `["m.room.member", "$LAZY"]` | ||
|
||
filtered_room_id_set = set(room_id_set) | ||
filtered_room_id_set = set(sync_room_map.keys()) | ||
|
||
# Filter for Direct-Message (DM) rooms | ||
if filters.is_dm is not None: | ||
|
@@ -544,4 +591,57 @@ async def filter_rooms( | |
if filters.not_tags: | ||
raise NotImplementedError() | ||
|
||
return filtered_room_id_set | ||
# Assemble a new sync room map but only with the `filtered_room_id_set` | ||
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} | ||
|
||
async def sort_rooms( | ||
self, | ||
sync_room_map: Dict[str, RoomsForUser], | ||
to_token: StreamToken, | ||
) -> List[Tuple[str, RoomsForUser]]: | ||
""" | ||
Sort by `stream_ordering` of the last event that the user should see in the | ||
room. `stream_ordering` is unique so we get a stable sort. | ||
|
||
Args: | ||
sync_room_map: Dictionary of room IDs to sort along with membership | ||
information in the room at the time of `to_token`. | ||
to_token: We sort based on the events in the room at this token (<= `to_token`) | ||
|
||
Returns: | ||
A sorted list of room IDs by `stream_ordering` along with membership information. | ||
""" | ||
|
||
# Assemble a map of room ID to the `stream_ordering` of the last activity that the | ||
# user should see in the room (<= `to_token`) | ||
last_activity_in_room_map: Dict[str, int] = {} | ||
for room_id, room_for_user in sync_room_map.items(): | ||
# If they are fully-joined to the room, let's find the latest activity | ||
# at/before the `to_token`. | ||
if room_for_user.membership == Membership.JOIN: | ||
last_event_result = ( | ||
await self.store.get_last_event_pos_in_room_before_stream_ordering( | ||
room_id, to_token.room_key | ||
) | ||
) | ||
|
||
# If the room has no events at/before the `to_token`, this is probably a | ||
# mistake in the code that generates the `sync_room_map` since that should | ||
# only give us rooms that the user had membership in during the token range. | ||
assert last_event_result is not None | ||
|
||
_, event_pos = last_event_result | ||
|
||
last_activity_in_room_map[room_id] = event_pos.stream | ||
else: | ||
# Otherwise, if the user has left/been invited/knocked/been banned from | ||
# a room, they shouldn't see anything past that point. | ||
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream | ||
|
||
return sorted( | ||
sync_room_map.items(), | ||
# Sort by the last activity (stream_ordering) in the room | ||
key=lambda room_info: last_activity_in_room_map[room_info[0]], | ||
# We want descending order | ||
reverse=True, | ||
) |
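
The ordering rule in `sort_rooms` can be tried out in isolation with the sketch below. It is not the Synapse implementation: `RoomInfo` is a hypothetical stand-in for `RoomsForUser`, and the `last_event_stream_ordering` dictionary is an assumed replacement for the `get_last_event_pos_in_room_before_stream_ordering(...)` database lookup.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class RoomInfo:
    """Hypothetical stand-in for Synapse's `RoomsForUser`."""

    room_id: str
    membership: str  # e.g. "join", "leave", "invite", "ban"
    membership_stream_ordering: int  # position of the user's membership event


def sort_rooms_sketch(
    sync_room_map: Dict[str, RoomInfo],
    last_event_stream_ordering: Dict[str, int],
) -> List[Tuple[str, RoomInfo]]:
    """Sort rooms by the last activity the user may see, most recent first.

    `last_event_stream_ordering` stands in for the per-room lookup of the
    last event at or before `to_token`.
    """
    last_activity: Dict[str, int] = {}
    for room_id, info in sync_room_map.items():
        if info.membership == "join":
            # Joined users see everything up to the token.
            last_activity[room_id] = last_event_stream_ordering[room_id]
        else:
            # Otherwise nothing past their own membership event is visible.
            last_activity[room_id] = info.membership_stream_ordering

    return sorted(
        sync_room_map.items(),
        key=lambda item: last_activity[item[0]],
        reverse=True,  # descending: newest activity first
    )


rooms = {
    "!a:example.org": RoomInfo("!a:example.org", "join", 5),
    "!b:example.org": RoomInfo("!b:example.org", "leave", 40),
    "!c:example.org": RoomInfo("!c:example.org", "join", 7),
}
latest = {"!a:example.org": 90, "!b:example.org": 120, "!c:example.org": 60}
ordered_ids = [room_id for room_id, _ in sort_rooms_sketch(rooms, latest)]
```

In this example the left room `!b:example.org` is ranked by its leave event at position 40 rather than by the room's later activity at 120, so it sorts last even though the room itself is the most recently active.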