This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit c31793a

Merge branch 'rav/fix_linearizer_cancellation' into develop

2 parents: c08f9d9 + 178ab76

2 files changed: 69 additions and 43 deletions


changelog.d/3676.bugfix
1 addition, 0 deletions

@@ -0,0 +1 @@
+Make the tests pass on Twisted < 18.7.0
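
The substance of the change is in synapse/util/async_helpers.py below: Linearizer.queue no longer relies on defer.inlineCallbacks, because on Twisted versions before 18.7 cancelling the Deferred returned by an inlineCallbacks function was not propagated to the Deferred the generator was waiting on (https://twistedmatrix.com/trac/ticket/4632, cited in the new comment). A minimal sketch of that gap, using illustrative names that are not part of the commit:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def wait_then_clean_up(inner):
        try:
            yield inner
        except defer.CancelledError:
            # On Twisted >= 18.7, cancelling `outer` below cancels `inner` and this
            # cleanup runs; on older versions the generator is never woken, so any
            # cleanup here (such as removing ourselves from a queue) is skipped.
            print("cleaning up after cancellation")
            raise

    inner = defer.Deferred()
    outer = wait_then_clean_up(inner)
    outer.addErrback(lambda f: f.trap(defer.CancelledError))
    outer.cancel()

This mirrors the situation the old queue() was in: its cleanup (removing the waiter from the queue) lived in an except block that older Twisted never reached on cancellation.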

synapse/util/async_helpers.py
68 additions, 43 deletions

@@ -188,62 +188,30 @@ def __init__(self, name=None, max_count=1, clock=None):
         # things blocked from executing.
         self.key_to_defer = {}
 
-    @defer.inlineCallbacks
     def queue(self, key):
+        # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
+        # (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
+        # propagated inside inlineCallbacks until Twisted 18.7)
         entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
 
         # If the number of things executing is greater than the maximum
         # then add a deferred to the list of blocked items
-        # When on of the things currently executing finishes it will callback
+        # When one of the things currently executing finishes it will callback
         # this item so that it can continue executing.
         if entry[0] >= self.max_count:
-            new_defer = defer.Deferred()
-            entry[1][new_defer] = 1
-
-            logger.info(
-                "Waiting to acquire linearizer lock %r for key %r", self.name, key,
-            )
-            try:
-                yield make_deferred_yieldable(new_defer)
-            except Exception as e:
-                if isinstance(e, CancelledError):
-                    logger.info(
-                        "Cancelling wait for linearizer lock %r for key %r",
-                        self.name, key,
-                    )
-                else:
-                    logger.warn(
-                        "Unexpected exception waiting for linearizer lock %r for key %r",
-                        self.name, key,
-                    )
-
-                # we just have to take ourselves back out of the queue.
-                del entry[1][new_defer]
-                raise
-
-            logger.info("Acquired linearizer lock %r for key %r", self.name, key)
-            entry[0] += 1
-
-            # if the code holding the lock completes synchronously, then it
-            # will recursively run the next claimant on the list. That can
-            # relatively rapidly lead to stack exhaustion. This is essentially
-            # the same problem as http://twistedmatrix.com/trac/ticket/9304.
-            #
-            # In order to break the cycle, we add a cheeky sleep(0) here to
-            # ensure that we fall back to the reactor between each iteration.
-            #
-            # (This needs to happen while we hold the lock, and the context manager's exit
-            # code must be synchronous, so this is the only sensible place.)
-            yield self._clock.sleep(0)
-
+            res = self._await_lock(key)
         else:
             logger.info(
                 "Acquired uncontended linearizer lock %r for key %r", self.name, key,
             )
             entry[0] += 1
+            res = defer.succeed(None)
+
+        # once we successfully get the lock, we need to return a context manager which
+        # will release the lock.
 
         @contextmanager
-        def _ctx_manager():
+        def _ctx_manager(_):
             try:
                 yield
             finally:
@@ -264,7 +232,64 @@ def _ctx_manager():
                     # map.
                     del self.key_to_defer[key]
 
-        defer.returnValue(_ctx_manager())
+        res.addCallback(_ctx_manager)
+        return res
+
+    def _await_lock(self, key):
+        """Helper for queue: adds a deferred to the queue
+
+        Assumes that we've already checked that we've reached the limit of the number
+        of lock-holders we allow. Creates a new deferred which is added to the list, and
+        adds some management around cancellations.
+
+        Returns the deferred, which will callback once we have secured the lock.
+
+        """
+        entry = self.key_to_defer[key]
+
+        logger.info(
+            "Waiting to acquire linearizer lock %r for key %r", self.name, key,
+        )
+
+        new_defer = make_deferred_yieldable(defer.Deferred())
+        entry[1][new_defer] = 1
+
+        def cb(_r):
+            logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+            entry[0] += 1
+
+            # if the code holding the lock completes synchronously, then it
+            # will recursively run the next claimant on the list. That can
+            # relatively rapidly lead to stack exhaustion. This is essentially
+            # the same problem as http://twistedmatrix.com/trac/ticket/9304.
+            #
+            # In order to break the cycle, we add a cheeky sleep(0) here to
+            # ensure that we fall back to the reactor between each iteration.
+            #
+            # (This needs to happen while we hold the lock, and the context manager's exit
+            # code must be synchronous, so this is the only sensible place.)
+            return self._clock.sleep(0)
+
+        def eb(e):
+            logger.info("defer %r got err %r", new_defer, e)
+            if isinstance(e, CancelledError):
+                logger.info(
+                    "Cancelling wait for linearizer lock %r for key %r",
+                    self.name, key,
+                )
+
+            else:
+                logger.warn(
+                    "Unexpected exception waiting for linearizer lock %r for key %r",
+                    self.name, key,
+                )
+
+            # we just have to take ourselves back out of the queue.
+            del entry[1][new_defer]
+            return e
+
+        new_defer.addCallbacks(cb, eb)
+        return new_defer
 
 
 class ReadWriteLock(object):
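
The interface of queue() is unchanged by this commit: callers still receive a Deferred that fires with a context manager once the lock is held. A sketch of a typical caller, where handle_request, do_work and room_id are placeholder names rather than anything from this commit:

    from twisted.internet import defer

    def do_work(room_id):
        # stand-in for whatever needs to run while the lock is held
        return defer.succeed(None)

    @defer.inlineCallbacks
    def handle_request(linearizer, room_id):
        # queue() returns a Deferred that fires with a context manager; leaving
        # the `with` block releases the lock and wakes the next waiter for the key.
        with (yield linearizer.queue(room_id)):
            yield do_work(room_id)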
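
What the change buys in practice is that a caller blocked waiting for the lock can now be cancelled cleanly, with the errback (eb) taking the abandoned waiter back out of the queue. A sketch of that behaviour, assuming this module's Linearizer with its default clock:

    from twisted.internet import defer
    from synapse.util.async_helpers import Linearizer

    linearizer = Linearizer(name="demo")

    d1 = linearizer.queue("key")   # uncontended: fires at once with a context manager
    d2 = linearizer.queue("key")   # contended: stays pending behind d1

    # Cancelling the pending Deferred now reaches eb(), which removes the waiter
    # from the queue; the caller sees the CancelledError.
    d2.cancel()
    d2.addErrback(lambda f: f.trap(defer.CancelledError))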
