Skip to content

Commit 81e0f57

Browse files
Fix perf when streams don't change often (#17767)
There is a bug with the `StreamChangeCache` where it would incorrectly return that all entities had changed if asked for entities changed *since* the earliest stream position. Note that for streams we use the inequalities: `$min_stream_id < stream_id <= $max_stream_id`, i.e. when we ask the stream change cache for all things that have changed since `$stream_id` we don't care for events that happened *at* `$stream_id`. Specifically: `_earliest_known_stream_pos` is the position at which we know that we'll have entries for all changes since that point, we can use the cache for any stream IDs that equal `_earliest_known_stream_pos`. `_earliest_known_stream_pos` is set in three places: - On startup we set it either to: - the current maximum stream ID, with not prefilled values; or - the minimum of the latest N values we pulled from the DB - When we evict items from the bottom, we set it to the stream ID of the evicted items. This was changed in matrix-org/synapse#14435, but I think we were overly conservative there. --------- Co-authored-by: Andrew Morgan <[email protected]>
1 parent ae4862c commit 81e0f57

File tree

3 files changed

+19
-14
lines changed

3 files changed

+19
-14
lines changed

changelog.d/17767.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix performance of streams that don't change often.

synapse/util/caches/stream_change_cache.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,9 @@ def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
142142
"""
143143
assert isinstance(stream_pos, int)
144144

145-
# _cache is not valid at or before the earliest known stream position, so
145+
# _cache is not valid before the earliest known stream position, so
146146
# return that the entity has changed.
147-
if stream_pos <= self._earliest_known_stream_pos:
147+
if stream_pos < self._earliest_known_stream_pos:
148148
self.metrics.inc_misses()
149149
return True
150150

@@ -186,7 +186,7 @@ def get_entities_changed(
186186
This will be all entities if the given stream position is at or earlier
187187
than the earliest known stream position.
188188
"""
189-
if not self._cache or stream_pos <= self._earliest_known_stream_pos:
189+
if not self._cache or stream_pos < self._earliest_known_stream_pos:
190190
self.metrics.inc_misses()
191191
return set(entities)
192192

@@ -238,9 +238,9 @@ def has_any_entity_changed(self, stream_pos: int) -> bool:
238238
"""
239239
assert isinstance(stream_pos, int)
240240

241-
# _cache is not valid at or before the earliest known stream position, so
241+
# _cache is not valid before the earliest known stream position, so
242242
# return that an entity has changed.
243-
if stream_pos <= self._earliest_known_stream_pos:
243+
if stream_pos < self._earliest_known_stream_pos:
244244
self.metrics.inc_misses()
245245
return True
246246

@@ -270,9 +270,9 @@ def get_all_entities_changed(self, stream_pos: int) -> AllEntitiesChangedResult:
270270
"""
271271
assert isinstance(stream_pos, int)
272272

273-
# _cache is not valid at or before the earliest known stream position, so
273+
# _cache is not valid before the earliest known stream position, so
274274
# return None to mark that it is unknown if an entity has changed.
275-
if stream_pos <= self._earliest_known_stream_pos:
275+
if stream_pos < self._earliest_known_stream_pos:
276276
return AllEntitiesChangedResult(None)
277277

278278
changed_entities: List[EntityType] = []

tests/util/test_stream_change_cache.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ def test_has_entity_changed(self) -> None:
5353
# return True, whether it's a known entity or not.
5454
self.assertTrue(cache.has_entity_changed("[email protected]", 0))
5555
self.assertTrue(cache.has_entity_changed("[email protected]", 0))
56-
self.assertTrue(cache.has_entity_changed("[email protected]", 3))
57-
self.assertTrue(cache.has_entity_changed("[email protected]", 3))
56+
self.assertTrue(cache.has_entity_changed("[email protected]", 2))
57+
self.assertTrue(cache.has_entity_changed("[email protected]", 2))
5858

5959
def test_entity_has_changed_pops_off_start(self) -> None:
6060
"""
@@ -76,9 +76,11 @@ def test_entity_has_changed_pops_off_start(self) -> None:
7676
self.assertTrue("[email protected]" not in cache._entity_to_key)
7777

7878
self.assertEqual(
79-
cache.get_all_entities_changed(3).entities, ["[email protected]"]
79+
cache.get_all_entities_changed(2).entities,
80+
8081
)
81-
self.assertFalse(cache.get_all_entities_changed(2).hit)
82+
self.assertFalse(cache.get_all_entities_changed(1).hit)
83+
self.assertTrue(cache.get_all_entities_changed(2).hit)
8284

8385
# If we update an existing entity, it keeps the two existing entities
8486
cache.entity_has_changed("[email protected]", 5)
@@ -89,7 +91,8 @@ def test_entity_has_changed_pops_off_start(self) -> None:
8991
cache.get_all_entities_changed(3).entities,
9092
9193
)
92-
self.assertFalse(cache.get_all_entities_changed(2).hit)
94+
self.assertFalse(cache.get_all_entities_changed(1).hit)
95+
self.assertTrue(cache.get_all_entities_changed(2).hit)
9396

9497
def test_get_all_entities_changed(self) -> None:
9598
"""
@@ -114,7 +117,8 @@ def test_get_all_entities_changed(self) -> None:
114117
self.assertEqual(
115118
cache.get_all_entities_changed(3).entities, ["[email protected]"]
116119
)
117-
self.assertFalse(cache.get_all_entities_changed(1).hit)
120+
self.assertFalse(cache.get_all_entities_changed(0).hit)
121+
self.assertTrue(cache.get_all_entities_changed(1).hit)
118122

119123
# ... later, things gest more updates
120124
cache.entity_has_changed("[email protected]", 5)
@@ -149,7 +153,7 @@ def test_has_any_entity_changed(self) -> None:
149153
# With no entities, it returns True for the past, present, and False for
150154
# the future.
151155
self.assertTrue(cache.has_any_entity_changed(0))
152-
self.assertTrue(cache.has_any_entity_changed(1))
156+
self.assertFalse(cache.has_any_entity_changed(1))
153157
self.assertFalse(cache.has_any_entity_changed(2))
154158

155159
# We add an entity

0 commit comments

Comments
 (0)