diff --git a/.dockerignore b/.dockerignore index 4fe21126f..2598c9154 100644 --- a/.dockerignore +++ b/.dockerignore @@ -55,5 +55,5 @@ MANIFEST venv*/ # User configuration with secrets -config.yml +/config.yml node-secret.key diff --git a/.gitignore b/.gitignore index 81750de95..d76dc4dc8 100644 --- a/.gitignore +++ b/.gitignore @@ -54,6 +54,6 @@ MANIFEST venv*/ # Secret files -config.yml +/config.yml node-secret.key keys/ diff --git a/deployment/docker-build/dev/Dockerfile b/deployment/docker-build/dev/Dockerfile new file mode 100644 index 000000000..3572c3340 --- /dev/null +++ b/deployment/docker-build/dev/Dockerfile @@ -0,0 +1,80 @@ +FROM ubuntu:22.04 as base + +ENV DEBIAN_FRONTEND noninteractive + +RUN apt-get update && apt-get -y upgrade && apt-get install -y software-properties-common +RUN add-apt-repository -y ppa:deadsnakes/ppa + +# Runtime + build packages +RUN apt-get update && apt-get -y upgrade && apt-get install -y \ + git \ + libgmp-dev \ + libpq5 \ + python3.12 + +FROM base as builder + +RUN openssl version +RUN cat /etc/ssl/openssl.cnf +RUN echo "$OPENSSL_CONF" + +# Build-only packages +RUN apt-get update && apt-get install -y \ + build-essential \ + curl \ + pkg-config \ + python3.12-dev \ + python3.12-venv \ + libpq-dev \ + software-properties-common + +# Install Rust to build Python packages +RUN curl https://sh.rustup.rs > rustup-installer.sh +RUN sh rustup-installer.sh -y +ENV PATH="/root/.cargo/bin:${PATH}" + +# Some packages (py-ed25519-bindings, required by substrate-interface) need the nightly +# Rust toolchain to be built at this time +RUN rustup default nightly + +# Create virtualenv +RUN python3.12 -m venv /opt/venv + +# Install pip +ENV PIP_NO_CACHE_DIR yes +RUN /opt/venv/bin/python3.12 -m pip install --upgrade pip wheel +ENV PATH="/opt/venv/bin:${PATH}" + +WORKDIR /opt/pyaleph +COPY alembic.ini pyproject.toml ./ +COPY LICENSE.txt README.md ./ +COPY deployment/migrations ./deployment/migrations +COPY deployment/scripts 
./deployment/scripts +COPY .git ./.git +COPY src ./src + +RUN pip install -e .[linting] +RUN pip install hatch + +FROM base + +COPY --from=builder /opt/venv /opt/venv +COPY --from=builder /opt/pyaleph /opt/pyaleph + +RUN apt-get update && apt-get install -y \ + libsodium23 \ + libsodium-dev \ + libgmp-dev + +# OpenSSL 3 disabled some hash algorithms by default. They must be reenabled +# by enabling the "legacy" providers in /etc/ssl/openssl.cnf. +COPY ./deployment/docker-build/openssl.cnf.patch /etc/ssl/openssl.cnf.patch +RUN patch /etc/ssl/openssl.cnf /etc/ssl/openssl.cnf.patch + +RUN mkdir /var/lib/pyaleph + +ENV PATH="/opt/venv/bin:${PATH}" +WORKDIR /opt/pyaleph + +RUN hatch build +ENTRYPOINT ["bash", "deployment/scripts/run_aleph_ccn.sh"] diff --git a/deployment/docker-build/dev/config.yml b/deployment/docker-build/dev/config.yml new file mode 100644 index 000000000..266e48a39 --- /dev/null +++ b/deployment/docker-build/dev/config.yml @@ -0,0 +1,65 @@ +--- +nuls2: + chain_id: 1 + enabled: false + packing_node: false + sync_address: NULSd6HgUkssMi6oSjwEn3puNSijLKnyiRV7H + api_url: https://apiserver.nuls.io/ + explorer_url: https://nuls.world/ + token_contract: NULSd6Hh1FjbnAktH1FFvFnTzfgFnZtgAhYut + + +ethereum: + enabled: true + # api_url: {{ ALEPH_ETHEREUM_URL }} + chain_id: 1 + packing_node: false + sync_contract: "0x166fd4299364B21c7567e163d85D78d2fb2f8Ad5" + start_height: 21614811 + token_contract: "0x27702a26126e0B3702af63Ee09aC4d1A084EF628" + token_start_height: 21614792 + + +postgres: + host: postgres + port: 5432 + user: aleph + password: decentralize-everything + name: aleph-test + + +storage: + store_files: true + engine: filesystem + folder: /var/lib/pyaleph + + +ipfs: + enabled: true + host: ipfs + port: 5001 + gateway_port: 8080 + + +aleph: + queue_topic: ALEPH-TEST + + +p2p: + daemon_host: p2p-service + http_port: 4024 + port: 4025 + control_port: 4030 + listen_port: 4031 + reconnect_delay: 60 + + +rabbitmq: + host: rabbitmq + port: 5672 + 
username: aleph-p2p + password: decentralize-everything + + +sentry: + dsn: "" diff --git a/deployment/docker-build/dev/docker-compose.yml b/deployment/docker-build/dev/docker-compose.yml new file mode 100644 index 000000000..ecce78286 --- /dev/null +++ b/deployment/docker-build/dev/docker-compose.yml @@ -0,0 +1,133 @@ +--- + +volumes: + pyaleph-ipfs: + pyaleph-local-storage: + pyaleph-postgres: + + +services: + pyaleph: + # platform: linux/amd64 + restart: always + # image: alephim/pyaleph-node:0.5.8 + image: localhost/alephim/pyaleph-node-dev:build + build: + dockerfile: ./deployment/docker-build/dev/Dockerfile + context: ../../.. + command: --config /opt/pyaleph/config.yml --key-dir /opt/pyaleph/keys -v + volumes: + - pyaleph-local-storage:/var/lib/pyaleph + - ./config.yml:/opt/pyaleph/config.yml + - ./keys:/opt/pyaleph/keys + - ../../..:/opt/pyaleph + depends_on: + - postgres + - ipfs + - p2p-service + - redis + networks: + - pyaleph + logging: + options: + max-size: 50m + + pyaleph-api: + # platform: linux/amd64 + restart: always + # image: alephim/pyaleph-node:0.5.8 + image: localhost/alephim/pyaleph-node:build + build: + dockerfile: ./deployment/docker-build/dev/Dockerfile + context: ../../.. 
+ command: --config /opt/pyaleph/config.yml --key-dir /opt/pyaleph/keys -v + entrypoint: ["bash", "deployment/scripts/run_aleph_ccn_api.sh"] + ports: + - "4024:4024/tcp" + volumes: + - pyaleph-local-storage:/var/lib/pyaleph + - ./config.yml:/opt/pyaleph/config.yml + - ../../..:/opt/pyaleph + environment: + CCN_CONFIG_API_PORT: 4024 + CCN_CONFIG_API_NB_WORKERS: 8 + depends_on: + - pyaleph + networks: + - pyaleph + logging: + options: + max-size: 50m + + p2p-service: + restart: always + image: alephim/p2p-service:0.1.3 + networks: + - pyaleph + volumes: + - ./config.yml:/etc/p2p-service/config.yml + - ./keys/node-secret.pkcs8.der:/etc/p2p-service/node-secret.pkcs8.der + depends_on: + - rabbitmq + environment: + RUST_LOG: info + ports: + - "4025:4025" + - "127.0.0.1:4030:4030" + command: + - "--config" + - "/etc/p2p-service/config.yml" + - "--private-key-file" + - "/etc/p2p-service/node-secret.pkcs8.der" + + postgres: + restart: always + image: postgres:15.1 + ports: + - "127.0.0.1:5432:5432" + volumes: + - pyaleph-postgres:/var/lib/postgresql/data + environment: + POSTGRES_USER: aleph + POSTGRES_PASSWORD: decentralize-everything + POSTGRES_DB: aleph + networks: + - pyaleph + shm_size: "2gb" + + rabbitmq: + restart: always + image: rabbitmq:3.11.15-management + networks: + - pyaleph + environment: + RABBITMQ_DEFAULT_USER: aleph-p2p + RABBITMQ_DEFAULT_PASS: decentralize-everything + ports: + - "127.0.0.1:5672:5672" + - "127.0.0.1:15672:15672" + + redis: + restart: always + image: redis:7.0.10 + networks: + - pyaleph + + ipfs: + restart: always + image: ipfs/kubo:v0.15.0 + ports: + - "4001:4001" + - "4001:4001/udp" + - "127.0.0.1:5001:5001" + volumes: + - "pyaleph-ipfs:/data/ipfs" + environment: + - IPFS_PROFILE=server + networks: + - pyaleph + command: ["daemon", "--enable-pubsub-experiment", "--migrate"] + + +networks: + pyaleph: diff --git a/deployment/docker-build/test/Dockerfile b/deployment/docker-build/test/Dockerfile new file mode 100644 index 
000000000..7a6169ace --- /dev/null +++ b/deployment/docker-build/test/Dockerfile @@ -0,0 +1,78 @@ +FROM ubuntu:22.04 + +ENV DEBIAN_FRONTEND noninteractive + +RUN apt-get update && apt-get -y upgrade && apt-get install -y software-properties-common +RUN add-apt-repository -y ppa:deadsnakes/ppa + +# Runtime + build packages +RUN apt-get update && apt-get -y upgrade && apt-get install -y \ + git \ + libgmp-dev \ + libpq5 \ + python3.12 + +RUN openssl version +RUN cat /etc/ssl/openssl.cnf +RUN echo "$OPENSSL_CONF" + +# Build-only packages +RUN apt-get update && apt-get install -y \ + build-essential \ + curl \ + pkg-config \ + python3.12-dev \ + python3.12-venv \ + libpq-dev \ + software-properties-common + +# Install Rust to build Python packages +RUN curl https://sh.rustup.rs > rustup-installer.sh +RUN sh rustup-installer.sh -y +ENV PATH="/root/.cargo/bin:${PATH}" + +# Some packages (py-ed25519-bindings, required by substrate-interface) need the nightly +# Rust toolchain to be built at this time +RUN rustup default nightly + +# Create virtualenv +RUN python3.12 -m venv /opt/venv + +# Install pip +ENV PIP_NO_CACHE_DIR yes +RUN /opt/venv/bin/python3.12 -m pip install --upgrade pip wheel +ENV PATH="/opt/venv/bin:${PATH}" + +WORKDIR /opt/pyaleph +COPY alembic.ini pyproject.toml ./ +COPY LICENSE.txt README.md ./ +COPY deployment/migrations ./deployment/migrations +COPY deployment/scripts ./deployment/scripts +COPY .git ./.git +COPY src ./src + +# Install project deps and test deps +RUN pip install -e .[testing,docs] +RUN pip install hatch + +# Install project test deps +RUN apt-get update && apt-get install -y \ + libsodium23 \ + libsodium-dev \ + libgmp-dev \ + postgresql \ + redis \ + curl + +# OpenSSL 3 disabled some hash algorithms by default. They must be reenabled +# by enabling the "legacy" providers in /etc/ssl/openssl.cnf. 
+COPY ./deployment/docker-build/openssl.cnf.patch /etc/ssl/openssl.cnf.patch +RUN patch /etc/ssl/openssl.cnf /etc/ssl/openssl.cnf.patch + +RUN mkdir /var/lib/pyaleph +ENV PATH="/opt/venv/bin:${PATH}" +WORKDIR /opt/pyaleph + +RUN hatch build +CMD ["hatch", "run", "testing:test"] + diff --git a/deployment/docker-build/test/config.yml b/deployment/docker-build/test/config.yml new file mode 100644 index 000000000..7adaa65a8 --- /dev/null +++ b/deployment/docker-build/test/config.yml @@ -0,0 +1,12 @@ +--- +postgres: + host: postgres + port: 5432 + user: aleph + password: decentralize-everything + database: aleph + + +redis: + host: redis + port: 6379 diff --git a/deployment/docker-build/test/docker-compose.yml b/deployment/docker-build/test/docker-compose.yml new file mode 100644 index 000000000..39c9569be --- /dev/null +++ b/deployment/docker-build/test/docker-compose.yml @@ -0,0 +1,48 @@ +--- + +volumes: + pyaleph-ipfs: + pyaleph-local-storage: + pyaleph-postgres: + + +services: + pyaleph: + image: localhost/alephim/pyaleph-node-test:build + build: + dockerfile: ./deployment/docker-build/test/Dockerfile + context: ../../.. 
+ volumes: + - pyaleph-local-storage:/var/lib/pyaleph + - ./config.yml:/opt/pyaleph/config.yml + - ../../..:/opt/pyaleph + depends_on: + - postgres + - redis + networks: + - pyaleph + logging: + options: + max-size: 50m + + postgres: + image: postgres:15.1 + volumes: + - pyaleph-postgres:/var/lib/postgresql/data + environment: + POSTGRES_USER: aleph + POSTGRES_PASSWORD: decentralize-everything + POSTGRES_DB: aleph + networks: + - pyaleph + shm_size: "2gb" + + redis: + restart: always + image: redis:7.0.10 + networks: + - pyaleph + + +networks: + pyaleph: diff --git a/deployment/migrations/versions/0032_a3ef27f0db81_fix_duplicated_forgotten_messages.py b/deployment/migrations/versions/0032_a3ef27f0db81_fix_duplicated_forgotten_messages.py new file mode 100644 index 000000000..d1c6d66ee --- /dev/null +++ b/deployment/migrations/versions/0032_a3ef27f0db81_fix_duplicated_forgotten_messages.py @@ -0,0 +1,179 @@ +"""fix_duplicated_forgotten_messages + +Revision ID: a3ef27f0db81 +Revises: d8e9852e5775 +Create Date: 2025-01-23 14:52:43.314424 + +""" + +import asyncio +from threading import Thread + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import insert +from aleph.types.db_session import DbSession + +from aleph.db.models.vms import VmBaseDb, VmVersionDb +import logging + +logger = logging.getLogger("alembic") + +# revision identifiers, used by Alembic. 
+revision = "a3ef27f0db81" +down_revision = "d8e9852e5775" +branch_labels = None +depends_on = None + + +def refresh_vm_version(session: DbSession, vm_hash: str) -> None: + coalesced_ref = sa.func.coalesce(VmBaseDb.replaces, VmBaseDb.item_hash) + select_latest_revision_stmt = ( + sa.select( + coalesced_ref.label("replaces"), + sa.func.max(VmBaseDb.created).label("created"), + ).group_by(coalesced_ref) + ).subquery() + select_latest_program_version_stmt = ( + sa.select( + coalesced_ref, + VmBaseDb.owner, + VmBaseDb.item_hash, + VmBaseDb.created, + ) + .join( + select_latest_revision_stmt, + (coalesced_ref == select_latest_revision_stmt.c.replaces) + & (VmBaseDb.created == select_latest_revision_stmt.c.created), + ) + .where(coalesced_ref == vm_hash) + ) + + insert_stmt = insert(VmVersionDb).from_select( + ["vm_hash", "owner", "current_version", "last_updated"], + select_latest_program_version_stmt, + ) + upsert_stmt = insert_stmt.on_conflict_do_update( + constraint="program_versions_pkey", + set_={ + "current_version": insert_stmt.excluded.current_version, + "last_updated": insert_stmt.excluded.last_updated, + }, + ) + session.execute(sa.delete(VmVersionDb).where(VmVersionDb.vm_hash == vm_hash)) + session.execute(upsert_stmt) + + +def do_delete_vms(session: DbSession) -> None: + # DELETE VMS + + vm_hashes = ( + session.execute( + """ + SELECT m.item_hash + FROM messages m + INNER JOIN forgotten_messages fm on (m.item_hash = fm.item_hash) + WHERE m.type = 'INSTANCE' or m.type = 'PROGRAM' + """ + ) + .scalars() + .all() + ) + + logger.debug("DELETE VMS: %r", vm_hashes) + + session.execute( + """ + DELETE + FROM vms v + WHERE v.item_hash in + (SELECT m.item_hash + FROM messages m + INNER JOIN forgotten_messages fm on (m.item_hash = fm.item_hash) + WHERE m.type = 'INSTANCE' or m.type = 'PROGRAM') + """ + ) + + session.execute( + """ + DELETE + FROM vms v + WHERE v.replaces in + (SELECT m.item_hash + FROM messages m + INNER JOIN forgotten_messages fm on (m.item_hash = 
fm.item_hash) + WHERE m.type = 'INSTANCE' or m.type = 'PROGRAM') + """ + ) + + for item_hash in vm_hashes: + refresh_vm_version(session, item_hash) + + +def do_delete_store(session: DbSession) -> None: + # DELETE STORE + + session.execute( + """ + DELETE + FROM file_pins fp + WHERE fp.item_hash in ( + SELECT m.item_hash + FROM messages m + INNER JOIN forgotten_messages fm ON m.item_hash = fm.item_hash + WHERE m.type = 'STORE' + ) + """ + ) + + +def do_delete_messages(session: DbSession) -> None: + # DELETE MESSAGES + + session.execute( + """ + DELETE + FROM messages m + using forgotten_messages fm + WHERE m.item_hash = fm.item_hash + """ + ) + + +def do_delete(session: DbSession) -> None: + """ + NOTE: We need to migrate (delete duplicates) from aggregate_elements, aggregates, file_tags and posts tables. + The issue that was causing this inconsistent state has been fixed and we have considered that it isn't worth cleaning + these tables for now as there are fewer than 1k orphan rows + """ + + op.execute( + """ + INSERT INTO error_codes(code, description) VALUES + (504, 'Cannot process a forgotten message') + """ + ) + + do_delete_vms(session) + do_delete_store(session) + do_delete_messages(session) + + +async def upgrade_async() -> None: + session = DbSession(bind=op.get_bind()) + do_delete(session) + session.close() + + +def upgrade_thread(): + asyncio.run(upgrade_async()) + + +def upgrade() -> None: + thread = Thread(target=upgrade_thread, daemon=True) + thread.start() + thread.join() + + +def downgrade() -> None: + op.execute("DELETE FROM error_codes WHERE code = 504") diff --git a/src/aleph/handlers/message_handler.py b/src/aleph/handlers/message_handler.py index 117fa3f20..5d8ba5f00 100644 --- a/src/aleph/handlers/message_handler.py +++ b/src/aleph/handlers/message_handler.py @@ -13,6 +13,7 @@ from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.accessors.files import insert_content_file_pin, upsert_file from aleph.db.accessors.messages
import ( + get_forgotten_message, get_message_by_item_hash, make_confirmation_upsert_query, make_message_status_upsert_query, @@ -21,6 +22,7 @@ ) from aleph.db.accessors.pending_messages import delete_pending_message from aleph.db.models import MessageDb, MessageStatusDb, PendingMessageDb +from aleph.db.models.messages import ForgottenMessageDb from aleph.exceptions import ( ContentCurrentlyUnavailable, InvalidContent, @@ -37,8 +39,9 @@ from aleph.toolkit.timestamp import timestamp_to_datetime from aleph.types.db_session import DbSession, DbSessionFactory from aleph.types.files import FileType -from aleph.types.message_processing_result import ProcessedMessage +from aleph.types.message_processing_result import ProcessedMessage, RejectedMessage from aleph.types.message_status import ( + ErrorCode, InvalidMessageException, InvalidMessageFormat, InvalidSignature, @@ -129,23 +132,6 @@ async def fetch_related_content(self, session: DbSession, message: MessageDb): f"Invalid IPFS hash for message {message.item_hash}" ) from e - @staticmethod - async def confirm_existing_message( - session: DbSession, - existing_message: MessageDb, - pending_message: PendingMessageDb, - ): - if pending_message.signature != existing_message.signature: - raise InvalidSignature(f"Invalid signature for {pending_message.item_hash}") - - delete_pending_message(session=session, pending_message=pending_message) - if tx_hash := pending_message.tx_hash: - session.execute( - make_confirmation_upsert_query( - item_hash=pending_message.item_hash, tx_hash=tx_hash - ) - ) - async def load_fetched_content( self, session: DbSession, pending_message: PendingMessageDb ) -> PendingMessageDb: @@ -334,6 +320,17 @@ async def confirm_existing_message( ) ) + @staticmethod + async def confirm_existing_forgotten_message( + session: DbSession, + forgotten_message: ForgottenMessageDb, + pending_message: PendingMessageDb, + ): + if pending_message.signature != forgotten_message.signature: + raise 
InvalidSignature(f"Invalid signature for {pending_message.item_hash}") + + delete_pending_message(session=session, pending_message=pending_message) + + async def insert_message( + self, session: DbSession, pending_message: PendingMessageDb, message: MessageDb + ): @@ -382,7 +379,7 @@ async def verify_and_fetch( async def process( self, session: DbSession, pending_message: PendingMessageDb - ) -> ProcessedMessage: + ) -> ProcessedMessage | RejectedMessage: """ Process a pending message. @@ -396,6 +393,7 @@ async def process( is a new one or a confirmation. """ + # Note: Check if message already exists (and confirm it) existing_message = get_message_by_item_hash( session=session, item_hash=ItemHash(pending_message.item_hash) ) @@ -407,6 +405,22 @@ ) return ProcessedMessage(message=existing_message, is_confirmation=True) + # Note: Check if message is already forgotten (and confirm it) + # this is to avoid race conditions when a confirmation arrives after the FORGET message has been processed + forgotten_message = get_forgotten_message( + session=session, item_hash=ItemHash(pending_message.item_hash) + ) + if forgotten_message: + await self.confirm_existing_forgotten_message( + session=session, + forgotten_message=forgotten_message, + pending_message=pending_message, + ) + return RejectedMessage( + pending_message=pending_message, + error_code=ErrorCode.FORGOTTEN_DUPLICATE, + ) + message = await self.verify_and_fetch( session=session, pending_message=pending_message ) diff --git a/src/aleph/types/message_status.py b/src/aleph/types/message_status.py index 6c1efa328..2e1403638 100644 --- a/src/aleph/types/message_status.py +++ b/src/aleph/types/message_status.py @@ -53,6 +53,7 @@ class ErrorCode(IntEnum): FORGET_TARGET_NOT_FOUND = 501 FORGET_FORGET = 502 FORGET_NOT_ALLOWED = 503 + FORGOTTEN_DUPLICATE = 504 class MessageProcessingException(Exception): diff --git a/tests/message_processing/fixtures/test-data-forgotten-messages.json
b/tests/message_processing/fixtures/test-data-forgotten-messages.json new file mode 100644 index 000000000..47387b190 --- /dev/null +++ b/tests/message_processing/fixtures/test-data-forgotten-messages.json @@ -0,0 +1,36 @@ +[ + { + "chain": "ETH", + "sender": "0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106", + "type": "POST", + "channel": "INTEGRATION_TESTS", + "signature": "0x271939ae35918d0e90877f2319dd0d9737f8334d52539125743caf2460b3896423ca69b1fc85662443cc0bd4ce91e2fb247d7d8291284d8c431e6962c611c4c31c", + "time": 1665758931.005458, + "item_type": "inline", + "item_content": "{\"address\":\"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106\",\"time\":1665758931.0054002,\"content\":{\"title\":\"My first blog post\",\"body\":\"Ermahgerd, a bleug!\"},\"type\":\"test-post\"}", + "item_hash": "9f02e3b5efdbdc0b487359117ae3af40db654892487feae452689a0b84dc1025" + }, + { + "chain": "ETH", + "channel": "TEST", + "sender": "0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106", + "type": "FORGET", + "time": 1652786534.1139255, + "item_type": "inline", + "item_content": "{\"address\":\"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106\",\"time\":1652786534.1138077,\"hashes\":[\"9f02e3b5efdbdc0b487359117ae3af40db654892487feae452689a0b84dc1025\"]}", + "item_hash": "431a0d2f79ecfa859949d2a09f67068ce7ebd4eb777d179ad958be6c79abc66b", + "signature": "0x409cdef65af51d6a508a1fdc56c0baa6d1abac7f539ab5f290e3245c522a4c766b930c4196d9f5d8c8c94a4d36c4b65bf04a2773f058f03803b9b0bca2fd85a51b" + }, + { + "chain": "ETH", + "sender": "0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106", + "type": "POST", + "channel": "INTEGRATION_TESTS", + "signature": "0x271939ae35918d0e90877f2319dd0d9737f8334d52539125743caf2460b3896423ca69b1fc85662443cc0bd4ce91e2fb247d7d8291284d8c431e6962c611c4c31c", + "time": 1665758931.005458, + "item_type": "inline", + "item_content": "{\"address\":\"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106\",\"time\":1665758931.0054002,\"content\":{\"title\":\"My first blog post\",\"body\":\"Ermahgerd, a 
bleug!\"},\"type\":\"test-post\"}", + "item_hash": "9f02e3b5efdbdc0b487359117ae3af40db654892487feae452689a0b84dc1025", + "tx_hash": "0xfffd825eff4dfeb229d5fe6cfc7ca7f0a6f692fbd0286a6b08b7d0890cbeeb4a" + } +] \ No newline at end of file diff --git a/tests/message_processing/load_fixtures.py b/tests/message_processing/load_fixtures.py index 92c9d4b3f..8469200c5 100644 --- a/tests/message_processing/load_fixtures.py +++ b/tests/message_processing/load_fixtures.py @@ -15,3 +15,10 @@ def load_fixture_message(fixture: str) -> Dict: with open(fixture_path) as f: return json.load(f) + + +def load_fixture_message_list(fixture: str) -> List[Dict]: + fixture_path = Path(__file__).parent / "fixtures" / fixture + + with open(fixture_path) as f: + return json.load(f) diff --git a/tests/message_processing/test_process_forgotten_messages.py b/tests/message_processing/test_process_forgotten_messages.py new file mode 100644 index 000000000..54a5b9e6a --- /dev/null +++ b/tests/message_processing/test_process_forgotten_messages.py @@ -0,0 +1,93 @@ +import datetime as dt +from typing import cast + +import pytest +from configmanager import Config + +from aleph.db.models import PendingMessageDb +from aleph.db.models.messages import ForgottenMessageDb, MessageDb +from aleph.handlers.message_handler import MessageHandler +from aleph.storage import StorageService +from aleph.types.db_session import DbSessionFactory +from aleph.types.message_processing_result import ProcessedMessage, RejectedMessage +from aleph.types.message_status import ErrorCode + +from .load_fixtures import load_fixture_message_list + + +@pytest.mark.asyncio +async def test_duplicated_forgotten_message( + mocker, + mock_config: Config, + session_factory: DbSessionFactory, + test_storage_service: StorageService, +): + signature_verifier = mocker.AsyncMock() + + messages = load_fixture_message_list("test-data-forgotten-messages.json") + + m1 = PendingMessageDb.from_message_dict( + messages[0], fetched=True, 
reception_time=dt.datetime(2025, 1, 1) + ) + m2 = PendingMessageDb.from_message_dict( + messages[1], fetched=True, reception_time=dt.datetime(2025, 1, 2) + ) + m3 = PendingMessageDb.from_message_dict( + messages[2], fetched=True, reception_time=dt.datetime(2025, 1, 3) + ) + post_hash = m1.item_hash + + message_handler = MessageHandler( + signature_verifier=signature_verifier, + storage_service=test_storage_service, + config=mock_config, + ) + + with session_factory() as session: + # 1) process post message + test1 = await message_handler.process( + session=session, + pending_message=m1, + ) + assert isinstance(test1, ProcessedMessage) + + res1 = cast( + MessageDb, + session.query(MessageDb).where(MessageDb.item_hash == post_hash).first(), + ) + assert res1.item_hash == post_hash + + # 2) process forget message + test2 = await message_handler.process( + session=session, + pending_message=m2, + ) + assert isinstance(test2, ProcessedMessage) + + res2 = cast( + MessageDb, + session.query(MessageDb).where(MessageDb.item_hash == post_hash).first(), + ) + assert res2 is None + + # 3) process post message confirmation (discarding it) + test3 = await message_handler.process( + session=session, + pending_message=m3, + ) + assert isinstance(test3, RejectedMessage) + assert test3.error_code == ErrorCode.FORGOTTEN_DUPLICATE + + res3 = cast( + MessageDb, + session.query(MessageDb).where(MessageDb.item_hash == post_hash).first(), + ) + assert res3 is None + + res4 = cast( + ForgottenMessageDb, + session.query(ForgottenMessageDb) + .where(ForgottenMessageDb.item_hash == post_hash) + .first(), + ) + assert res4