Skip to content

Commit 7881831

Browse files
committed
Fix: drop invalid pending messages
Problem: if an invalid message somehow managed to reach the pending message collection, the message would be retried indefinitely logging exceptions on each run. Solution: drop invalid messages.
1 parent e17a06c commit 7881831

File tree

2 files changed

+11
-5
lines changed

2 files changed

+11
-5
lines changed

src/aleph/handlers/storage.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ async def handle_new_storage(store_message: ValidatedStoreMessage) -> Optional[b
4848

4949
ipfs_enabled = config.ipfs.enabled.value
5050
do_standard_lookup = True
51-
5251
if engine == ItemType.ipfs and ipfs_enabled:
5352
if item_type_from_hash(item_hash) != ItemType.ipfs:
5453
LOGGER.warning("Invalid IPFS hash: '%s'", item_hash)

src/aleph/jobs/process_pending_messages.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from aleph.model.pending import PendingMessage
2020
from aleph.services.p2p import singleton
2121
from .job_utils import prepare_loop, process_job_results
22+
from ..exceptions import InvalidMessageError
2223
from ..schemas.pending_messages import parse_message
2324

2425
LOGGER = getLogger("jobs.pending_messages")
@@ -49,7 +50,15 @@ async def handle_pending_message(
4950
seen_ids: Dict[Tuple, int],
5051
) -> List[DbBulkOperation]:
5152

52-
message = parse_message(pending["message"])
53+
delete_pending_message_op = DbBulkOperation(
54+
PendingMessage, DeleteOne({"_id": pending["_id"]})
55+
)
56+
57+
try:
58+
message = parse_message(pending["message"])
59+
except InvalidMessageError:
60+
# If an invalid message somehow ended in pending messages, drop it.
61+
return [delete_pending_message_op]
5362

5463
async with sem:
5564
status, operations = await incoming(
@@ -64,9 +73,7 @@ async def handle_pending_message(
6473
)
6574

6675
if status != IncomingStatus.RETRYING_LATER:
67-
operations.append(
68-
DbBulkOperation(PendingMessage, DeleteOne({"_id": pending["_id"]}))
69-
)
76+
operations.append(delete_pending_message_op)
7077

7178
return operations
7279

0 commit comments

Comments
 (0)