Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit af18780

Browse files
authored
Merge pull request #5809 from matrix-org/erikj/handle_pusher_stop
Handle pusher being deleted during processing.
2 parents 0b6fbb2 + 96bdd66 commit af18780

File tree

4 files changed

+55
-21
lines changed

4 files changed

+55
-21
lines changed

changelog.d/5809.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Handle pusher being deleted during processing rather than logging an exception.

synapse/push/emailpusher.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -234,13 +234,19 @@ def save_last_stream_ordering_and_success(self, last_stream_ordering):
234234
return
235235

236236
self.last_stream_ordering = last_stream_ordering
237-
yield self.store.update_pusher_last_stream_ordering_and_success(
238-
self.app_id,
239-
self.email,
240-
self.user_id,
241-
last_stream_ordering,
242-
self.clock.time_msec(),
237+
pusher_still_exists = (
238+
yield self.store.update_pusher_last_stream_ordering_and_success(
239+
self.app_id,
240+
self.email,
241+
self.user_id,
242+
last_stream_ordering,
243+
self.clock.time_msec(),
244+
)
243245
)
246+
if not pusher_still_exists:
247+
# The pusher has been deleted while we were processing, so
248+
# lets just stop and return.
249+
self.on_stop()
244250

245251
def seconds_until(self, ts_msec):
246252
secs = (ts_msec - self.clock.time_msec()) / 1000

synapse/push/httppusher.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -199,13 +199,21 @@ def _unsafe_process(self):
199199
http_push_processed_counter.inc()
200200
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
201201
self.last_stream_ordering = push_action["stream_ordering"]
202-
yield self.store.update_pusher_last_stream_ordering_and_success(
203-
self.app_id,
204-
self.pushkey,
205-
self.user_id,
206-
self.last_stream_ordering,
207-
self.clock.time_msec(),
202+
pusher_still_exists = (
203+
yield self.store.update_pusher_last_stream_ordering_and_success(
204+
self.app_id,
205+
self.pushkey,
206+
self.user_id,
207+
self.last_stream_ordering,
208+
self.clock.time_msec(),
209+
)
208210
)
211+
if not pusher_still_exists:
212+
# The pusher has been deleted while we were processing, so
213+
# lets just stop and return.
214+
self.on_stop()
215+
return
216+
209217
if self.failing_since:
210218
self.failing_since = None
211219
yield self.store.update_pusher_failing_since(
@@ -234,12 +242,17 @@ def _unsafe_process(self):
234242
)
235243
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
236244
self.last_stream_ordering = push_action["stream_ordering"]
237-
yield self.store.update_pusher_last_stream_ordering(
245+
pusher_still_exists = yield self.store.update_pusher_last_stream_ordering(
238246
self.app_id,
239247
self.pushkey,
240248
self.user_id,
241249
self.last_stream_ordering,
242250
)
251+
if not pusher_still_exists:
252+
# The pusher has been deleted while we were processing, so
253+
# lets just stop and return.
254+
self.on_stop()
255+
return
243256

244257
self.failing_since = None
245258
yield self.store.update_pusher_failing_since(

synapse/storage/pusher.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -308,22 +308,36 @@ def update_pusher_last_stream_ordering(
308308
def update_pusher_last_stream_ordering_and_success(
309309
self, app_id, pushkey, user_id, last_stream_ordering, last_success
310310
):
311-
yield self._simple_update_one(
312-
"pushers",
313-
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
314-
{
311+
"""Update the last stream ordering position we've processed up to for
312+
the given pusher.
313+
314+
Args:
315+
app_id (str)
316+
pushkey (str)
317+
last_stream_ordering (int)
318+
last_success (int)
319+
320+
Returns:
321+
Deferred[bool]: True if the pusher still exists; False if it has been deleted.
322+
"""
323+
updated = yield self._simple_update(
324+
table="pushers",
325+
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
326+
updatevalues={
315327
"last_stream_ordering": last_stream_ordering,
316328
"last_success": last_success,
317329
},
318330
desc="update_pusher_last_stream_ordering_and_success",
319331
)
320332

333+
return bool(updated)
334+
321335
@defer.inlineCallbacks
322336
def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
323-
yield self._simple_update_one(
324-
"pushers",
325-
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
326-
{"failing_since": failing_since},
337+
yield self._simple_update(
338+
table="pushers",
339+
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
340+
updatevalues={"failing_since": failing_since},
327341
desc="update_pusher_failing_since",
328342
)
329343

0 commit comments

Comments
 (0)