From 44e70c9685f04fde72ae660739a8fecfd839aa43 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Jun 2019 10:29:36 +0100 Subject: [PATCH 1/6] Fix background updates to handle redactions/rejections In background updates based on current state delta stream we need to handle that we may not have all the events (or at least that `get_events` may raise an exception). --- synapse/handlers/presence.py | 11 ++++++---- synapse/handlers/stats.py | 18 +++++++++++----- synapse/storage/events_worker.py | 37 ++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6209858bbb9a..e49c8203efc6 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -828,14 +828,17 @@ def _handle_state_delta(self, deltas): # joins. continue - event = yield self.store.get_event(event_id) - if event.content.get("membership") != Membership.JOIN: + event = yield self.store.get_event(event_id, allow_none=True) + if not event or event.content.get("membership") != Membership.JOIN: # We only care about joins continue if prev_event_id: - prev_event = yield self.store.get_event(prev_event_id) - if prev_event.content.get("membership") == Membership.JOIN: + prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + if ( + prev_event + and prev_event.content.get("membership") == Membership.JOIN + ): # Ignore changes to join events. continue diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 0e92b405ba6f..7ad16c85665e 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -115,6 +115,7 @@ def _handle_deltas(self, deltas): event_id = delta["event_id"] stream_id = delta["stream_id"] prev_event_id = delta["prev_event_id"] + stream_pos = delta["stream_id"] logger.debug("Handling: %r %r, %s", typ, state_key, event_id) @@ -136,10 +137,15 @@ def _handle_deltas(self, deltas): event_content = {} if event_id is not None: - event_content = (yield self.store.get_event(event_id)).content or {} + event = yield self.store.get_event(event_id, allow_none=True) + if event: + event_content = event.content or {} + + # We use stream_pos here rather than fetch by event_id as event_id + # may be None + now = yield self.store.get_received_ts_by_stream_pos(stream_pos) # quantise time to the nearest bucket - now = yield self.store.get_received_ts(event_id) now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size if typ == EventTypes.Member: @@ -149,9 +155,11 @@ def _handle_deltas(self, deltas): # compare them. prev_event_content = {} if prev_event_id is not None: - prev_event_content = ( - yield self.store.get_event(prev_event_id) - ).content + prev_event = yield self.store.get_event( + prev_event_id, allow_none=True, + ) + if prev_event: + prev_event_content = prev_event.content membership = event_content.get("membership", Membership.LEAVE) prev_membership = prev_event_content.get("membership", Membership.LEAVE) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 17824280485a..cc7df5cf14df 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -78,6 +78,43 @@ def get_received_ts(self, event_id): desc="get_received_ts", ) + def get_received_ts_by_stream_pos(self, stream_ordering): + """Given a stream ordering get an approximate timestamp of when it + happened. + + This is done by simply taking the received ts of the first event that + has a stream ordering greater than or equal to the given stream pos. + If none exists returns the current time, on the assumption that it must + have happened recently. + + Args: + stream_ordering (int) + + Returns: + Deferred[int] + """ + + def _get_approximate_received_ts_txn(txn): + sql = """ + SELECT received_ts FROM events + WHERE stream_ordering >= ? + LIMIT 1 + """ + + txn.execute(sql, (stream_ordering,)) + row = txn.fetchone() + if row and row[0]: + ts = row[0] + else: + ts = self.clock.time_msec() + + return ts + + return self.runInteraction( + "get_approximate_received_ts", + _get_approximate_received_ts_txn, + ) + @defer.inlineCallbacks def get_event( self, From 9bb2f2b93e968f165915d3d6dcc627f3d8846373 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Jun 2019 10:33:07 +0100 Subject: [PATCH 2/6] Newsfile --- changelog.d/5352.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5352.bugfix diff --git a/changelog.d/5352.bugfix b/changelog.d/5352.bugfix new file mode 100644 index 000000000000..2ffefe5a6846 --- /dev/null +++ b/changelog.d/5352.bugfix @@ -0,0 +1 @@ +Fix room stats and presence background updates to correctly handle missing events. From bb6de23285ff0aec9cc5bbb80f1be2b6a6ec4b1f Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 5 Jun 2019 21:26:30 +1000 Subject: [PATCH 3/6] add a test --- tests/handlers/test_stats.py | 61 +++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py index 249aba3d598f..f94a4704848c 100644 --- a/tests/handlers/test_stats.py +++ b/tests/handlers/test_stats.py @@ -204,7 +204,7 @@ def test_incorrect_state_transition(self): "a2": {"membership": "not a real thing"}, } - def get_event(event_id): + def get_event(event_id, allow_none=True): m = Mock() m.content = events[event_id] d = defer.Deferred() @@ -249,3 +249,62 @@ def get_received_ts(event_id): self.assertEqual( f.value.args[0], "'not a real thing' is not a valid membership" ) + + @unittest.DEBUG + def test_redacted_prev_event(self): + """ + If the prev_event does not exist, then it is assumed to be a LEAVE. + """ + u1 = self.register_user("u1", "pass") + u1_token = self.login("u1", "pass") + + room_1 = self.helper.create_room_as(u1, tok=u1_token) + + # Do the initial population of the user directory via the background update + self._add_background_updates() + + while not self.get_success(self.store.has_completed_background_updates()): + self.get_success(self.store.do_next_background_update(100), by=0.1) + + events = { + "a1": None, + "a2": {"membership": Membership.JOIN}, + } + + def get_event(event_id, allow_none=True): + if events.get(event_id): + m = Mock() + m.content = events[event_id] + else: + m = None + d = defer.Deferred() + self.reactor.callLater(0.0, d.callback, m) + return d + + def get_received_ts(event_id): + return defer.succeed(1) + + self.store.get_received_ts = get_received_ts + self.store.get_event = get_event + + deltas = [ + { + "type": EventTypes.Member, + "state_key": "some_user:test", + "room_id": room_1, + "event_id": "a2", + "prev_event_id": "a1", + "stream_id": 100, + } + ] + + # Handle our fake deltas, which has a user going from LEAVE -> JOIN. + f = self.get_success(self.handler._handle_deltas(deltas)) + + # One delta, with two joined members -- the room creator, and our fake + # user. + r = self.get_success(self.store.get_deltas_for_room(room_1, 0)) + self.assertEqual(len(r), 1) + self.assertEqual(r[0]["joined_members"], 2) + + From 205bb2dd248ff84620f073e5de71594583a2b71f Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 5 Jun 2019 21:40:26 +1000 Subject: [PATCH 4/6] fix --- tests/handlers/test_stats.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py index f94a4704848c..d01ebb397028 100644 --- a/tests/handlers/test_stats.py +++ b/tests/handlers/test_stats.py @@ -299,12 +299,10 @@ def get_received_ts(event_id): ] # Handle our fake deltas, which has a user going from LEAVE -> JOIN. - f = self.get_success(self.handler._handle_deltas(deltas)) + self.get_success(self.handler._handle_deltas(deltas)) # One delta, with two joined members -- the room creator, and our fake # user. r = self.get_success(self.store.get_deltas_for_room(room_1, 0)) self.assertEqual(len(r), 1) self.assertEqual(r[0]["joined_members"], 2) - - From d3cc6d204a6a093ea5893eddf6782fc444af20d9 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 5 Jun 2019 22:18:58 +1000 Subject: [PATCH 5/6] de-bleb --- tests/handlers/test_stats.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py index d01ebb397028..eee43553c5f9 100644 --- a/tests/handlers/test_stats.py +++ b/tests/handlers/test_stats.py @@ -224,7 +224,7 @@ def get_received_ts(event_id): "room_id": "room", "event_id": "a1", "prev_event_id": "a2", - "stream_id": "bleb", + "stream_id": 60, } ] @@ -241,7 +241,7 @@ def get_received_ts(event_id): "room_id": "room", "event_id": "a2", "prev_event_id": "a1", - "stream_id": "bleb", + "stream_id": 100, } ] From 55173a17fdead2acb249d9055752d038f8b2780c Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 5 Jun 2019 23:03:07 +1000 Subject: [PATCH 6/6] remove debug --- tests/handlers/test_stats.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py index eee43553c5f9..2710c991cfec 100644 --- a/tests/handlers/test_stats.py +++ b/tests/handlers/test_stats.py @@ -250,7 +250,6 @@ def get_received_ts(event_id): f.value.args[0], "'not a real thing' is not a valid membership" ) - @unittest.DEBUG def test_redacted_prev_event(self): """ If the prev_event does not exist, then it is assumed to be a LEAVE.