4141from prometheus_client import Counter
4242
4343from twisted .internet import defer
44+ from twisted .internet .defer import Deferred
4445
4546from synapse .api .constants import EduTypes , EventTypes , HistoryVisibility , Membership
4647from synapse .api .errors import AuthError
5253from synapse .metrics import LaterGauge
5354from synapse .streams .config import PaginationConfig
5455from synapse .types import (
56+ ISynapseReactor ,
5557 JsonDict ,
5658 MultiWriterStreamToken ,
5759 PersistedEventPosition ,
6163 StreamToken ,
6264 UserID ,
6365)
64- from synapse .util .async_helpers import ObservableDeferred , timeout_deferred
66+ from synapse .util .async_helpers import (
67+ timeout_deferred ,
68+ )
6569from synapse .util .metrics import Measure
6670from synapse .util .stringutils import shortstr
6771from synapse .visibility import filter_events_for_client
@@ -90,18 +94,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
9094 return n
9195
9296
93- class _NotificationListener :
94- """This represents a single client connection to the events stream.
95- The events stream handler will have yielded to the deferred, so to
96- notify the handler it is sufficient to resolve the deferred.
97- """
98-
99- __slots__ = ["deferred" ]
100-
101- def __init__ (self , deferred : "defer.Deferred" ):
102- self .deferred = deferred
103-
104-
10597class _NotifierUserStream :
10698 """This represents a user connected to the event stream.
10799 It tracks the most recent stream token for that user.
@@ -114,11 +106,13 @@ class _NotifierUserStream:
114106
115107 def __init__ (
116108 self ,
109+ reactor : ISynapseReactor ,
117110 user_id : str ,
118111 rooms : StrCollection ,
119112 current_token : StreamToken ,
120113 time_now_ms : int ,
121114 ):
115+ self .reactor = reactor
122116 self .user_id = user_id
123117 self .rooms = set (rooms )
124118
@@ -130,28 +124,31 @@ def __init__(
130124 self .current_token = current_token
131125 self .last_notified_ms = time_now_ms
132126
133- self .notify_deferred : ObservableDeferred [StreamToken ] = ObservableDeferred (
134- defer .Deferred ()
135- )
127+ # Set of listeners that we need to wake up when there has been a change.
128+ self .listeners : Set [Deferred [StreamToken ]] = set ()
136129
137- def notify (
130+ def update_and_fetch_deferreds (
138131 self ,
139132 current_token : StreamToken ,
140133 time_now_ms : int ,
141- ) -> None :
142- """Notify any listeners for this user of a new event from an
143- event source.
134+ ) -> Collection ["Deferred[StreamToken]" ]:
135+ """Update the stream for this user because of a new event from an
136+ event source, and return the set of deferreds to wake up.
137+
144138 Args:
145139 current_token: The new current token.
146140 time_now_ms: The current time in milliseconds.
141+
142+ Returns:
143+ The set of deferreds that need to be called.
147144 """
148145 self .current_token = current_token
149146 self .last_notified_ms = time_now_ms
150- notify_deferred = self .notify_deferred
151147
152- with PreserveLoggingContext ():
153- self .notify_deferred = ObservableDeferred (defer .Deferred ())
154- notify_deferred .callback (self .current_token )
148+ listeners = self .listeners
149+ self .listeners = set ()
150+
151+ return listeners
155152
156153 def remove (self , notifier : "Notifier" ) -> None :
157154 """Remove this listener from all the indexes in the Notifier
@@ -165,9 +162,9 @@ def remove(self, notifier: "Notifier") -> None:
165162 notifier .user_to_user_stream .pop (self .user_id )
166163
167164 def count_listeners (self ) -> int :
168- return len (self .notify_deferred . observers () )
165+ return len (self .listeners )
169166
170- def new_listener (self , token : StreamToken ) -> _NotificationListener :
167+ def new_listener (self , token : StreamToken ) -> "Deferred[StreamToken]" :
171168 """Returns a deferred that is resolved when there is a new token
172169 greater than the given token.
173170
@@ -177,10 +174,17 @@ def new_listener(self, token: StreamToken) -> _NotificationListener:
177174 """
178175 # Immediately wake up stream if something has already since happened
179176 # since their last token.
180- if self .current_token != token :
181- return _NotificationListener (defer .succeed (self .current_token ))
182- else :
183- return _NotificationListener (self .notify_deferred .observe ())
177+ if token != self .current_token :
178+ return defer .succeed (self .current_token )
179+
180+ # Create a new deferred and add it to the set of listeners. We add a
181+ # cancel handler to remove it from the set again, to handle timeouts.
182+ deferred : "Deferred[StreamToken]" = Deferred (
183+ canceller = lambda d : self .listeners .discard (d )
184+ )
185+ self .listeners .add (deferred )
186+
187+ return deferred
184188
185189
186190@attr .s (slots = True , frozen = True , auto_attribs = True )
@@ -233,6 +237,7 @@ def __init__(self, hs: "HomeServer"):
233237 # List of callbacks to be notified when a lock is released
234238 self ._lock_released_callback : List [Callable [[str , str , str ], None ]] = []
235239
240+ self .reactor = hs .get_reactor ()
236241 self .clock = hs .get_clock ()
237242 self .appservice_handler = hs .get_application_service_handler ()
238243 self ._pusher_pool = hs .get_pusherpool ()
@@ -329,12 +334,20 @@ async def on_un_partial_stated_room(
329334 user_streams = self .room_to_user_streams .get (room_id , set ())
330335 time_now_ms = self .clock .time_msec ()
331336 current_token = self .event_sources .get_current_token ()
337+
338+ listeners : List ["Deferred[StreamToken]" ] = []
332339 for user_stream in user_streams :
333340 try :
334- user_stream .notify (current_token , time_now_ms )
341+ listeners .extend (
342+ user_stream .update_and_fetch_deferreds (current_token , time_now_ms )
343+ )
335344 except Exception :
336345 logger .exception ("Failed to notify listener" )
337346
347+ with PreserveLoggingContext ():
348+ for listener in listeners :
349+ listener .callback (current_token )
350+
338351 users_woken_by_stream_counter .labels (StreamKeyType .UN_PARTIAL_STATED_ROOMS ).inc (
339352 len (user_streams )
340353 )
@@ -538,12 +551,24 @@ def on_new_event(
538551
539552 time_now_ms = self .clock .time_msec ()
540553 current_token = self .event_sources .get_current_token ()
554+ listeners : List ["Deferred[StreamToken]" ] = []
541555 for user_stream in user_streams :
542556 try :
543- user_stream .notify (current_token , time_now_ms )
557+ listeners .extend (
558+ user_stream .update_and_fetch_deferreds (
559+ current_token , time_now_ms
560+ )
561+ )
544562 except Exception :
545563 logger .exception ("Failed to notify listener" )
546564
565+ # We resolve all these deferreds in one go so that we only need to
566+ # call `PreserveLoggingContext` once, as it has a bunch of overhead
567+ # (to calculate performance stats)
568+ with PreserveLoggingContext ():
569+ for listener in listeners :
570+ listener .callback (current_token )
571+
547572 users_woken_by_stream_counter .labels (stream_key ).inc (len (user_streams ))
548573
549574 self .notify_replication ()
@@ -582,6 +607,7 @@ async def wait_for_events(
582607 if room_ids is None :
583608 room_ids = await self .store .get_rooms_for_user (user_id )
584609 user_stream = _NotifierUserStream (
610+ reactor = self .reactor ,
585611 user_id = user_id ,
586612 rooms = room_ids ,
587613 current_token = current_token ,
@@ -604,8 +630,8 @@ async def wait_for_events(
604630 # Now we wait for the _NotifierUserStream to be told there
605631 # is a new token.
606632 listener = user_stream .new_listener (prev_token )
607- listener . deferred = timeout_deferred (
608- listener . deferred ,
633+ listener = timeout_deferred (
634+ listener ,
609635 (end_time - now ) / 1000.0 ,
610636 self .hs .get_reactor (),
611637 )
@@ -618,7 +644,7 @@ async def wait_for_events(
618644 )
619645
620646 with PreserveLoggingContext ():
621- await listener . deferred
647+ await listener
622648
623649 log_kv (
624650 {
0 commit comments