@@ -17,7 +17,6 @@
 
 import itertools
 import logging
-import operator
 from collections import namedtuple
 
 from canonicaljson import json
@@ -30,12 +29,7 @@
 from synapse.events import FrozenEvent, event_type_from_format_version  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event
-from synapse.logging.context import (
-    LoggingContext,
-    PreserveLoggingContext,
-    make_deferred_yieldable,
-    run_in_background,
-)
+from synapse.logging.context import LoggingContext, PreserveLoggingContext
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import get_domain_from_id
 from synapse.util import batch_iter
@@ -468,39 +462,49 @@ def _get_events_from_db(self, event_ids, allow_rejected=False):
 
         Returns:
             Deferred[Dict[str, _EventCacheEntry]]:
-                map from event id to result.
+                map from event id to result. May return extra events which
+                weren't asked for.
         """
-        if not event_ids:
-            return {}
+        fetched_events = {}
+        events_to_fetch = event_ids
 
-        row_map = yield self._enqueue_events(event_ids)
+        while events_to_fetch:
+            row_map = yield self._enqueue_events(events_to_fetch)
 
-        rows = (row_map.get(event_id) for event_id in event_ids)
+            # we need to recursively fetch any redactions of those events
+            redaction_ids = set()
+            for event_id in events_to_fetch:
+                row = row_map.get(event_id)
+                fetched_events[event_id] = row
+                if row:
+                    redaction_ids.update(row["redactions"])
 
-        # filter out absent rows
-        rows = filter(operator.truth, rows)
+            events_to_fetch = redaction_ids.difference(fetched_events.keys())
+            if events_to_fetch:
+                logger.debug("Also fetching redaction events %s", events_to_fetch)
 
-        if not allow_rejected:
-            rows = (r for r in rows if r["rejected_reason"] is None)
+        result_map = {}
+        for event_id, row in fetched_events.items():
+            if not row:
+                continue
+            assert row["event_id"] == event_id
 
-        res = yield make_deferred_yieldable(
-            defer.gatherResults(
-                [
-                    run_in_background(
-                        self._get_event_from_row,
-                        row["internal_metadata"],
-                        row["json"],
-                        row["redactions"],
-                        rejected_reason=row["rejected_reason"],
-                        format_version=row["format_version"],
-                    )
-                    for row in rows
-                ],
-                consumeErrors=True,
+            rejected_reason = row["rejected_reason"]
+
+            if not allow_rejected and rejected_reason:
+                continue
+
+            cache_entry = yield self._get_event_from_row(
+                row["internal_metadata"],
+                row["json"],
+                row["redactions"],
+                rejected_reason=row["rejected_reason"],
+                format_version=row["format_version"],
             )
-        )
 
-        return {e.event.event_id: e for e in res if e}
+            result_map[event_id] = cache_entry
+
+        return result_map
 
     @defer.inlineCallbacks
     def _enqueue_events(self, events):
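
The property worth noting in the new loop is termination: each pass only fetches redaction ids that have not already been fetched, so even mutually-redacting events cannot cause an infinite loop. Below is a minimal, synchronous sketch of the same worklist pattern; fetch_rows is a hypothetical stand-in for _enqueue_events that maps event ids to rows, each row carrying the ids of the events that redact it under "redactions".

    # A stripped-down, synchronous version of the worklist loop above,
    # for illustration only; fetch_rows is a hypothetical helper.
    def fetch_with_redactions(fetch_rows, event_ids):
        fetched = {}  # event_id -> row, or None if the event is absent
        to_fetch = set(event_ids)

        while to_fetch:
            row_map = fetch_rows(to_fetch)

            redaction_ids = set()
            for event_id in to_fetch:
                row = row_map.get(event_id)
                fetched[event_id] = row
                if row:
                    redaction_ids.update(row["redactions"])

            # only chase redactions we haven't seen yet: this is what
            # guarantees the loop terminates, even on a redaction cycle
            to_fetch = redaction_ids.difference(fetched.keys())

        return fetched

    # A redaction cycle: A is redacted by B, and B by A.
    rows = {
        "A": {"event_id": "A", "redactions": ["B"]},
        "B": {"event_id": "B", "redactions": ["A"]},
    }
    result = fetch_with_redactions(
        lambda ids: {i: rows[i] for i in ids if i in rows}, ["A"]
    )
    assert set(result) == {"A", "B"}  # B was pulled in, then the worklist emptied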