Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 2b084c5

Browse files
authored
Merge device list replication streams (#14833)
1 parent db5145a commit 2b084c5

File tree

7 files changed

+72
-38
lines changed

7 files changed

+72
-38
lines changed

changelog.d/14826.misc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
Merge tag and normal account data replication streams.
1+
Merge the two account data and the two device list replication streams.

changelog.d/14833.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Merge the two account data and the two device list replication streams.

docs/upgrade.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,13 @@ process, for example:
9292
9393
## Changes to the account data replication streams
9494
95-
Synapse has changed the format of the account data replication streams (between
96-
workers). This is a forwards- and backwards-incompatible change: v1.75 workers
97-
cannot process account data replicated by v1.76 workers, and vice versa.
95+
Synapse has changed the format of the account data and devices replication
96+
streams (between workers). This is a forwards- and backwards-incompatible
97+
change: v1.75 workers cannot process account data replicated by v1.76 workers,
98+
and vice versa.
9899
99100
Once all workers are upgraded to v1.76 (or downgraded to v1.75), account data
100-
replication will resume as normal.
101+
and device replication will resume as normal.
101102
102103
103104
# Upgrading to v1.74.0

synapse/replication/tcp/client.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ async def on_rdata(
187187
elif stream_name == DeviceListsStream.NAME:
188188
all_room_ids: Set[str] = set()
189189
for row in rows:
190-
if row.entity.startswith("@"):
190+
if row.entity.startswith("@") and not row.is_signature:
191191
room_ids = await self.store.get_rooms_for_user(row.entity)
192192
all_room_ids.update(room_ids)
193193
self.notifier.on_new_event(
@@ -422,7 +422,11 @@ async def process_replication_rows(
422422
# The entities are either user IDs (starting with '@') whose devices
423423
# have changed, or remote servers that we need to tell about
424424
# changes.
425-
hosts = {row.entity for row in rows if not row.entity.startswith("@")}
425+
hosts = {
426+
row.entity
427+
for row in rows
428+
if not row.entity.startswith("@") and not row.is_signature
429+
}
426430
for host in hosts:
427431
self.federation_sender.send_device_messages(host, immediate=False)
428432

synapse/replication/tcp/streams/__init__.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
Stream,
3838
ToDeviceStream,
3939
TypingStream,
40-
UserSignatureStream,
4140
)
4241
from synapse.replication.tcp.streams.events import EventsStream
4342
from synapse.replication.tcp.streams.federation import FederationStream
@@ -62,7 +61,6 @@
6261
ToDeviceStream,
6362
FederationStream,
6463
AccountDataStream,
65-
UserSignatureStream,
6664
UnPartialStatedRoomStream,
6765
UnPartialStatedEventStream,
6866
)
@@ -82,7 +80,6 @@
8280
"DeviceListsStream",
8381
"ToDeviceStream",
8482
"AccountDataStream",
85-
"UserSignatureStream",
8683
"UnPartialStatedRoomStream",
8784
"UnPartialStatedEventStream",
8885
]

synapse/replication/tcp/streams/_base.py

Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -463,18 +463,67 @@ class DeviceListsStream(Stream):
463463
@attr.s(slots=True, frozen=True, auto_attribs=True)
464464
class DeviceListsStreamRow:
465465
entity: str
466+
# Indicates that a user has signed their own device with their user-signing key
467+
is_signature: bool
466468

467469
NAME = "device_lists"
468470
ROW_TYPE = DeviceListsStreamRow
469471

470472
def __init__(self, hs: "HomeServer"):
471-
store = hs.get_datastores().main
473+
self.store = hs.get_datastores().main
472474
super().__init__(
473475
hs.get_instance_name(),
474-
current_token_without_instance(store.get_device_stream_token),
475-
store.get_all_device_list_changes_for_remotes,
476+
current_token_without_instance(self.store.get_device_stream_token),
477+
self._update_function,
478+
)
479+
480+
async def _update_function(
481+
self,
482+
instance_name: str,
483+
from_token: Token,
484+
current_token: Token,
485+
target_row_count: int,
486+
) -> StreamUpdateResult:
487+
(
488+
device_updates,
489+
devices_to_token,
490+
devices_limited,
491+
) = await self.store.get_all_device_list_changes_for_remotes(
492+
instance_name, from_token, current_token, target_row_count
476493
)
477494

495+
(
496+
signatures_updates,
497+
signatures_to_token,
498+
signatures_limited,
499+
) = await self.store.get_all_user_signature_changes_for_remotes(
500+
instance_name, from_token, current_token, target_row_count
501+
)
502+
503+
upper_limit_token = current_token
504+
if devices_limited:
505+
upper_limit_token = min(upper_limit_token, devices_to_token)
506+
if signatures_limited:
507+
upper_limit_token = min(upper_limit_token, signatures_to_token)
508+
509+
device_updates = [
510+
(stream_id, (entity, False))
511+
for stream_id, (entity,) in device_updates
512+
if stream_id <= upper_limit_token
513+
]
514+
515+
signatures_updates = [
516+
(stream_id, (entity, True))
517+
for stream_id, (entity,) in signatures_updates
518+
if stream_id <= upper_limit_token
519+
]
520+
521+
updates = list(
522+
heapq.merge(device_updates, signatures_updates, key=lambda row: row[0])
523+
)
524+
525+
return updates, upper_limit_token, devices_limited or signatures_limited
526+
478527

479528
class ToDeviceStream(Stream):
480529
"""New to_device messages for a client"""
@@ -583,22 +632,3 @@ async def _update_function(
583632
heapq.merge(room_rows, global_rows, tag_rows, key=lambda row: row[0])
584633
)
585634
return updates, to_token, limited
586-
587-
588-
class UserSignatureStream(Stream):
589-
"""A user has signed their own device with their user-signing key"""
590-
591-
@attr.s(slots=True, frozen=True, auto_attribs=True)
592-
class UserSignatureStreamRow:
593-
user_id: str
594-
595-
NAME = "user_signature"
596-
ROW_TYPE = UserSignatureStreamRow
597-
598-
def __init__(self, hs: "HomeServer"):
599-
store = hs.get_datastores().main
600-
super().__init__(
601-
hs.get_instance_name(),
602-
current_token_without_instance(store.get_device_stream_token),
603-
store.get_all_user_signature_changes_for_remotes,
604-
)

synapse/storage/databases/main/devices.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
whitelisted_homeserver,
3939
)
4040
from synapse.metrics.background_process_metrics import wrap_as_background_process
41-
from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
41+
from synapse.replication.tcp.streams._base import DeviceListsStream
4242
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
4343
from synapse.storage.database import (
4444
DatabasePool,
@@ -163,24 +163,25 @@ def process_replication_rows(
163163
) -> None:
164164
if stream_name == DeviceListsStream.NAME:
165165
self._invalidate_caches_for_devices(token, rows)
166-
elif stream_name == UserSignatureStream.NAME:
167-
for row in rows:
168-
self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
166+
169167
return super().process_replication_rows(stream_name, instance_name, token, rows)
170168

171169
def process_replication_position(
172170
self, stream_name: str, instance_name: str, token: int
173171
) -> None:
174172
if stream_name == DeviceListsStream.NAME:
175173
self._device_list_id_gen.advance(instance_name, token)
176-
elif stream_name == UserSignatureStream.NAME:
177-
self._device_list_id_gen.advance(instance_name, token)
174+
178175
super().process_replication_position(stream_name, instance_name, token)
179176

180177
def _invalidate_caches_for_devices(
181178
self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow]
182179
) -> None:
183180
for row in rows:
181+
if row.is_signature:
182+
self._user_signature_stream_cache.entity_has_changed(row.entity, token)
183+
continue
184+
184185
# The entities are either user IDs (starting with '@') whose devices
185186
# have changed, or remote servers that we need to tell about
186187
# changes.

0 commit comments

Comments
 (0)