43 | 43 |     DatabasePool,
44 | 44 |     LoggingDatabaseConnection,
45 | 45 |     LoggingTransaction,
| 46 | +     make_tuple_in_list_sql_clause,
46 | 47 | )
47 | 48 | from synapse.storage.engines._base import IsolationLevel
48 | 49 | from synapse.storage.util.id_generators import MultiWriterIdGenerator
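
Note on the helper imported above: make_tuple_in_list_sql_clause builds a parameterised multi-column IN clause plus the flattened bind arguments for it; the exact SQL varies by database engine. A minimal sketch of the general shape it produces (illustrative only, not Synapse's actual implementation):

from typing import Iterable, List, Tuple

def tuple_in_list_clause_sketch(
    columns: Tuple[str, ...],
    values: Iterable[Tuple[str, ...]],
) -> Tuple[str, List[str]]:
    """Build "(col_a, col_b) IN ((?, ?), ...)" plus the flattened args."""
    rows = list(values)
    row_placeholder = "(" + ", ".join("?" for _ in columns) + ")"
    in_list = ", ".join(row_placeholder for _ in rows)
    clause = f"({', '.join(columns)}) IN ({in_list})"
    args = [value for row in rows for value in row]
    return clause, args

# tuple_in_list_clause_sketch(("room_id", "event_id"), [("!a:x", "$e")])
# returns ("(room_id, event_id) IN ((?, ?))", ["!a:x", "$e"])
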
@@ -481,6 +482,83 @@ def f(
481 | 482 |         }
482 | 483 |         return results
483 | 484 |
| 485 | +     async def get_linearized_receipts_for_events(
| 486 | +         self,
| 487 | +         room_and_event_ids: Collection[Tuple[str, str]],
| 488 | +     ) -> Sequence[JsonMapping]:
| 489 | +         """Get all receipts for the given set of events.
| 490 | +
| 491 | +         Arguments:
| 492 | +             room_and_event_ids: A collection of (room ID, event ID) tuples
| 493 | +                 to fetch receipts for.
| 494 | +
| 495 | +         Returns:
| 496 | +             A list of receipt EDUs, one per room that has any receipts.
| 497 | +         """
| 498 | +
| 499 | +         def get_linearized_receipts_for_events_txn(
| 500 | +             txn: LoggingTransaction,
| 501 | +             room_id_event_id_tuples: Collection[Tuple[str, str]],
| 502 | +         ) -> List[Tuple[str, str, str, str, Optional[str], str]]:
| 503 | +             clause, args = make_tuple_in_list_sql_clause(
| 504 | +                 self.database_engine, ("room_id", "event_id"), room_id_event_id_tuples
| 505 | +             )
| 506 | +
| 507 | +             sql = f"""
| 508 | +                 SELECT room_id, receipt_type, user_id, event_id, thread_id, data
| 509 | +                 FROM receipts_linearized
| 510 | +                 WHERE {clause}
| 511 | +             """
| 512 | +
| 513 | +             txn.execute(sql, args)
| 514 | +
| 515 | +             return txn.fetchall()
| 516 | +
| 517 | +         # room_id -> event_id -> receipt_type -> user_id -> receipt data
| 518 | +         room_to_content: Dict[str, Dict[str, Dict[str, Dict[str, JsonMapping]]]] = {}
| 519 | +         for batch in batch_iter(room_and_event_ids, 1000):
| 520 | +             batch_results = await self.db_pool.runInteraction(
| 521 | +                 "get_linearized_receipts_for_events",
| 522 | +                 get_linearized_receipts_for_events_txn,
| 523 | +                 batch,
| 524 | +             )
| 525 | +
| 526 | +             for (
| 527 | +                 room_id,
| 528 | +                 receipt_type,
| 529 | +                 user_id,
| 530 | +                 event_id,
| 531 | +                 thread_id,
| 532 | +                 data,
| 533 | +             ) in batch_results:
| 534 | +                 content = room_to_content.setdefault(room_id, {})
| 535 | +                 user_receipts = content.setdefault(event_id, {}).setdefault(
| 536 | +                     receipt_type, {}
| 537 | +                 )
| 538 | +
| 539 | +                 receipt_data = db_to_json(data)
| 540 | +                 if thread_id is not None:
| 541 | +                     receipt_data["thread_id"] = thread_id
| 542 | +
| 543 | +                 # MSC4102: always replace threaded receipts with unthreaded ones
| 544 | +                 # if there is a clash. Specifically:
| 545 | +                 # - if there is no existing receipt, great, set the data.
| 546 | +                 # - if there is an existing receipt, is it threaded (thread_id
| 547 | +                 #   present)? YES: replace if this receipt has no thread id.
| 548 | +                 #   NO: do not replace. This means we will drop some receipts, but
| 549 | +                 #   MSC4102 is designed to drop semantically meaningless receipts,
| 550 | +                 #   so this is okay. Previously, we would drop meaningful data!
| 551 | +                 if user_id in user_receipts:
| 552 | +                     if "thread_id" in user_receipts[user_id] and not thread_id:
| 553 | +                         user_receipts[user_id] = receipt_data
| 554 | +                 else:
| 555 | +                     user_receipts[user_id] = receipt_data
| 556 | +
| 557 | +         return [
| 558 | +             {"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}
| 559 | +             for room_id, content in room_to_content.items()
| 560 | +         ]
| 561 | +
484 | 562 |     @cached(
485 | 563 |         num_args=2,
486 | 564 |     )
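
Note on the new method: a sketch of a call site and of the MSC4102 clash rule it implements. The store, room, and event identifiers below are hypothetical; the return shape (a list of m.receipt EDUs whose content maps event ID -> receipt type -> user ID -> receipt data) follows from the code above.

# Hypothetical call site; `store` is a datastore exposing the method above.
# receipt_edus = await store.get_linearized_receipts_for_events(
#     [("!room:example.org", "$event_a"), ("!room:example.org", "$event_b")]
# )
# Each element looks roughly like:
# {
#     "type": "m.receipt",  # EduTypes.RECEIPT
#     "room_id": "!room:example.org",
#     "content": {
#         "$event_a": {
#             "m.read": {
#                 "@alice:example.org": {"ts": 1234, "thread_id": "$thread"},
#             },
#         },
#     },
# }

# Runnable illustration of the MSC4102 clash rule from the loop above: an
# unthreaded receipt (thread_id is None) replaces an existing threaded one
# for the same event/receipt type/user, but never the other way around.
from typing import Dict, Optional

def apply_receipt(
    user_receipts: Dict[str, dict],
    user_id: str,
    receipt_data: dict,
    thread_id: Optional[str],
) -> None:
    if user_id in user_receipts:
        if "thread_id" in user_receipts[user_id] and not thread_id:
            user_receipts[user_id] = receipt_data
    else:
        user_receipts[user_id] = receipt_data

receipts: Dict[str, dict] = {}
apply_receipt(receipts, "@alice:x", {"ts": 1, "thread_id": "$t"}, "$t")
apply_receipt(receipts, "@alice:x", {"ts": 2}, None)
assert receipts["@alice:x"] == {"ts": 2}  # the unthreaded receipt won
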
@@ -996,6 +1074,12 @@ def __init__(
996 | 1074 |             self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
997 | 1075 |             self._background_receipts_graph_unique_index,
998 | 1076 |         )
| 1077 | +         self.db_pool.updates.register_background_index_update(
| 1078 | +             update_name="receipts_room_id_event_id_index",
| 1079 | +             index_name="receipts_linearized_event_id",
| 1080 | +             table="receipts_linearized",
| 1081 | +             columns=("room_id", "event_id"),
| 1082 | +         )
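
Note on the registration above: once the background update completes, the resulting index is roughly equivalent to the DDL below (Synapse's background updater builds such indexes without blocking writes where the engine supports it, e.g. concurrently on PostgreSQL). It is what lets the (room_id, event_id) tuple lookup in the new query avoid scanning receipts_linearized.

CREATE INDEX receipts_linearized_event_id ON receipts_linearized (room_id, event_id);
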
999 | 1083 |
1000 | 1084 |     async def _populate_receipt_event_stream_ordering(
1001 | 1085 |         self, progress: JsonDict, batch_size: int