Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10591.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Clean up some of the federation event authentication code for clarity.
3 changes: 2 additions & 1 deletion synapse/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,11 @@ def __str__(self):
return self.__repr__()

def __repr__(self):
return "<FrozenEvent event_id=%r, type=%r, state_key=%r>" % (
return "<FrozenEvent event_id=%r, type=%r, state_key=%r, outlier=%s>" % (
self.get("event_id", None),
self.get("type", None),
self.get("state_key", None),
self.internal_metadata.is_outlier(),
)


Expand Down
1 change: 1 addition & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,7 @@ async def _process_incoming_pdus_in_room_inner(
# has started processing).
while True:
async with lock:
logger.info("handling received PDU: %s", event)
try:
await self.handler.on_receive_pdu(
origin, event, sent_to_us_directly=True
Expand Down
52 changes: 24 additions & 28 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,23 +220,26 @@ async def on_receive_pdu(
room_id = pdu.room_id
event_id = pdu.event_id

logger.info("handling received PDU: %s", pdu)

# We reprocess pdus when we have seen them only as outliers
existing = await self.store.get_event(
event_id, allow_none=True, allow_rejected=True
)

# FIXME: Currently we fetch an event again when we already have it
# if it has been marked as an outlier.

already_seen = existing and (
not existing.internal_metadata.is_outlier()
or pdu.internal_metadata.is_outlier()
)
if already_seen:
logger.debug("Already seen pdu")
return
if existing:
if not existing.internal_metadata.is_outlier():
logger.info(
"Ignoring received event %s which we have already seen", event_id
)
return
if pdu.internal_metadata.is_outlier():
logger.info(
"Ignoring received outlier %s which we already have as an outlier",
event_id,
)
return
logger.info("De-outliering event %s", event_id)

# do some initial sanity-checking of the event. In particular, make
# sure it doesn't have hundreds of prev_events or auth_events, which
Expand Down Expand Up @@ -331,7 +334,8 @@ async def on_receive_pdu(
"Found all missing prev_events",
)

if prevs - seen:
missing_prevs = prevs - seen
if missing_prevs:
# We've still not been able to get all of the prev_events for this event.
#
# In this case, we need to fall back to asking another server in the
Expand Down Expand Up @@ -359,8 +363,8 @@ async def on_receive_pdu(
if sent_to_us_directly:
logger.warning(
"Rejecting: failed to fetch %d prev events: %s",
len(prevs - seen),
shortstr(prevs - seen),
len(missing_prevs),
shortstr(missing_prevs),
)
raise FederationError(
"ERROR",
Expand All @@ -373,9 +377,10 @@ async def on_receive_pdu(
)

logger.info(
"Event %s is missing prev_events: calculating state for a "
"Event %s is missing prev_events %s: calculating state for a "
"backwards extremity",
event_id,
shortstr(missing_prevs),
)

# Calculate the state after each of the previous events, and
Expand All @@ -393,7 +398,7 @@ async def on_receive_pdu(

# Ask the remote server for the states we don't
# know about
for p in prevs - seen:
for p in missing_prevs:
logger.info("Requesting state after missing prev_event %s", p)

with nested_logging_context(p):
Expand Down Expand Up @@ -556,21 +561,14 @@ async def _get_missing_events_for_pdu(
logger.warning("Failed to get prev_events: %s", e)
return

logger.info(
"Got %d prev_events: %s",
len(missing_events),
shortstr(missing_events),
)
logger.info("Got %d prev_events", len(missing_events))

# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)

for ev in missing_events:
logger.info(
"Handling received prev_event %s",
ev.event_id,
)
logger.info("Handling received prev_event %s", ev)
with nested_logging_context(ev.event_id):
try:
await self.on_receive_pdu(origin, ev, sent_to_us_directly=False)
Expand Down Expand Up @@ -1762,10 +1760,8 @@ async def _handle_queued_pdus(
for p, origin in room_queue:
try:
logger.info(
"Processing queued PDU %s which was received "
"while we were joining %s",
p.event_id,
p.room_id,
"Processing queued PDU %s which was received while we were joining",
p,
)
with nested_logging_context(p.event_id):
await self.on_receive_pdu(origin, p, sent_to_us_directly=True)
Expand Down