2424from twisted .internet import defer
2525
2626from synapse .api .constants import EventTypes , Membership
27+ from synapse .metrics .background_process_metrics import run_as_background_process
28+ from synapse .storage ._base import LoggingTransaction
2729from synapse .storage .events_worker import EventsWorkerStore
2830from synapse .types import get_domain_from_id
2931from synapse .util .async_helpers import Linearizer
5355MemberSummary = namedtuple ("MemberSummary" , ("members" , "count" ))
5456
5557_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
58+ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
5659
5760
5861class RoomMemberWorkerStore (EventsWorkerStore ):
62+ def __init__ (self , db_conn , hs ):
63+ super (RoomMemberWorkerStore , self ).__init__ (db_conn , hs )
64+
65+ # Is the current_state_events.membership up to date? Or is the
66+ # background update still running?
67+ self ._current_state_events_membership_up_to_date = False
68+
69+ txn = LoggingTransaction (
70+ db_conn .cursor (),
71+ name = "_check_safe_current_state_events_membership_updated" ,
72+ database_engine = self .database_engine ,
73+ )
74+ self ._check_safe_current_state_events_membership_updated_txn (txn )
75+ txn .close ()
76+
77+ def _check_safe_current_state_events_membership_updated_txn (self , txn ):
78+ """Checks if it is safe to assume the new current_state_events
79+ membership column is up to date
80+ """
81+
82+ pending_update = self ._simple_select_one_txn (
83+ txn ,
84+ table = "background_updates" ,
85+ keyvalues = {"update_name" : _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME },
86+ retcols = ["update_name" ],
87+ allow_none = True ,
88+ )
89+
90+ self ._current_state_events_membership_up_to_date = not pending_update
91+
92+ # If the update is still running, reschedule to run.
93+ if pending_update :
94+ self ._clock .call_later (
95+ 15.0 ,
96+ run_as_background_process ,
97+ "_check_safe_current_state_events_membership_updated" ,
98+ self .runInteraction ,
99+ "_check_safe_current_state_events_membership_updated" ,
100+ self ._check_safe_current_state_events_membership_updated_txn ,
101+ )
102+
59103 @cachedInlineCallbacks (max_entries = 100000 , iterable = True , cache_context = True )
60104 def get_hosts_in_room (self , room_id , cache_context ):
61105 """Returns the set of all hosts currently in the room
@@ -69,14 +113,23 @@ def get_hosts_in_room(self, room_id, cache_context):
69113 @cached (max_entries = 100000 , iterable = True )
70114 def get_users_in_room (self , room_id ):
71115 def f (txn ):
72- sql = (
73- "SELECT m.user_id FROM room_memberships as m"
74- " INNER JOIN current_state_events as c"
75- " ON m.event_id = c.event_id "
76- " AND m.room_id = c.room_id "
77- " AND m.user_id = c.state_key"
78- " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
79- )
116+ # If we can assume current_state_events.membership is up to date
117+ # then we can avoid a join, which is a Very Good Thing given how
118+ # frequently this function gets called.
119+ if self ._current_state_events_membership_up_to_date :
120+ sql = """
121+ SELECT state_key FROM current_state_events
122+ WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
123+ """
124+ else :
125+ sql = """
126+ SELECT state_key FROM room_memberships as m
127+ INNER JOIN current_state_events as c
128+ ON m.event_id = c.event_id
129+ AND m.room_id = c.room_id
130+ AND m.user_id = c.state_key
131+ WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
132+ """
80133
81134 txn .execute (sql , (room_id , Membership .JOIN ))
82135 return [to_ascii (r [0 ]) for r in txn ]
@@ -98,15 +151,26 @@ def _get_room_summary_txn(txn):
98151 # first get counts.
99152 # We do this all in one transaction to keep the cache small.
100153 # FIXME: get rid of this when we have room_stats
101- sql = """
102- SELECT count(*), m.membership FROM room_memberships as m
103- INNER JOIN current_state_events as c
104- ON m.event_id = c.event_id
105- AND m.room_id = c.room_id
106- AND m.user_id = c.state_key
107- WHERE c.type = 'm.room.member' AND c.room_id = ?
108- GROUP BY m.membership
109- """
154+
155+ # If we can assume current_state_events.membership is up to date
156+ # then we can avoid a join, which is a Very Good Thing given how
157+ # frequently this function gets called.
158+ if self ._current_state_events_membership_up_to_date :
159+ sql = """
160+ SELECT count(*), membership FROM current_state_events
161+ WHERE type = 'm.room.member' AND room_id = ?
162+ GROUP BY membership
163+ """
164+ else :
165+ sql = """
166+ SELECT count(*), m.membership FROM room_memberships as m
167+ INNER JOIN current_state_events as c
168+ ON m.event_id = c.event_id
169+ AND m.room_id = c.room_id
170+ AND m.user_id = c.state_key
171+ WHERE c.type = 'm.room.member' AND c.room_id = ?
172+ GROUP BY m.membership
173+ """
110174
111175 txn .execute (sql , (room_id ,))
112176 res = {}
@@ -224,7 +288,7 @@ def _get_rooms_for_user_where_membership_is_txn(
224288 results = []
225289 if membership_list :
226290 where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
227- " OR " .join (["membership = ?" for _ in membership_list ]),
291+ " OR " .join (["m. membership = ?" for _ in membership_list ]),
228292 )
229293
230294 args = [user_id ]
@@ -453,8 +517,8 @@ def is_host_joined(self, room_id, host):
453517
454518 sql = """
455519 SELECT state_key FROM current_state_events AS c
456- INNER JOIN room_memberships USING (event_id)
457- WHERE membership = 'join'
520+ INNER JOIN room_memberships AS m USING (event_id)
521+ WHERE m. membership = 'join'
458522 AND type = 'm.room.member'
459523 AND c.room_id = ?
460524 AND state_key LIKE ?
@@ -602,6 +666,10 @@ def __init__(self, db_conn, hs):
602666 self .register_background_update_handler (
603667 _MEMBERSHIP_PROFILE_UPDATE_NAME , self ._background_add_membership_profile
604668 )
669+ self .register_background_update_handler (
670+ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME ,
671+ self ._background_current_state_membership ,
672+ )
605673
606674 def _store_room_members_txn (self , txn , events , backfilled ):
607675 """Store a room member in the database.
@@ -781,6 +849,52 @@ def add_membership_profile_txn(txn):
781849
782850 defer .returnValue (result )
783851
852+ @defer .inlineCallbacks
853+ def _background_current_state_membership (self , progress , batch_size ):
854+ """Update the new membership column on current_state_events.
855+ """
856+
857+ if "rooms" not in progress :
858+ rooms = yield self ._simple_select_onecol (
859+ table = "current_state_events" ,
860+ keyvalues = {},
861+ retcol = "DISTINCT room_id" ,
862+ desc = "_background_current_state_membership_get_rooms" ,
863+ )
864+ progress ["rooms" ] = rooms
865+
866+ rooms = progress ["rooms" ]
867+
868+ def _background_current_state_membership_txn (txn ):
869+ processed = 0
870+ while rooms and processed < batch_size :
871+ sql = """
872+ UPDATE current_state_events AS c
873+ SET membership = (
874+ SELECT membership FROM room_memberships
875+ WHERE event_id = c.event_id
876+ )
877+ WHERE room_id = ?
878+ """
879+ txn .execute (sql , (rooms .pop (),))
880+ processed += txn .rowcount
881+
882+ self ._background_update_progress_txn (
883+ txn , _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME , progress
884+ )
885+
886+ return processed
887+
888+ result = yield self .runInteraction (
889+ "_background_current_state_membership_update" ,
890+ _background_current_state_membership_txn ,
891+ )
892+
893+ if not rooms :
894+ yield self ._end_background_update (_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME )
895+
896+ defer .returnValue (result )
897+
784898
785899class _JoinedHostsCache (object ):
786900 """Cache for joined hosts in a room that is optimised to handle updates
0 commit comments