@@ -67,6 +67,8 @@ class _BackgroundUpdates:
6767 EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
6868 EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
6969
70+ EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"
71+
7072
7173@attr .s (slots = True , frozen = True , auto_attribs = True )
7274class _CalculateChainCover :
@@ -253,6 +255,11 @@ def __init__(
253255 replaces_index = "ev_edges_id" ,
254256 )
255257
258+ self .db_pool .updates .register_background_update_handler (
259+ _BackgroundUpdates .EVENTS_POPULATE_STATE_KEY_REJECTIONS ,
260+ self ._background_events_populate_state_key_rejections ,
261+ )
262+
256263 async def _background_reindex_fields_sender (
257264 self , progress : JsonDict , batch_size : int
258265 ) -> int :
@@ -1399,3 +1406,83 @@ def drop_invalid_event_edges_txn(txn: LoggingTransaction) -> bool:
13991406 )
14001407
14011408 return batch_size
1409+
1410+ async def _background_events_populate_state_key_rejections (
1411+ self , progress : JsonDict , batch_size : int
1412+ ) -> int :
1413+ """Back-populate `events.state_key` and `events.rejection_reason"""
1414+
1415+ min_stream_ordering_exclusive = progress ["min_stream_ordering_exclusive" ]
1416+ max_stream_ordering_inclusive = progress ["max_stream_ordering_inclusive" ]
1417+
1418+ def _populate_txn (txn : LoggingTransaction ) -> bool :
1419+ """Returns True if we're done."""
1420+
1421+ # first we need to find an endpoint.
1422+ # we need to find the final row in the batch of batch_size, which means
1423+ # we need to skip over (batch_size-1) rows and get the next row.
1424+ txn .execute (
1425+ """
1426+ SELECT stream_ordering FROM events
1427+ WHERE stream_ordering > ? AND stream_ordering <= ?
1428+ ORDER BY stream_ordering
1429+ LIMIT 1 OFFSET ?
1430+ """ ,
1431+ (
1432+ min_stream_ordering_exclusive ,
1433+ max_stream_ordering_inclusive ,
1434+ batch_size - 1 ,
1435+ ),
1436+ )
1437+
1438+ endpoint = None
1439+ row = txn .fetchone ()
1440+ if row :
1441+ endpoint = row [0 ]
1442+
1443+ where_clause = "stream_ordering > ?"
1444+ args = [min_stream_ordering_exclusive ]
1445+ if endpoint :
1446+ where_clause += " AND stream_ordering <= ?"
1447+ args .append (endpoint )
1448+
1449+ # now do the updates.
1450+ txn .execute (
1451+ f"""
1452+ UPDATE events
1453+ SET state_key = (SELECT state_key FROM state_events se WHERE se.event_id = events.event_id),
1454+ rejection_reason = (SELECT reason FROM rejections rej WHERE rej.event_id = events.event_id)
1455+ WHERE ({ where_clause } )
1456+ """ ,
1457+ args ,
1458+ )
1459+
1460+ logger .info (
1461+ "populated new `events` columns up to %s/%i: updated %i rows" ,
1462+ endpoint ,
1463+ max_stream_ordering_inclusive ,
1464+ txn .rowcount ,
1465+ )
1466+
1467+ if endpoint is None :
1468+ # we're done
1469+ return True
1470+
1471+ progress ["min_stream_ordering_exclusive" ] = endpoint
1472+ self .db_pool .updates ._background_update_progress_txn (
1473+ txn ,
1474+ _BackgroundUpdates .EVENTS_POPULATE_STATE_KEY_REJECTIONS ,
1475+ progress ,
1476+ )
1477+ return False
1478+
1479+ done = await self .db_pool .runInteraction (
1480+ desc = "events_populate_state_key_rejections" , func = _populate_txn
1481+ )
1482+
1483+ if done :
1484+ await self .db_pool .updates ._end_background_update (
1485+ _BackgroundUpdates .EVENTS_POPULATE_STATE_KEY_REJECTIONS
1486+ )
1487+
1488+ return batch_size
0 commit comments