Skip to content

Commit 261e746

Browse files
Sliding sync: Add classes for per-connection state (#17574)
This is some prep work ahead of correctly tracking receipts, where we will also want to track the room status in terms of the last receipt we sent down. Essentially, we add two classes, `PerConnectionState` and a mutable version of it, and then operate on those. --------- Co-authored-by: Eric Eastwood <[email protected]>
1 parent 993644d commit 261e746

File tree

2 files changed

+190
-94
lines changed

2 files changed

+190
-94
lines changed

changelog.d/17574.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Refactor per-connection state in experimental sliding sync handler.

synapse/handlers/sliding_sync.py

Lines changed: 189 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#
2020
import enum
2121
import logging
22+
import typing
23+
from collections import ChainMap
2224
from enum import Enum
2325
from itertools import chain
2426
from typing import (
@@ -30,11 +32,13 @@
3032
List,
3133
Literal,
3234
Mapping,
35+
MutableMapping,
3336
Optional,
3437
Sequence,
3538
Set,
3639
Tuple,
3740
Union,
41+
cast,
3842
)
3943

4044
import attr
@@ -571,21 +575,21 @@ async def current_sync_for_user(
571575
# See https://github.com/matrix-org/matrix-doc/issues/1144
572576
raise NotImplementedError()
573577

574-
if from_token:
575-
# Check that we recognize the connection position, if not tell the
576-
# clients that they need to start again.
577-
#
578-
# If we don't do this and the client asks for the full range of
579-
# rooms, we end up sending down all rooms and their state from
580-
# scratch (which can be very slow). By expiring the connection we
581-
# allow the client a chance to do an initial request with a smaller
582-
# range of rooms to get them some results sooner but will end up
583-
# taking the same amount of time (more with round-trips and
584-
# re-processing) in the end to get everything again.
585-
if not await self.connection_store.is_valid_token(
586-
sync_config, from_token.connection_position
587-
):
588-
raise SlidingSyncUnknownPosition()
578+
# Get the per-connection state (if any).
579+
#
580+
# Raises an exception if there is a `connection_position` that we don't
581+
# recognize. If we don't do this and the client asks for the full range
582+
# of rooms, we end up sending down all rooms and their state from
583+
# scratch (which can be very slow). By expiring the connection we allow
584+
# the client a chance to do an initial request with a smaller range of
585+
# rooms to get them some results sooner but will end up taking the same
586+
# amount of time (more with round-trips and re-processing) in the end to
587+
# get everything again.
588+
previous_connection_state = (
589+
await self.connection_store.get_per_connection_state(
590+
sync_config, from_token
591+
)
592+
)
589593

590594
await self.connection_store.mark_token_seen(
591595
sync_config=sync_config,
@@ -781,11 +785,7 @@ async def current_sync_for_user(
781785
# we haven't sent the room down, or we have but there are missing
782786
# updates).
783787
for room_id in relevant_room_map:
784-
status = await self.connection_store.have_sent_room(
785-
sync_config,
786-
from_token.connection_position,
787-
room_id,
788-
)
788+
status = previous_connection_state.rooms.have_sent_room(room_id)
789789
if (
790790
# The room was never sent down before so the client needs to know
791791
# about it regardless of any updates.
@@ -821,6 +821,7 @@ async def current_sync_for_user(
821821
async def handle_room(room_id: str) -> None:
822822
room_sync_result = await self.get_room_sync_data(
823823
sync_config=sync_config,
824+
per_connection_state=previous_connection_state,
824825
room_id=room_id,
825826
room_sync_config=relevant_rooms_to_send_map[room_id],
826827
room_membership_for_user_at_to_token=room_membership_for_user_map[
@@ -853,6 +854,8 @@ async def handle_room(room_id: str) -> None:
853854
)
854855

855856
if has_lists or has_room_subscriptions:
857+
new_connection_state = previous_connection_state.get_mutable()
858+
856859
# We now calculate if any rooms outside the range have had updates,
857860
# which we are not sending down.
858861
#
@@ -882,11 +885,18 @@ async def handle_room(room_id: str) -> None:
882885
)
883886
unsent_room_ids = list(missing_event_map_by_room)
884887

885-
connection_position = await self.connection_store.record_rooms(
888+
new_connection_state.rooms.record_unsent_rooms(
889+
unsent_room_ids, from_token.stream_token
890+
)
891+
892+
new_connection_state.rooms.record_sent_rooms(
893+
relevant_rooms_to_send_map.keys()
894+
)
895+
896+
connection_position = await self.connection_store.record_new_state(
886897
sync_config=sync_config,
887898
from_token=from_token,
888-
sent_room_ids=relevant_rooms_to_send_map.keys(),
889-
unsent_room_ids=unsent_room_ids,
899+
per_connection_state=new_connection_state,
890900
)
891901
elif from_token:
892902
connection_position = from_token.connection_position
@@ -1939,6 +1949,7 @@ async def get_current_state_at(
19391949
async def get_room_sync_data(
19401950
self,
19411951
sync_config: SlidingSyncConfig,
1952+
per_connection_state: "PerConnectionState",
19421953
room_id: str,
19431954
room_sync_config: RoomSyncConfig,
19441955
room_membership_for_user_at_to_token: _RoomMembershipForUser,
@@ -1986,11 +1997,7 @@ async def get_room_sync_data(
19861997
from_bound = None
19871998
initial = True
19881999
if from_token and not room_membership_for_user_at_to_token.newly_joined:
1989-
room_status = await self.connection_store.have_sent_room(
1990-
sync_config=sync_config,
1991-
connection_token=from_token.connection_position,
1992-
room_id=room_id,
1993-
)
2000+
room_status = per_connection_state.rooms.have_sent_room(room_id)
19942001
if room_status.status == HaveSentRoomFlag.LIVE:
19952002
from_bound = from_token.stream_token.room_key
19962003
initial = False
@@ -3034,6 +3041,121 @@ def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
30343041
HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
30353042

30363043

3044+
@attr.s(auto_attribs=True, slots=True, frozen=True)
3045+
class RoomStatusMap:
3046+
"""For a given stream, e.g. events, records what we have or have not sent
3047+
down for that stream in a given room."""
3048+
3049+
# `room_id` -> `HaveSentRoom`
3050+
_statuses: Mapping[str, HaveSentRoom] = attr.Factory(dict)
3051+
3052+
def have_sent_room(self, room_id: str) -> HaveSentRoom:
3053+
"""Return whether we have previously sent the room down"""
3054+
return self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER)
3055+
3056+
def get_mutable(self) -> "MutableRoomStatusMap":
3057+
"""Get a mutable copy of this state."""
3058+
return MutableRoomStatusMap(
3059+
statuses=self._statuses,
3060+
)
3061+
3062+
def copy(self) -> "RoomStatusMap":
3063+
"""Make a copy of the class. Useful for converting from a mutable to
3064+
immutable version."""
3065+
3066+
return RoomStatusMap(statuses=dict(self._statuses))
3067+
3068+
3069+
class MutableRoomStatusMap(RoomStatusMap):
3070+
"""A mutable version of `RoomStatusMap`"""
3071+
3072+
# We use a ChainMap here so that we can easily track what has been updated
3073+
# and what hasn't. Note that when we persist the per connection state this
3074+
# will get flattened to a normal dict (via calling `.copy()`)
3075+
_statuses: typing.ChainMap[str, HaveSentRoom]
3076+
3077+
def __init__(
3078+
self,
3079+
statuses: Mapping[str, HaveSentRoom],
3080+
) -> None:
3081+
# ChainMap requires a mutable mapping, but we're not actually going to
3082+
# mutate it.
3083+
statuses = cast(MutableMapping, statuses)
3084+
3085+
super().__init__(
3086+
statuses=ChainMap({}, statuses),
3087+
)
3088+
3089+
def get_updates(self) -> Mapping[str, HaveSentRoom]:
3090+
"""Return only the changes that were made"""
3091+
return self._statuses.maps[0]
3092+
3093+
def record_sent_rooms(self, room_ids: StrCollection) -> None:
3094+
"""Record that we have sent these rooms in the response"""
3095+
for room_id in room_ids:
3096+
current_status = self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER)
3097+
if current_status.status == HaveSentRoomFlag.LIVE:
3098+
continue
3099+
3100+
self._statuses[room_id] = HAVE_SENT_ROOM_LIVE
3101+
3102+
def record_unsent_rooms(
3103+
self, room_ids: StrCollection, from_token: StreamToken
3104+
) -> None:
3105+
"""Record that we have not sent these rooms in the response, but there
3106+
have been updates.
3107+
"""
3108+
# Whether we add/update the entries for unsent rooms depends on the
3109+
# existing entry:
3110+
# - LIVE: We have previously sent down everything up to
3111+
# `from_token`, so we update the entry to be `PREVIOUSLY` with
3112+
# the room key of `from_token`.
3113+
# - PREVIOUSLY: We have previously sent down everything up to *a*
3114+
# given token, so we don't need to update the entry.
3115+
# - NEVER: We have never previously sent down the room, and we haven't
3116+
# sent anything down this time either so we leave it as NEVER.
3117+
3118+
for room_id in room_ids:
3119+
current_status = self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER)
3120+
if current_status.status != HaveSentRoomFlag.LIVE:
3121+
continue
3122+
3123+
self._statuses[room_id] = HaveSentRoom.previously(from_token.room_key)
3124+
3125+
3126+
@attr.s(auto_attribs=True)
3127+
class PerConnectionState:
3128+
"""The per-connection state. A snapshot of what we've sent down the connection before.
3129+
3130+
Currently, we track whether we've sent down various aspects of a given room before.
3131+
3132+
We use the `rooms` field to store the position in the events stream for each room that we've previously sent to the client. On the next request that includes the room, we can then send only what's changed since that recorded position.
3133+
3134+
Same goes for the `receipts` field so we only need to send the new receipts since the last time you made a sync request.
3135+
3136+
Attributes:
3137+
rooms: The status of each room for the events stream.
3138+
"""
3139+
3140+
rooms: RoomStatusMap = attr.Factory(RoomStatusMap)
3141+
3142+
def get_mutable(self) -> "MutablePerConnectionState":
3143+
"""Get a mutable copy of this state."""
3144+
return MutablePerConnectionState(
3145+
rooms=self.rooms.get_mutable(),
3146+
)
3147+
3148+
3149+
@attr.s(auto_attribs=True)
3150+
class MutablePerConnectionState(PerConnectionState):
3151+
"""A mutable version of `PerConnectionState`"""
3152+
3153+
rooms: MutableRoomStatusMap
3154+
3155+
def has_updates(self) -> bool:
3156+
return bool(self.rooms.get_updates())
3157+
3158+
30373159
@attr.s(auto_attribs=True)
30383160
class SlidingSyncConnectionStore:
30393161
"""In-memory store of per-connection state, including what rooms we have
@@ -3063,9 +3185,9 @@ class SlidingSyncConnectionStore:
30633185
to mapping of room ID to `HaveSentRoom`.
30643186
"""
30653187

3066-
# `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom`
3067-
_connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = (
3068-
attr.Factory(dict)
3188+
# `(user_id, conn_id)` -> `connection_position` -> `PerConnectionState`
3189+
_connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory(
3190+
dict
30693191
)
30703192

30713193
async def is_valid_token(
@@ -3078,48 +3200,52 @@ async def is_valid_token(
30783200
conn_key = self._get_connection_key(sync_config)
30793201
return connection_token in self._connections.get(conn_key, {})
30803202

3081-
async def have_sent_room(
3082-
self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str
3083-
) -> HaveSentRoom:
3084-
"""For the given user_id/conn_id/token, return whether we have
3085-
previously sent the room down
3203+
async def get_per_connection_state(
3204+
self,
3205+
sync_config: SlidingSyncConfig,
3206+
from_token: Optional[SlidingSyncStreamToken],
3207+
) -> PerConnectionState:
3208+
"""Fetch the per-connection state for the token.
3209+
3210+
Raises:
3211+
SlidingSyncUnknownPosition if the `connection_position` of `from_token` is unknown
30863212
"""
3213+
if from_token is None:
3214+
return PerConnectionState()
3215+
3216+
connection_position = from_token.connection_position
3217+
if connection_position == 0:
3218+
# Initial sync (request without a `from_token`) starts at `0` so
3219+
# there is no existing per-connection state
3220+
return PerConnectionState()
30873221

30883222
conn_key = self._get_connection_key(sync_config)
3089-
sync_statuses = self._connections.setdefault(conn_key, {})
3090-
room_status = sync_statuses.get(connection_token, {}).get(
3091-
room_id, HAVE_SENT_ROOM_NEVER
3092-
)
3223+
sync_statuses = self._connections.get(conn_key, {})
3224+
connection_state = sync_statuses.get(connection_position)
30933225

3094-
return room_status
3226+
if connection_state is None:
3227+
raise SlidingSyncUnknownPosition()
3228+
3229+
return connection_state
30953230

30963231
@trace
3097-
async def record_rooms(
3232+
async def record_new_state(
30983233
self,
30993234
sync_config: SlidingSyncConfig,
31003235
from_token: Optional[SlidingSyncStreamToken],
3101-
*,
3102-
sent_room_ids: StrCollection,
3103-
unsent_room_ids: StrCollection,
3236+
per_connection_state: MutablePerConnectionState,
31043237
) -> int:
3105-
"""Record which rooms we have/haven't sent down in a new response
3106-
3107-
Attributes:
3108-
sync_config
3109-
from_token: The since token from the request, if any
3110-
sent_room_ids: The set of room IDs that we have sent down as
3111-
part of this request (only needs to be ones we didn't
3112-
previously sent down).
3113-
unsent_room_ids: The set of room IDs that have had updates
3114-
since the `from_token`, but which were not included in
3115-
this request
3238+
"""Record updated per-connection state, returning the connection
3239+
position associated with the new state.
3240+
3241+
If there are no changes to the state this may return the same token as
3242+
the existing per-connection state.
31163243
"""
31173244
prev_connection_token = 0
31183245
if from_token is not None:
31193246
prev_connection_token = from_token.connection_position
31203247

3121-
# If there are no changes then this is a noop.
3122-
if not sent_room_ids and not unsent_room_ids:
3248+
if not per_connection_state.has_updates():
31233249
return prev_connection_token
31243250

31253251
conn_key = self._get_connection_key(sync_config)
@@ -3130,42 +3256,11 @@ async def record_rooms(
31303256
new_store_token = prev_connection_token + 1
31313257
sync_statuses.pop(new_store_token, None)
31323258

3133-
# Copy over and update the room mappings.
3134-
new_room_statuses = dict(sync_statuses.get(prev_connection_token, {}))
3135-
3136-
# Whether we have updated the `new_room_statuses`, if we don't by the
3137-
# end we can treat this as a noop.
3138-
have_updated = False
3139-
for room_id in sent_room_ids:
3140-
new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
3141-
have_updated = True
3142-
3143-
# Whether we add/update the entries for unsent rooms depends on the
3144-
# existing entry:
3145-
# - LIVE: We have previously sent down everything up to
3146-
# `last_room_token, so we update the entry to be `PREVIOUSLY` with
3147-
# `last_room_token`.
3148-
# - PREVIOUSLY: We have previously sent down everything up to *a*
3149-
# given token, so we don't need to update the entry.
3150-
# - NEVER: We have never previously sent down the room, and we haven't
3151-
# sent anything down this time either so we leave it as NEVER.
3152-
3153-
# Work out the new state for unsent rooms that were `LIVE`.
3154-
if from_token:
3155-
new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
3156-
else:
3157-
new_unsent_state = HAVE_SENT_ROOM_NEVER
3158-
3159-
for room_id in unsent_room_ids:
3160-
prev_state = new_room_statuses.get(room_id)
3161-
if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
3162-
new_room_statuses[room_id] = new_unsent_state
3163-
have_updated = True
3164-
3165-
if not have_updated:
3166-
return prev_connection_token
3167-
3168-
sync_statuses[new_store_token] = new_room_statuses
3259+
# We copy the `MutablePerConnectionState` so that the inner `ChainMap`s
3260+
# don't grow forever.
3261+
sync_statuses[new_store_token] = PerConnectionState(
3262+
rooms=per_connection_state.rooms.copy(),
3263+
)
31693264

31703265
return new_store_token
31713266

0 commit comments

Comments
 (0)